[SPARK-52238][PYTHON] Python client for Declarative Pipelines #50963

Open · wants to merge 3 commits into base: master
33 changes: 33 additions & 0 deletions bin/spark-pipelines
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default to standard python3 interpreter unless told otherwise
if [[ -z "$PYSPARK_PYTHON" ]]; then
  PYSPARK_PYTHON=python3
fi

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"

"${SPARK_HOME}/bin/spark-submit" --conf spark.api.mode=connect "${SPARK_HOME}/python/pyspark/sql/pipelines/cli.py" "$@"
1 change: 1 addition & 0 deletions dev/requirements.txt
@@ -27,6 +27,7 @@ pytest-mypy-plugins==1.9.3
flake8==3.9.0
# See SPARK-38680.
pandas-stubs<1.2.0.54
types-PyYAML

# Documentation (SQL)
mkdocs
4 changes: 4 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -556,6 +556,10 @@ def __hash__(self):
"pyspark.sql.tests.pandas.test_pandas_udf_window",
"pyspark.sql.tests.pandas.test_pandas_sqlmetrics",
"pyspark.sql.tests.pandas.test_converter",
"pyspark.sql.tests.pipelines.test_block_connect_access",
"pyspark.sql.tests.pipelines.test_cli",
"pyspark.sql.tests.pipelines.test_decorators",
"pyspark.sql.tests.pipelines.test_graph_element_registry",
"pyspark.sql.tests.test_python_datasource",
"pyspark.sql.tests.test_python_streaming_datasource",
"pyspark.sql.tests.test_readwriter",
3 changes: 3 additions & 0 deletions python/mypy.ini
@@ -188,3 +188,6 @@ ignore_missing_imports = True
; Ignore errors for proto generated code
[mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto, pyspark.sql.streaming.proto]
ignore_errors = True

[mypy-pyspark.sql.pipelines.proto.*]
ignore_errors = True
60 changes: 60 additions & 0 deletions python/pyspark/errors/error-conditions.json
@@ -14,6 +14,11 @@
"Arrow legacy IPC format is not supported in PySpark, please unset ARROW_PRE_0_15_IPC_FORMAT."
]
},
"ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION": {
"message": [
"Operations that trigger DataFrame analysis or execution are not allowed in pipeline query functions. Move code outside of the pipeline query function."
]
},
"ATTRIBUTE_NOT_CALLABLE": {
"message": [
"Attribute `<attr_name>` in provided object `<obj_name>` is not callable."
@@ -219,6 +224,11 @@
"Unexpected filter <name>."
]
},
"DECORATOR_ARGUMENT_NOT_CALLABLE": {
"message": [
"The first positional argument passed to @<decorator_name> must be callable. Either add @<decorator_name> with no parameters to your function, or pass options to @<decorator_name> using keyword arguments (e.g. <example_usage>)."
]
},
"DIFFERENT_PANDAS_DATAFRAME": {
"message": [
"DataFrames are not almost equal:",
@@ -336,6 +346,11 @@
"<field_name>: <obj> is not an instance of type <data_type>."
]
},
"GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE": {
"message": [
"APIs that define elements of a declarative pipeline can only be invoked within the context of defining a pipeline."
]
},
"HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN": {
"message": [
"Function `<func_name>` should return Column, got <return_type>."
@@ -552,6 +567,11 @@
"Mixed type replacements are not supported."
]
},
"MULTIPLE_PIPELINE_SPEC_FILES_FOUND": {
"message": [
"Multiple pipeline spec files found in the directory `<dir_path>`. Please remove one or choose a particular one with the --spec argument."
]
},
"NEGATIVE_VALUE": {
"message": [
"Value for `<arg_name>` must be greater than or equal to 0, got '<arg_value>'."
@@ -839,6 +859,41 @@
"The Pandas SCALAR_ITER UDF outputs more rows than input rows."
]
},
"PIPELINE_SPEC_DICT_KEY_NOT_STRING": {
"message": [
"For pipeline spec field `<field_name>`, key should be a string, got <key_type>."
]
},
"PIPELINE_SPEC_DICT_VALUE_NOT_STRING": {
"message": [
"For pipeline spec field `<field_name>`, value for key `<key_name>` should be a string, got <value_type>."
]
},
"PIPELINE_SPEC_FIELD_NOT_DICT": {
"message": [
"Pipeline spec field `<field_name>` should be a dict, got <field_type>."
]
},
"PIPELINE_SPEC_FILE_DOES_NOT_EXIST": {
"message": [
"The pipeline spec file `<spec_path>` does not exist."
]
},
"PIPELINE_SPEC_FILE_NOT_FOUND": {
"message": [
"No pipeline.yaml or pipeline.yml file provided in arguments or found in directory `<dir_path>` or readable ancestor directories."
]
},
"PIPELINE_SPEC_UNEXPECTED_FIELD": {
"message": [
"Pipeline spec field `<field_name>` is unexpected."
]
},
"PIPELINE_UNSUPPORTED_DEFINITIONS_FILE_EXTENSION": {
"message": [
"Pipeline definitions file `<file_path>` has an unsupported extension. Supported extensions are `.py` and `.sql`."
]
},
"PIPE_FUNCTION_EXITED": {
"message": [
"Pipe function `<func_name>` exited with error code <error_code>."
@@ -1145,6 +1200,11 @@
"Pie plot requires either a `y` column or `subplots=True`."
]
},
"UNSUPPORTED_PIPELINES_DATASET_TYPE": {
"message": [
"Unsupported pipelines dataset type: <dataset_type>."
]
},
"UNSUPPORTED_PLOT_BACKEND": {
"message": [
"`<backend>` is not supported, it should be one of the values from <supported_backends>"
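The ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION condition above blocks eager DataFrame analysis or execution inside a pipeline query function. A minimal sketch of the pattern it rejects, assuming the @table decorator exported by pyspark.sql.pipelines (see the __init__.py below) and a spark session in scope when the definitions file is executed; neither assumption is spelled out in this diff:

from pyspark.sql.pipelines import table

@table
def active_customers():
    df = spark.read.table("raw_customers")
    # Calling df.count() or df.collect() here would trigger analysis/execution
    # and should raise ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION; compute
    # such values outside the query function instead.
    return df.filter("is_active = true")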
1 change: 1 addition & 0 deletions python/pyspark/sql/connect/proto/__init__.py
@@ -25,3 +25,4 @@
from pyspark.sql.connect.proto.common_pb2 import *
from pyspark.sql.connect.proto.ml_pb2 import *
from pyspark.sql.connect.proto.ml_common_pb2 import *
from pyspark.sql.connect.proto.pipelines_pb2 import *
31 changes: 31 additions & 0 deletions python/pyspark/sql/pipelines/__init__.py
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.pipelines.api import (
    append_flow,
    create_streaming_table,
    materialized_view,
    table,
    temporary_view,
)

__all__ = [
    "append_flow",
    "create_streaming_table",
    "materialized_view",
    "table",
    "temporary_view",
]
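
A minimal sketch of how these exports might be used in a .py pipeline definitions file (one of the two supported extensions per the error conditions above). Only the five imported names come from __all__; the parameters shown (for example the target= keyword on append_flow and the positional table name for create_streaming_table) and the in-scope spark session are illustrative assumptions, not APIs confirmed by this diff:

from pyspark.sql.pipelines import (
    append_flow,
    create_streaming_table,
    materialized_view,
    table,
    temporary_view,
)

@materialized_view
def daily_orders():
    # A batch dataset recomputed by the pipeline.
    return spark.read.table("orders").groupBy("order_date").count()

@temporary_view
def active_users():
    # An intermediate view that is not persisted as a table.
    return spark.read.table("users").filter("is_active")

@table
def orders_clean():
    return spark.read.table("orders").dropna()

# Hypothetical: a streaming table populated by an append flow; the argument
# names below are assumptions for illustration only.
create_streaming_table("events")

@append_flow(target="events")
def ingest_events():
    return spark.readStream.table("raw_events")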