From 5089c55a4c7d3465d0ffbcfa4c51db964f6fea70 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Thu, 25 Sep 2025 19:27:49 -0700 Subject: [PATCH 1/2] impl --- .../pipelines/PipelineEventSender.scala | 14 +------------- .../sql/pipelines/logging/PipelineEvent.scala | 18 +++++++++++++++++- .../sql/pipelines/utils/ExecutionTest.scala | 14 +++++++++++++- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala index 4f263ebe372c3..5ea8c6f703124 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala @@ -158,18 +158,6 @@ class PipelineEventSender( } private def constructProtoEvent(event: PipelineEvent): proto.PipelineEvent = { - val message = if (event.error.nonEmpty) { - // Returns the message associated with a Throwable and all its causes - def getExceptionMessages(throwable: Throwable): Seq[String] = { - throwable.getMessage +: - Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil) - } - val errorMessages = getExceptionMessages(event.error.get) - s"""${event.message} - |Error: ${errorMessages.mkString("\n")}""".stripMargin - } else { - event.message - } val protoEventBuilder = proto.PipelineEvent .newBuilder() .setTimestamp( @@ -182,7 +170,7 @@ class PipelineEventSender( .setSeconds(event.timestamp.getTime / 1000) .setNanos(event.timestamp.getNanos) .build()) - .setMessage(message) + .setMessage(event.messageWithError) protoEventBuilder.build() } } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala index 697f225df1f31..0a10ebaa69f7a 100644 --- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala @@ -39,7 +39,23 @@ case class PipelineEvent( message: String, details: EventDetails, error: Option[Throwable] -) +) { + /** Combines the message and error (if any) into a single string */ + def messageWithError: String = { + if (error.nonEmpty) { + // Returns the message associated with a Throwable and all its causes + def getExceptionMessages(throwable: Throwable): Seq[String] = { + throwable.getMessage +: + Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil) + } + val errorMessages = getExceptionMessages(error.get) + s"""${message} + |Error: ${errorMessages.mkString("\n")}""".stripMargin + } else { + message + } + } +} /** * Describes where the event originated from diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala index 991a47d6b562f..cce5c87c1eed8 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala @@ -61,7 +61,19 @@ trait TestPipelineUpdateContextMixin { ) extends PipelineUpdateContext { val eventBuffer = new PipelineRunEventBuffer() - override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent + override val eventCallback: PipelineEvent => Unit = { event => + eventBuffer.addEvent(event) + // For debugging purposes, print the event to the console. + // Most tests expect the pipeline to succeed; this makes it easier to see + // the error when it happens. 
+ if (event.error.nonEmpty) { + // scalastyle:off println + println("\n=== Received Pipeline Event with Error ===") + println(event.messageWithError) + println("=================================") + // scalastyle:on println + } + } override def flowProgressEventLogger: FlowProgressEventLogger = { new FlowProgressEventLogger(eventCallback = eventCallback) From 21f575586c87c0b8c8659d199b30d65e355bd9b6 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Thu, 25 Sep 2025 20:14:09 -0700 Subject: [PATCH 2/2] add an option to fail test early --- .../apache/spark/sql/pipelines/utils/ExecutionTest.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala index cce5c87c1eed8..6c2c07e574987 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.pipelines.utils +import org.scalatest.Assertions.fail + import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState} @@ -51,13 +53,15 @@ trait TestPipelineUpdateContextMixin { * @param fullRefreshTables Set of tables to be fully refreshed. * @param refreshTables Set of tables to be refreshed. * @param resetCheckpointFlows Set of flows to be reset. * @param failOnErrorEvent Whether to fail the test when an event with an error is received. 
*/ case class TestPipelineUpdateContext( spark: SparkSession, unresolvedGraph: DataflowGraph, fullRefreshTables: TableFilter = NoTables, refreshTables: TableFilter = AllTables, - resetCheckpointFlows: FlowFilter = AllFlows + resetCheckpointFlows: FlowFilter = AllFlows, + failOnErrorEvent: Boolean = false ) extends PipelineUpdateContext { val eventBuffer = new PipelineRunEventBuffer() @@ -72,6 +76,9 @@ trait TestPipelineUpdateContextMixin { println(event.messageWithError) println("=================================") // scalastyle:on println + if (failOnErrorEvent) { + fail(s"Pipeline event with error received: ${event.messageWithError}") + } } }