8 changes: 8 additions & 0 deletions sql/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -26,6 +26,7 @@ import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";
import "spark/connect/types.proto";
import "spark/connect/ml.proto";
import "spark/connect/pipelines.proto";

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
@@ -399,6 +400,13 @@ message ExecutePlanResponse {
// ML command response
MlCommandResult ml_command_result = 20;

// Response containing pipeline events that are streamed back to the client during a pipeline
// run.
PipelineEventsResult pipeline_events_result = 21;

// Pipeline command response
PipelineCommandResult pipeline_command_result = 22;

// Support arbitrary result objects.
google.protobuf.Any extension = 999;
}
2 changes: 2 additions & 0 deletions sql/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -22,6 +22,7 @@ import "spark/connect/common.proto";
import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";
import "spark/connect/ml.proto";
import "spark/connect/pipelines.proto";

package spark.connect;

@@ -51,6 +52,7 @@ message Command {
MergeIntoTableCommand merge_into_table_command = 16;
MlCommand ml_command = 17;
ExecuteExternalCommand execute_external_command = 18;
PipelineCommand pipeline_command = 19;

// This field is used to mark extensions to the protocol. When plugins generate arbitrary
// Commands they can add them here. During the planning the correct resolution is done.
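
A quick orientation note on how these two hunks fit together: the new `pipeline_command` member rides the existing `Command` oneof, and pipeline results come back through the two new `ExecutePlanResponse` fields added above. A minimal client-side sketch follows, assuming generated Python stubs under `pyspark.sql.connect.proto`; the `pipelines_pb2` module name is an assumption, since this PR only adds the .proto definition.

```python
# Sketch only: how the new pipeline command plugs into the existing Connect plumbing.
# `pipelines_pb2` is an assumed module name; base_pb2/commands_pb2 are the usual stubs.
from pyspark.sql.connect.proto import base_pb2, commands_pb2, pipelines_pb2

pipeline_cmd = pipelines_pb2.PipelineCommand()
pipeline_cmd.create_dataflow_graph.default_database = "default"

command = commands_pb2.Command()
command.pipeline_command.CopyFrom(pipeline_cmd)  # new oneof member added by this diff

plan = base_pb2.Plan()
plan.command.CopyFrom(command)
# `plan` is sent through the usual ExecutePlanRequest path; the streamed
# ExecutePlanResponse messages carry pipeline payloads in the new
# `pipeline_command_result` and `pipeline_events_result` fields.
```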
173 changes: 173 additions & 0 deletions sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";

package spark.connect;

import "spark/connect/relations.proto";
import "spark/connect/types.proto";

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";

// Dispatch object for pipelines commands. See each individual command for documentation.
message PipelineCommand {
oneof command_type {
CreateDataflowGraph create_dataflow_graph = 1;
DefineDataset define_dataset = 2;
DefineFlow define_flow = 3;
DropDataflowGraph drop_dataflow_graph = 4;
StartRun start_run = 5;
StopRun stop_run = 6;
DefineSqlGraphElements define_sql_graph_elements = 7;
}

// Request to create a new dataflow graph.
message CreateDataflowGraph {
// The default catalog.
optional string default_catalog = 1;

// The default database.
optional string default_database = 2;

// SQL configurations for all flows in this graph.
map<string, string> sql_conf = 5;

message Response {
// The ID of the created graph.
string dataflow_graph_id = 1;
}
}
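
For illustration, a hedged sketch of building this request and reading back the graph ID; the stub path and all literal values are placeholders, not part of the diff.

```python
# Sketch: create a dataflow graph and capture its ID (assumed stub path, placeholder values).
from pyspark.sql.connect.proto import pipelines_pb2

cmd = pipelines_pb2.PipelineCommand()
cmd.create_dataflow_graph.default_catalog = "spark_catalog"
cmd.create_dataflow_graph.default_database = "default"
cmd.create_dataflow_graph.sql_conf["spark.sql.shuffle.partitions"] = "4"
assert cmd.WhichOneof("command_type") == "create_dataflow_graph"

# The server answers with PipelineCommandResult.CreateDataflowGraphResult; the ID is what
# every later DefineDataset / DefineFlow / StartRun request refers to.
result = pipelines_pb2.PipelineCommandResult()
result.create_dataflow_graph_result.dataflow_graph_id = "graph-123"  # placeholder
graph_id = result.create_dataflow_graph_result.dataflow_graph_id
```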

// Drops the graph and stops any running attached flows.
message DropDataflowGraph {
// The graph to drop.
string dataflow_graph_id = 1;
}

// Request to define a dataset: a table, a materialized view, or a temporary view.
message DefineDataset {
// The graph to attach this dataset to.
string dataflow_graph_id = 1;

// Name of the dataset. Can be partially or fully qualified.
string dataset_name = 2;

// The type of the dataset.
DatasetType dataset_type = 3;

// Optional comment for the dataset.
optional string comment = 4;

// Optional table properties. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW.
map<string, string> table_properties = 5;

// Optional partition columns for the dataset. Only applies to dataset_type == TABLE and
// dataset_type == MATERIALIZED_VIEW.
repeated string partition_cols = 6;

// Schema for the dataset. If unset, this will be inferred from incoming flows.
optional spark.connect.DataType schema = 7;

// The output table format of the dataset. Only applies to dataset_type == TABLE and
// dataset_type == MATERIALIZED_VIEW.
optional string format = 8;
}
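
A short sketch of defining a table-typed dataset; the names, properties, and stub path are illustrative assumptions.

```python
# Sketch: register a TABLE dataset on an existing graph (assumed stub path, placeholder names).
from pyspark.sql.connect.proto import pipelines_pb2

ds = pipelines_pb2.PipelineCommand.DefineDataset()
ds.dataflow_graph_id = "graph-123"                      # from CreateDataflowGraph.Response
ds.dataset_name = "sales.daily_totals"                  # partially qualified name
ds.dataset_type = pipelines_pb2.DatasetType.Value("TABLE")
ds.comment = "Daily aggregated sales"
ds.table_properties["owner"] = "pipelines-team"         # only meaningful for TABLE / MATERIALIZED_VIEW
ds.partition_cols.append("sale_date")
ds.format = "parquet"                                   # leaving `schema` unset lets it be inferred from flows
```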

// Request to define a flow targeting a dataset.
message DefineFlow {
// The graph to attach this flow to.
string dataflow_graph_id = 1;

// Name of the flow. For standalone flows, this must be a single-part name.
string flow_name = 2;

// Name of the dataset this flow writes to. Can be partially or fully qualified.
string target_dataset_name = 3;

// An unresolved relation that defines the dataset's flow.
spark.connect.Relation plan = 4;

// Default SQL configurations set when running this flow.
Contributor:

Nitpick: is the word "Default" relevant here? There's nothing more specific, right?

Contributor:

How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?

Contributor Author:

Yeah, no need to say default - there is no more specific mechanism to set confs.

> How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?

For now, this is not supported. Users have to set confs directly in the table / flow decorators for them to be applied to the pipeline.

map<string, string> sql_conf = 5;

// If true, this flow will only be run once per execution.
bool once = 6;
Contributor:

Care to elaborate? Is this a synonym for this is batch?

Contributor Author:

This corresponds to Trigger.Once in Spark - the flow runs once per update. This is similar to batch in triggered updates, but not in continuous ones (which we will add eventually).

}
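
To connect the pieces, a sketch of a flow that writes into the dataset defined above; the SQL text, IDs, and module paths are assumptions for illustration only.

```python
# Sketch: attach a flow that populates a target dataset (assumed stub paths, placeholder SQL).
from pyspark.sql.connect.proto import pipelines_pb2, relations_pb2

flow = pipelines_pb2.PipelineCommand.DefineFlow()
flow.dataflow_graph_id = "graph-123"
flow.flow_name = "daily_totals"                   # standalone flows use a single-part name
flow.target_dataset_name = "sales.daily_totals"

# The flow body is an unresolved relation; here, a plain SQL query relation.
plan = relations_pb2.Relation()
plan.sql.query = "SELECT sale_date, sum(amount) AS total FROM sales.raw GROUP BY sale_date"
flow.plan.CopyFrom(plan)

flow.sql_conf["spark.sql.ansi.enabled"] = "true"  # confs applied when this flow runs
flow.once = True                                  # run once per update (Trigger.Once semantics)
```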

// Resolves all datasets and flows and starts a pipeline update. Should be called after all
// graph elements are registered.
message StartRun {
// The graph to start.
string dataflow_graph_id = 1;
}

// Stops all running flows in the graph. This is a no-op if the graph is not running.
message StopRun {
// The ID of the graph to stop.
string dataflow_graph_id = 1;
}
}

// Parses the SQL file and registers all datasets and flows.
message DefineSqlGraphElements {
// The graph to attach these SQL graph elements to.
optional string dataflow_graph_id = 1;
Contributor:

I noticed that this is marked optional, but that the corresponding field in DefineDataset is not. How should we decide when to use optional?

cc @hvanhovell if there's a general recommendation on this.

Contributor Author:

What optional does is generate a has<FIELD> method in Java. We can use that to throw an exception when a field isn't present. Else, the field always has an empty string value.

So really, all of our primitives should have an optional designation. I will change that.

Contributor Author:

made all of these optional.


// The full path to the SQL file. Can be relative or absolute.
optional string sql_file_path = 2;

// The contents of the SQL file.
optional string sql_text = 3;
}
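
Two things are worth illustrating here: registering SQL-defined graph elements, and the explicit-presence behaviour that the `optional` discussion above is about. A hedged sketch follows; the stub path and SQL are placeholders, and the message is referenced at the top-level position it has in this revision.

```python
# Sketch: register graph elements from SQL, and check field presence (`optional` semantics).
from pyspark.sql.connect.proto import pipelines_pb2

elems = pipelines_pb2.DefineSqlGraphElements()

# Explicit presence: an unset optional scalar is distinguishable from one set to its default.
assert not elems.HasField("dataflow_graph_id")   # never set -> server can reject the request
elems.dataflow_graph_id = "graph-123"
assert elems.HasField("dataflow_graph_id")

elems.sql_file_path = "pipelines/daily_totals.sql"   # relative path, placeholder
# Placeholder SQL; the exact pipeline SQL syntax is outside the scope of this diff.
elems.sql_text = (
    "CREATE MATERIALIZED VIEW daily_totals AS "
    "SELECT sale_date, sum(amount) AS total FROM sales.raw GROUP BY sale_date"
)
```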

// Dispatch object for pipelines command results.
message PipelineCommandResult {
oneof result_type {
CreateDataflowGraphResult create_dataflow_graph_result = 1;
}
message CreateDataflowGraphResult {
// The ID of the created graph.
string dataflow_graph_id = 1;
}
}

// The type of dataset.
enum DatasetType {
// Safe default value. Should not be used.
DATASET_UNSPECIFIED = 0;
Contributor:

Linter rule should say: DATASET_TYPE_UNSPECIFIED

https://protobuf.dev/programming-guides/style/#enums

// A materialized view dataset which is published to the catalog
MATERIALIZED_VIEW = 1;
// A table which is published to the catalog
TABLE = 2;
// A view which is not published to the catalog
TEMPORARY_VIEW = 3;
}

// A response containing events emitted during the run of a pipeline.
message PipelineEventsResult {
repeated PipelineEvent events = 1;
Contributor:

Batching events should not be needed. gRPC server side streaming can return multiple 'events' at the same time, provided it can fit them in a single window (~30k).

Contributor Author:

That's fair. But I think the repeated field adds more flexibility in general. We can group events logically, rather than just to avoid network latency.

Contributor:

Per further feedback from @grundprinzip and @hvanhovell, I'm going to take this batching out. We can always add it back in the future if we come up with a use case for logical grouping.

Contributor:

The doc should be more explicit about how "complete" the set of events is that you receive here. Are these all events or just some? How do you know if more are coming or not?

Generally, I'd stand with Herman that if you don't expect to emit thousands of events per second, your code will be easier and simpler if you don't use a repeated field here and simply emit one event per message.

}

// An event emitted during the run of a graph.
message PipelineEvent {
Contributor:

Is this also supposed to include errors? If so, it'd be nice to understand what has failed... In that case adding a flow/dataset name would be nice.

Contributor Author:

Yeah, I can see the value in adding the dataset and flow name. But two things:

  1. OTOH, we wanted to keep PipelineEvents as a generic event bus rather than a structured logging format.
  2. It's possible an error happens that isn't scoped to a dataset/flow, making this field unpredictably empty.

But at the very least, the dataset/flow name will be in the error message.

Contributor:

To add on to what @aakash-db said, our main use case for these events is to print out to the console, and the string messages will include all the context that's needed for that. Once we have a use case that involves consuming the dataset/flow name programmatically, I'd be supportive of adding more structure to this.

Contributor:

Btw, errors should flow the regular way through the exception process and the error details. If we were to do it differently it would just create issues later.

Contributor:

@grundprinzip I actually agree with you. If the pipeline fails we should fail in the normal way. However, that failure can originate from multiple places. As a user I would like to be able to figure out what failed. We could embed that failure information in these events.

// The time of the event.
optional string timestamp = 1;
// The message that should be displayed to users.
optional string message = 2;
}
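
Finally, a hedged sketch of how a client might surface these events while iterating the ExecutePlanResponse stream; the iteration plumbing itself is not part of this diff, only the response fields shown above.

```python
# Sketch: print pipeline events from a stream of ExecutePlanResponse messages.
from typing import Iterator
from pyspark.sql.connect.proto import base_pb2

def print_pipeline_responses(responses: Iterator[base_pb2.ExecutePlanResponse]) -> None:
    for response in responses:
        if response.HasField("pipeline_events_result"):
            for event in response.pipeline_events_result.events:
                ts = event.timestamp if event.HasField("timestamp") else ""
                msg = event.message if event.HasField("message") else ""
                print(f"[{ts}] {msg}")
        elif response.HasField("pipeline_command_result"):
            result = response.pipeline_command_result
            if result.HasField("create_dataflow_graph_result"):
                print("created graph:", result.create_dataflow_graph_result.dataflow_graph_id)
```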