[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003

Open — wants to merge 12 commits into master

Conversation

@aakash-db (Contributor) commented May 23, 2025

What changes were proposed in this pull request?

This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for

  • Constructing a graph by registering a set of graph elements in succession (GraphRegistrationContext)
  • "Resolving" a graph, which means resolving each of the flows within a graph. Resolving a flow means:
    • Validating that its plan can be successfully analyzed
    • Determining the schema of the data it will produce
    • Determining what upstream datasets within the graph it depends on
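
A minimal, self-contained sketch of that resolution pass, with all names invented for illustration (Flow, ResolvedFlow, and resolve are not this PR's actual API; schema determination is omitted for brevity):

  import scala.util.Try

  final case class Flow(name: String, inputs: Seq[String])
  final case class ResolvedFlow(name: String, upstream: Seq[String])

  // Resolve every flow in the graph: each input must name a dataset that
  // exists in the graph, mirroring the "validate the plan / record upstream
  // dependencies" steps above.
  def resolve(datasets: Set[String], flows: Seq[Flow]): Try[Seq[ResolvedFlow]] = Try {
    flows.map { f =>
      val missing = f.inputs.filterNot(datasets.contains)
      require(missing.isEmpty, s"Flow ${f.name} references unknown datasets: $missing")
      ResolvedFlow(f.name, f.inputs)
    }
  }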

It also introduces various secondary changes:

  • Changes to SparkBuild to support declarative pipelines.
  • Updates to the pom.xml for the module.
  • New error conditions.

Why are the changes needed?

In order to implement Declarative Pipelines.

Does this PR introduce any user-facing change?

No changes to existing behavior.

How was this patch tested?

New test suites:

  • ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
  • ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?

No

@sryza changed the title from "[SPARK-52283][CONNECT] SDP DataflowGraph creation and resolution" to "[SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution" on May 23, 2025
@sryza self-requested a review on May 23, 2025
@sryza self-assigned this on May 23, 2025

@jonmio left a comment: flushing some comments

@@ -2025,6 +2031,18 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_BATCH_VIEW_READ": {
"message": [
"View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

@jonmio: What is the purpose of this conf, and do we really need it?
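
For reference, the conf key comes straight from the error text above; presumably it would be toggled like any other Spark conf. A hypothetical usage sketch, assuming an active SparkSession bound to spark:

  // Hypothetical: disabling the batch-view read check named in the error message.
  spark.conf.set("pipelines.incompatibleViewCheck.enabled", "false")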

* @param upstreamNodes Upstream nodes for the node
* @return
*/
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {

@jonmio: Nit: document the return value. I'm especially curious why this is a Seq, and when processNode would return more than one element.


@aakash-db (Author): Right now, it's mostly for flexibility, in case one node maps to several in the future.
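
A toy illustration of the flexibility being described, with all names invented (this is not the PR's GraphElement hierarchy):

  sealed trait GraphElement
  final case class Table(name: String) extends GraphElement
  final case class Flow(name: String, destination: String) extends GraphElement

  def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] =
    node match {
      // One-to-many: a node could expand into itself plus a derived element.
      case t: Table => Seq(t, Flow(s"${t.name}_flow", t.name))
      // Identity: today, most nodes map to exactly one element.
      case other => Seq(other)
    }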

@apache deleted a comment from aakash-db on May 27, 2025
@aakash-db requested a review from sryza on May 27, 2025
@aakash-db changed the title from "[SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution" to "[SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution" on May 27, 2025
@aakash-db changed the title from "[SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution" to "[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution" on May 27, 2025
@sryza requested a review from cloud-fan on May 27, 2025
val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

/** Returns a [[Table]] given its identifier */
lazy val table: Map[TableIdentifier, Table] =

Contributor: TableIdentifier only supports a 3-level namespace. Shall we use Seq[String] to better support DS v2, which can have an arbitrary number of namespace levels?

Contributor: Seq[String] is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match TableIdentifier.
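
A rough sketch of what such an encapsulation class might look like; GraphIdentifier is an invented name, and this is not code from the PR:

  import org.apache.spark.sql.catalyst.TableIdentifier

  // A thin wrapper over a multi-part name that can round-trip to TableIdentifier
  // (which supports at most catalog.database.table) while that bridge is needed.
  final case class GraphIdentifier(nameParts: Seq[String]) {
    require(nameParts.nonEmpty, "identifier must have at least one part")

    def toTableIdentifier: TableIdentifier = nameParts match {
      case Seq(tbl)          => TableIdentifier(tbl)
      case Seq(db, tbl)      => TableIdentifier(tbl, Some(db))
      case Seq(cat, db, tbl) => TableIdentifier(tbl, Some(db), Some(cat))
      case _ => throw new IllegalArgumentException(
        s"Cannot represent ${nameParts.mkString(".")} as a TableIdentifier")
    }
  }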

@aakash-db requested a review from jonmio on May 28, 2025
f.inputs.toSeq
.map(availableResolvedInputs(_))
.filter {
// Input is a flow implies that the upstream table is a View.

Contributor: I find this comment hard to understand. We are resolving a flow, and the input of a flow can be other flows? Why does that mean the upstream table is a view?


@aakash-db (Author) replied May 29, 2025: Because for views, we don't have a ViewInput object; we instead map the view's downstream directly to the view's upstream. Thus, if a flow's upstream is a flow, the node that flow writes to is a view. If a flow's upstream were a table, there would be a TableInput object here instead.
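
A toy sketch of that pass-through rule, with all types invented for illustration: tables surface as TableInput nodes, while views contribute no input node of their own, so a reader of a view sees the flow that writes the view.

  sealed trait Input
  final case class TableInput(name: String) extends Input
  final case class Flow(name: String, destination: String) extends Input

  def describe(input: Input): String = input match {
    case TableInput(n) => s"upstream dataset $n is a table"
    case Flow(_, dest) => s"upstream dataset $dest is a view (resolved through its flow)"
  }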

@sryza (Contributor) commented May 30, 2025:

@aakash-db I'm still seeing some test failures:

@cloud-fan (Contributor): Yeah, the docker test is generally flaky, and we can ignore it.

// 1. If table is present in context.fullRefreshTables
// 2. If table has any virtual inputs (flows or tables)
// 3. If the table pre-existing metadata is different from current metadata
val virtualTableInput = VirtualTableInput(

Contributor: We need comments to explain why all table inputs are virtual.

/**
* Returns a [[TableInput]], if one is available, that can be read from by downstream flows.
*/
def tableInput(identifier: TableIdentifier): Option[TableInput] = table.get(identifier)

Contributor: If tables are both inputs and outputs, shall we just have a single name-to-table map, instead of an output map plus a tableInput method?

Contributor: Actually, there is already a table map here. Why do we need the output map, which is identical to the table map but just gives a different error message for duplicates?

Contributor: Hmm, yeah, it looks like we do not need a separate tableInput, and we should take it out. I suspect this was a relic of an older version of the code where a subclass of DataflowGraph overrode tableInput.

output is a little forward-looking: we will eventually allow DataflowGraphs to have sinks (described in the SPIP but not yet implemented here), and output will include both sinks and tables. However, we can leave it out for now and add it back when we add sinks.
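
A rough sketch of that forward-looking shape, with Sink and the surrounding types invented for illustration (the SPIP describes sinks, but this PR does not implement them):

  // Hypothetical: once sinks exist, output unifies both kinds of destinations,
  // while only tables remain readable as inputs to downstream flows.
  sealed trait Output { def name: String }
  final case class Table(name: String) extends Output
  final case class Sink(name: String, format: String) extends Output

  final case class DataflowGraph(outputs: Seq[Output]) {
    val output: Map[String, Output] = outputs.map(o => o.name -> o).toMap

    def tableInput(name: String): Option[Table] =
      output.get(name).collect { case t: Table => t }
  }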
