Skip to content

fix: improve CSV path handling and error handling in substrait example #1073

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions examples/substrait.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,63 @@
# specific language governing permissions and limitations
# under the License.

import os
from datafusion import SessionContext
from datafusion import substrait as ss

# Create a DataFusion context
ctx = SessionContext()

# Register table with context
ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")
# Get the directory of this script
current_dir = os.path.dirname(os.path.abspath(__file__))
csv_path = os.path.join(current_dir, "testing", "data", "csv", "aggregate_test_100.csv")

substrait_plan = ss.Serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
try:
# Register table with context using cross-platform path
ctx.register_csv("aggregate_test_data", csv_path)
except Exception as e:
print(f"Error registering CSV file: {e}")
print(f"Please ensure the CSV file exists at: {csv_path}")
raise

# Encode it to bytes
substrait_bytes = substrait_plan.encode()
# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely
# where they could subsequently be deserialized on the receiving end.
# Create a Substrait plan from SQL query
try:
substrait_plan = ss.Serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
except Exception as e:
print(f"Error creating Substrait plan: {e}")
raise

# Alternative serialization approaches
# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely
# where they could subsequently be deserialized on the receiving end.
substrait_bytes = ss.Serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
# Encode the plan to bytes
try:
substrait_bytes = substrait_plan.encode()
except Exception as e:
print(f"Error encoding Substrait plan: {e}")
raise

# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.Serde.deserialize_bytes(substrait_bytes)
# Alternative serialization approach
try:
substrait_bytes = ss.Serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
except Exception as e:
print(f"Error in alternative serialization: {e}")
raise

# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
df_logical_plan = ss.Consumer.from_substrait_plan(ctx, substrait_plan)
# Deserialize the bytes back to a Substrait plan
try:
substrait_plan = ss.Serde.deserialize_bytes(substrait_bytes)
except Exception as e:
print(f"Error deserializing Substrait plan: {e}")
raise

# Back to Substrait Plan just for demonstration purposes
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.Producer.to_substrait_plan(df_logical_plan, ctx)
# Convert Substrait plan to DataFusion logical plan
try:
df_logical_plan = ss.Consumer.from_substrait_plan(ctx, substrait_plan)
except Exception as e:
print(f"Error converting to logical plan: {e}")
raise

# Convert back to Substrait plan for demonstration
try:
substrait_plan = ss.Producer.to_substrait_plan(df_logical_plan, ctx)
except Exception as e:
print(f"Error converting back to Substrait plan: {e}")
raise