Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,6 @@ class PipelineEventSender(
}

private def constructProtoEvent(event: PipelineEvent): proto.PipelineEvent = {
val message = if (event.error.nonEmpty) {
// Returns the message associated with a Throwable and all its causes
def getExceptionMessages(throwable: Throwable): Seq[String] = {
throwable.getMessage +:
Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
}
val errorMessages = getExceptionMessages(event.error.get)
s"""${event.message}
|Error: ${errorMessages.mkString("\n")}""".stripMargin
} else {
event.message
}
val protoEventBuilder = proto.PipelineEvent
.newBuilder()
.setTimestamp(
Expand All @@ -182,7 +170,7 @@ class PipelineEventSender(
.setSeconds(event.timestamp.getTime / 1000)
.setNanos(event.timestamp.getNanos)
.build())
.setMessage(message)
.setMessage(event.messageWithError)
protoEventBuilder.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,23 @@ case class PipelineEvent(
message: String,
details: EventDetails,
error: Option[Throwable]
)
) {
/** Combines the message and error (if any) into a single string */
def messageWithError: String = {
if (error.nonEmpty) {
// Returns the message associated with a Throwable and all its causes
def getExceptionMessages(throwable: Throwable): Seq[String] = {
throwable.getMessage +:
Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
}
val errorMessages = getExceptionMessages(error.get)
s"""${message}
|Error: ${errorMessages.mkString("\n")}""".stripMargin
} else {
message
}
}
}

/**
* Describes where the event originated from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.pipelines.utils

import org.scalatest.Assertions.fail

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
Expand Down Expand Up @@ -51,17 +53,34 @@ trait TestPipelineUpdateContextMixin {
* @param fullRefreshTables Set of tables to be fully refreshed.
* @param refreshTables Set of tables to be refreshed.
* @param resetCheckpointFlows Set of flows to be reset.
* @param failOnErrorEvent Whether to fail test when receiving event with error.
*/
case class TestPipelineUpdateContext(
spark: SparkSession,
unresolvedGraph: DataflowGraph,
fullRefreshTables: TableFilter = NoTables,
refreshTables: TableFilter = AllTables,
resetCheckpointFlows: FlowFilter = AllFlows
resetCheckpointFlows: FlowFilter = AllFlows,
failOnErrorEvent: Boolean = false
) extends PipelineUpdateContext {
val eventBuffer = new PipelineRunEventBuffer()

override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent
override val eventCallback: PipelineEvent => Unit = { event =>
eventBuffer.addEvent(event)
// For debugging purposes, print the event to the console.
// Most tests expects pipeline to succeed, this makes it easier to see
// the error when it happens.
if (event.error.nonEmpty) {
// scalastyle:off println
println("\n=== Received Pipeline Event with Error ===")
println(event.messageWithError)
println("=================================")
// scalastyle:on println
if (failOnErrorEvent) {
fail(s"Pipeline event with error received: ${event.messageWithError}")
}
}
}

override def flowProgressEventLogger: FlowProgressEventLogger = {
new FlowProgressEventLogger(eventCallback = eventCallback)
Expand Down