
[SPARK-52166] [SDP] Add support for PipelineEvents #50906


Closed
jonmio wants to merge 12 commits

Conversation


@jonmio jonmio commented May 15, 2025

What changes were proposed in this pull request?

The execution of an SDP is a complex, multi-stage computation. To observe progress and monitor state transitions of a pipeline execution, we will maintain a stream of pipeline events (e.g. flow started, flow completed, pipeline run started, pipeline run completed). This PR introduces the data model for PipelineEvents.
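
For illustration, a minimal sketch of what such an event data model might look like (all names here are illustrative, not necessarily the exact ones in this PR):

  import java.sql.Timestamp

  // Hypothetical structured payloads for the lifecycle transitions listed above.
  sealed trait EventDetails
  case class FlowProgress(flowName: String, status: String) extends EventDetails
  case class RunProgress(runId: String, status: String) extends EventDetails

  // Hypothetical shape of an internal pipeline event record.
  case class PipelineEvent(
      id: String,            // globally unique event id
      timestamp: Timestamp,  // when the event was emitted
      message: String,       // human-readable description
      details: EventDetails) // structured payload for the transition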

Additionally, since this is the first PR to add tests to the pipelines module, we update the GitHub Actions CI job to run tests in that module.

Why are the changes needed?

See description above

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests for event construction as well as unit tests for the helper functions used in ConstructPipelineEvent.

Was this patch authored or co-authored using generative AI tooling?

No

@jonmio jonmio changed the title [WIP] [SPARK-52166] [SDP] Add support for PipelineEvents [SPARK-52166] [SDP] Add support for PipelineEvents May 19, 2025
@github-actions github-actions bot added the INFRA label May 19, 2025

import org.apache.spark.internal.Logging

case class UnresolvedFlowFailureException(name: String, cause: Throwable)
Member

Let's move this to QueryCompilationErrors and create an error condition for it in error-conditions.json.

Author

I saw that this is actually unused, so I'm just going to remove it for now.

case _ =>
  val className = t.getClass.getName
  t match {
    case _: UnresolvedFlowFailureException =>
Member

Why convert UnresolvedFlowFailureException to SerializedException?

Author @jonmio, May 19, 2025

SerializedException is a slightly more structured format for recording errors that happen during pipeline execution.

Note that SerializedException is an internal exception type, so I don't think we need to create an error class for it.
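
For context, a minimal sketch of the kind of conversion being discussed, assuming a simple shape for SerializedException (the fields and the from helper are hypothetical):

  // Hypothetical structured record for errors hit during pipeline execution.
  case class SerializedException(className: String, message: String, stack: Seq[String])

  object SerializedException {
    // Flatten any Throwable into a serializable record of its class, message, and stack.
    def from(t: Throwable): SerializedException =
      SerializedException(
        className = t.getClass.getName,
        message = Option(t.getMessage).getOrElse(""),
        stack = t.getStackTrace.toSeq.map(_.toString))
  }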


/**
* An internal event that is emitted during the run of a pipeline.
* @param id A time based, globally unique id
Member

In this PR, the id is UUID.randomUUID(). Why is it time-based?

Author

Fixed the comment

* @param flowName The name of the flow
* @param sourceCodeLocation The location of the source code
*/
case class Origin(
Member @gengliangwang, May 19, 2025

Given there is an existing Origin in Spark catalyst: https://github.com/apache/spark/blob/master/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/origin.scala, shall we rename this one as PipelineOrigin?

Author

Updated to PipelineEventOrigin, since the origin contains information not just about the pipeline but also about the dataset.
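
Based on the scaladoc fragment above, the renamed class might look roughly like this (a sketch; the exact fields and their types may differ in the PR):

  // Hypothetical shape after the rename to PipelineEventOrigin.
  case class PipelineEventOrigin(
      flowName: Option[String],           // the name of the flow, if the event is flow-scoped
      datasetName: Option[String],        // the dataset the event relates to, if any
      sourceCodeLocation: Option[String]) // where the relevant source code is defined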

/** A format string that defines how timestamps are serialized in a [[PipelineEvent]]. */
private val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
private val tz: TimeZone = TimeZone.getTimeZone("UTC")
private val df = new SimpleDateFormat(timestampFormat)
Member

Shall we use DateTimeFormatter (thread-safe)?
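
For reference, a thread-safe equivalent using java.time (a sketch under the same UTC assumption, not the code merged in this PR):

  import java.time.{Instant, ZoneOffset}
  import java.time.format.DateTimeFormatter

  object EventTimestamps {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
    private val timestampFormatter: DateTimeFormatter =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXX").withZone(ZoneOffset.UTC)

    def format(instant: Instant): String = timestampFormatter.format(instant)
  }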


/** A format string that defines how timestamps are serialized in a [[PipelineEvent]]. */
private val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
private val tz: TimeZone = TimeZone.getTimeZone("UTC")
Member

So the time zone is always UTC? The default timestamp uses the session-local time zone, which can be set via:

  val SESSION_LOCAL_TIMEZONE = buildConf(SqlApiConfHelper.SESSION_LOCAL_TIMEZONE_KEY)
    .doc("The ID of session local timezone in the format of either region-based zone IDs or " +
      "zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. " +
      "Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g '-08', " +
      "'+01:00' or '-13:33:33'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'. Other " +
      "short names are not recommended to use because they can be ambiguous.")
    .version("2.2.0")
    .stringConf
    .checkValue(isValidTimezone, errorClass = "TIME_ZONE", parameters = tz => Map.empty)
    .createWithDefaultFunction(() => TimeZone.getDefault.getID)

Author @jonmio, May 20, 2025

Can we address this in a follow-up? I think we need to discuss what the right interface for setting this conf is for a pipeline. It seems like this should be a pipeline-level setting that cannot be changed during the run of the pipeline, and it should be set as a Spark conf in the pipeline settings.

Author

The pipeline events are also not user-facing; at the moment they are only used by the system.

Member

Let's add a comment to explain the intention.

@jonmio jonmio requested a review from gengliangwang May 20, 2025 00:56
* automatically filled in. Developers should always use this factory rather than construct
* an event directly from an empty proto.
*/
object ConstructPipelineEvent extends Logging {
Member

Is the Logging trait used?

Author

Nope, removed.
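
For context, a factory like the one documented above typically stamps the common fields so call sites never build an event by hand. A hedged sketch, reusing the hypothetical PipelineEvent shape from earlier:

  import java.sql.Timestamp
  import java.util.UUID

  object ConstructPipelineEvent {
    // Fill in id and timestamp automatically; callers supply only what varies.
    def apply(message: String, details: EventDetails): PipelineEvent =
      PipelineEvent(
        id = UUID.randomUUID().toString,
        timestamp = new Timestamp(System.currentTimeMillis()),
        message = message,
        details = details)
  }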

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
Member

Is this required in this PR?

Author

removed

@gengliangwang
Member

Thanks, merging to master

@LuciferYang
Contributor

@jonmio The compilation of all Maven daily tests failed for the following reason:

scaladoc error: fatal error: object scala in compiler mirror not found.
Error:  Failed to execute goal net.alchim31.maven:scala-maven-plugin:4.9.2:doc-jar (attach-scaladocs) on project spark-pipelines_2.13: MavenReportException: Error while creating archive: wrap: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
Error:  
Error:  To see the full stack trace of the errors, re-run Maven with the -e switch.
Error:  Re-run Maven using the -X switch to enable full debug logging.
Error:  
Error:  For more information about the errors and possible solutions, please read the following articles:
Error:  [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Error:  
Error:  After correcting the problems, you can resume the build with the command
Error:    mvn <args> -rf :spark-pipelines_2.13
Error: Process completed with exit code 1.

Would you have a moment to take a look? Thanks!


also cc @gengliangwang

@LuciferYang
Contributor

Made a quick fix: #51008

@sryza sryza self-assigned this May 25, 2025
@sryza
Contributor

sryza commented May 30, 2025

Thanks for jumping in and fixing this, @LuciferYang!
