From ca9570066bc85d1079c6f86546fc1952d3cad0bf Mon Sep 17 00:00:00 2001
From: SanjayUG
Date: Wed, 19 Mar 2025 23:08:57 +0530
Subject: [PATCH] fix: improve CSV path handling and error handling in substrait example

Build the CSV path with os.path from the location of the script instead
of relying on the current working directory. The script lives in
examples/, and the test data is assumed to live in testing/ next to
examples/ (as the original "./testing/data/csv/..." path, relative to
the launch directory, implies), so the path is anchored at the parent
of the script directory rather than at examples/ itself.

Wrap each step of the example (CSV registration, plan creation,
encoding, alternative serialization, deserialization, and the
conversions to and from a logical plan) in a try/except block that
prints a short description of the failing step and re-raises the
exception unchanged.
---
 examples/substrait.py | 68 ++++++++++++++++++++++++++++++-------------
 1 file changed, 48 insertions(+), 20 deletions(-)

diff --git a/examples/substrait.py b/examples/substrait.py
index fa6f77912..ac03d0c86 100644
--- a/examples/substrait.py
+++ b/examples/substrait.py
@@ -15,35 +15,63 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 from datafusion import SessionContext
 from datafusion import substrait as ss
 
 # Create a DataFusion context
 ctx = SessionContext()
 
-# Register table with context
-ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")
+# Locate the test data relative to the repository root (the parent of examples/)
+repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+csv_path = os.path.join(repo_root, "testing", "data", "csv", "aggregate_test_100.csv")
 
-substrait_plan = ss.Serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
-# type(substrait_plan) ->
+try:
+    # Register table with context using cross-platform path
+    ctx.register_csv("aggregate_test_data", csv_path)
+except Exception as e:
+    print(f"Error registering CSV file: {e}")
+    print(f"Please ensure the CSV file exists at: {csv_path}")
+    raise
 
-# Encode it to bytes
-substrait_bytes = substrait_plan.encode()
-# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
+# Create a Substrait plan from SQL query
+try:
+    substrait_plan = ss.Serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
+except Exception as e:
+    print(f"Error creating Substrait plan: {e}")
+    raise
 
-# Alternative serialization approaches
-# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-substrait_bytes = ss.Serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
+# Encode the plan to bytes
+try:
+    substrait_bytes = substrait_plan.encode()
+except Exception as e:
+    print(f"Error encoding Substrait plan: {e}")
+    raise
 
-# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
-# type(substrait_plan) ->
-substrait_plan = ss.Serde.deserialize_bytes(substrait_bytes)
+# Alternative serialization approach
+try:
+    substrait_bytes = ss.Serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
+except Exception as e:
+    print(f"Error in alternative serialization: {e}")
+    raise
 
-# type(df_logical_plan) ->
-df_logical_plan = ss.Consumer.from_substrait_plan(ctx, substrait_plan)
+# Deserialize the bytes back to a Substrait plan
+try:
+    substrait_plan = ss.Serde.deserialize_bytes(substrait_bytes)
+except Exception as e:
+    print(f"Error deserializing Substrait plan: {e}")
+    raise
 
-# Back to Substrait Plan just for demonstration purposes
-# type(substrait_plan) ->
-substrait_plan = ss.Producer.to_substrait_plan(df_logical_plan, ctx)
+# Convert Substrait plan to DataFusion logical plan
+try:
+    df_logical_plan = ss.Consumer.from_substrait_plan(ctx, substrait_plan)
+except Exception as e:
+    print(f"Error converting to logical plan: {e}")
+    raise
+
+# Convert back to Substrait plan for demonstration
+try:
+    substrait_plan = ss.Producer.to_substrait_plan(df_logical_plan, ctx)
+except Exception as e:
+    print(f"Error converting back to Substrait plan: {e}")
+    raise