[SPARK-52346][SQL] Declarative Pipeline DataflowGraph execution and event logging #51050

Open · wants to merge 13 commits into master

Conversation

@SCHJonathan commented May 29, 2025

What changes were proposed in this pull request?

As described in the Declarative Pipelines SPIP, after we parse the user's code and represent datasets and dataflows in a DataflowGraph (from PR #51003), this PR adds the functionality to execute the corresponding workloads based on the DataflowGraph, in the following steps.

Step 1: Initialize the raw DataflowGraph
In PipelineExecution::runPipeline(), we first initialize the dataflow graph by topologically sorting the dependencies and figuring out the expected metadata (e.g., schema) for each dataset (DataflowGraph::resolve()). We also run pre-flight validations to catch early errors such as circular dependencies, creating a streaming table from a batch data source, etc. (DataflowGraph::validate()).
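
To make the resolution step concrete, below is a minimal, self-contained sketch of the topological sort and cycle detection that DataflowGraph::resolve() and DataflowGraph::validate() conceptually perform. The Graph type and topologicalSort helper here are hypothetical illustrations, not the PR's actual API:

```scala
import scala.collection.mutable

object GraphResolutionSketch {
  // Hypothetical representation: each dataset name maps to the upstream
  // datasets it reads from.
  type Graph = Map[String, Set[String]]

  /** Kahn's algorithm: dependency order on success, cyclic nodes on failure. */
  def topologicalSort(graph: Graph): Either[Set[String], Seq[String]] = {
    val nodes = graph.keySet ++ graph.values.flatten
    // Reverse adjacency: downstream(u) = datasets that read from u.
    val downstream = nodes.map(n => n -> graph.collect { case (d, ups) if ups(n) => d }).toMap
    val inDegree = mutable.Map.from(nodes.map(n => n -> graph.getOrElse(n, Set.empty[String]).size))
    val queue = mutable.Queue.from(nodes.filter(inDegree(_) == 0))
    val sorted = mutable.ArrayBuffer.empty[String]
    while (queue.nonEmpty) {
      val n = queue.dequeue()
      sorted += n
      downstream(n).foreach { d =>
        inDegree(d) -= 1
        if (inDegree(d) == 0) queue.enqueue(d)
      }
    }
    // Any node that never reaches in-degree 0 participates in a cycle.
    if (sorted.size == nodes.size) Right(sorted.toSeq) else Left(nodes -- sorted)
  }
}
```

For example, Map("mv" -> Set("src"), "src" -> Set.empty[String]) sorts to Seq("src", "mv"), while Map("a" -> Set("b"), "b" -> Set("a")) is reported back as a cycle, mirroring the circular-dependency validation mentioned above.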

Step 2: Materialize datasets defined in the DataflowGraph as empty tables
After the graph is topologically sorted and validated, and every dataset / flow has the correct metadata populated, we publish the corresponding datasets to the metastore (which could be Hive, UC, or others) in DatasetManager::materializeDatasets(). For example, for each Materialized View and Streaming Table, it registers an empty table in the metastore with the correct metadata (e.g., table schema, table properties, etc.).
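
As a rough illustration of what registering an empty table with the resolved metadata looks like, the snippet below uses Spark's public catalog API; the table name, schema, and format are made-up placeholders, and the PR's DatasetManager may go through a different code path (e.g., catalog plugins):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("materialize-sketch").getOrCreate()

// Hypothetical resolved metadata for one dataset in the graph (from step 1).
val tableName = "events_mv"
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("payload", StringType)))

// Register an empty table with the expected schema; data is populated later
// by the corresponding FlowExecution (step 3).
spark.catalog.createTable(tableName, "parquet", schema, Map.empty[String, String])
```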

Step 3: Populate data into the registered empty tables
After datasets have been registered in the metastore, inside TriggeredGraphExecution we transform each dataflow defined in the DataflowGraph into an actual execution plan that runs the workload and populates data into the empty table (we transform DataflowGraph::flow into a FlowExecution through the FlowPlanner).

FlowExecution currently comes in two types (sketched after this list):

  1. A dataflow representing a batch execution is converted into a BatchFlowExecution.
  2. A dataflow representing a streaming execution is converted into a StreamingFlowExecution.
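
The two flavors roughly map onto Spark's two public write paths. The helpers below are illustrative stand-ins under that assumption, not the PR's BatchFlowExecution / StreamingFlowExecution implementations:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Batch flavor: run the flow's plan once and write the result to the target table.
def runBatchFlow(df: DataFrame, target: String): Unit =
  df.write.mode("overwrite").saveAsTable(target)

// Streaming flavor: in a triggered run, process all currently available data and
// then stop (Trigger.AvailableNow), checkpointing progress along the way.
def runStreamingFlow(df: DataFrame, target: String, checkpoint: String): Unit =
  df.writeStream
    .option("checkpointLocation", checkpoint)
    .trigger(Trigger.AvailableNow())
    .toTable(target)
    .awaitTermination()
```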

Each FlowExecution is executed in topological order based on the sorted DataflowGraph, and we parallelize the execution as much as possible. Each execution can fail due to different types of errors: some are transient, while others are fatal and require user action (e.g., incompatible table schema evolution). Proper retries are in place, and event logs are emitted to indicate the current state of the execution. These event logs are eventually surfaced in the CLI console.
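
Here is a minimal sketch of that transient-vs-fatal retry behavior, assuming a hypothetical FatalFlowException marker for non-retryable errors; the PR's actual error classification and retry policy live in TriggeredGraphExecution and may differ:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical marker for errors that require user action and must not be retried.
final class FatalFlowException(msg: String) extends RuntimeException(msg)

/** Run a flow, retrying transient failures up to maxAttempts times. */
def executeWithRetries[T](maxAttempts: Int)(flow: => T): Try[T] = {
  def attempt(n: Int): Try[T] = Try(flow) match {
    case s @ Success(_)                     => s
    case f @ Failure(_: FatalFlowException) => f // fatal: fail immediately
    case Failure(e) if n < maxAttempts =>
      // Transient: log at WARN level and re-queue for another attempt.
      println(s"WARN  FlowProgress::FAILED (transient, attempt $n): ${e.getMessage}")
      attempt(n + 1)
    case f @ Failure(e) =>
      // Retries exhausted: log at ERROR level; the run is about to be stopped.
      println(s"ERROR FlowProgress::FAILED (retries exhausted): ${e.getMessage}")
      f
  }
  attempt(1)
}
```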

More details on the event logs (the flow-level states are also summarized in an ADT sketch after this list):

  1. RunProgress::STARTED: logged to indicate that we started running the user-configured pipeline.
  2. FlowProgress::QUEUED: initially, all dataflows are queued for execution, except those explicitly marked by users as excluded from execution (using the --selective-refresh parameter when starting a pipeline from the CLI).
  3. FlowProgress::EXCLUDED: logged if a dataflow is excluded from execution
  4. FlowProgress::STARTING: logged when a dataflow is popped from the job queue and is ready to be planned
  5. FlowProgress::PLANNING: logged when a dataflow starts being planned and transformed into a FlowExecution.
  6. FlowProgress::RUNNING: logged when a FlowExecution has started and the corresponding workload is running.
  7. FlowProgress::COMPLETED: logged when a FlowExecution has completed successfully.
  8. FlowProgress::FAILED: logged when a FlowExecution has failed due to a transient or fatal error. For a transient error, the event is logged at WARN level and the execution is re-queued for retry. For a fatal error, or a transient error that has exhausted all retry attempts, the event is logged at ERROR level and we prepare to stop the entire execution.
  9. FlowProgress::SKIPPED: logged when an upstream FlowExecution has failed fatally; all downstream FlowExecutions are skipped.
  10. FlowProgress::STOPPED: logged when users explicitly stop the pipeline while it is still running. All actively running FlowExecutions are interrupted and stopped, and this event is emitted.
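
As a reading aid, the flow-level states above can be pictured as a small ADT. This is illustrative only; the PR's real event classes carry richer payloads (origin, timestamps, error details):

```scala
// Illustrative only: the flow-level lifecycle states described above.
sealed trait FlowProgress
object FlowProgress {
  case object Queued    extends FlowProgress
  case object Excluded  extends FlowProgress
  case object Starting  extends FlowProgress
  case object Planning  extends FlowProgress
  case object Running   extends FlowProgress
  case object Completed extends FlowProgress
  case object Skipped   extends FlowProgress
  case object Stopped   extends FlowProgress
  // transient => WARN + re-queue; fatal or retries exhausted => ERROR + stop.
  final case class Failed(transient: Boolean) extends FlowProgress
}
```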

When all FlowExecutions have completed, or one of them has failed fatally, the pipeline reaches a terminal state and a corresponding event log is emitted (a derivation sketch follows this list):

  1. RunProgress::COMPLETED: logged when all the FlowExecutions have completed successfully.
  2. RunProgress::FAILED: logged when one (or more) of the FlowExecutions failed fatally and the pipeline execution is aborted.
  3. RunProgress::CANCELED: logged when the user explicitly stops the pipeline; all running flows are interrupted and stopped (a FlowProgress::STOPPED event log is emitted).
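
Reusing the FlowProgress sketch above, the terminal run state can be derived from the flows' final states roughly as follows (a hypothetical helper, not code from this PR):

```scala
def terminalRunState(finalStates: Seq[FlowProgress], canceledByUser: Boolean): String =
  if (canceledByUser) "RunProgress::CANCELED" // user stopped the run
  else if (finalStates.exists(_.isInstanceOf[FlowProgress.Failed]))
    "RunProgress::FAILED"                     // at least one fatal failure aborts the run
  else "RunProgress::COMPLETED"               // every flow completed successfully
```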

Why are the changes needed?

This PR implements the core functionality for executing a Declarative Pipeline.

Does this PR introduce any user-facing change?

Yes, see the first section.

How was this patch tested?

New unit test suite:

  • TriggeredGraphExecutionSuite: tests end-to-end executions of the pipeline under different scenarios (happy path, failure path, etc.) and validates that the proper data has been written and the proper event logs are emitted.

Augmented existing test suites:

  • ConstructPipelineEventSuite and PipelineEventSuite: extended to validate the new FlowProgress event logs we're introducing.

Was this patch authored or co-authored using generative AI tooling?

No

@sryza sryza self-assigned this May 29, 2025
@SCHJonathan SCHJonathan requested a review from sryza May 30, 2025 17:22