diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index fd3e489e4b..7e030d16ef 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -36,7 +36,7 @@
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
 parquet = { workspace = true }
-tokio = { workspace = true }
+tokio = { workspace = true, features = ["sync", "rt"] }
 uuid = { workspace = true }
 [dev-dependencies]
diff --git a/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs b/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs
index dcd1c3e09a..1c852ea538 100644
--- a/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs
@@ -1,20 +1,30 @@
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};

 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
-use futures::TryStreamExt;
+use futures::{Stream, StreamExt, TryStreamExt};
 use iceberg::arrow::ArrowReaderBuilder;
 use iceberg::io::FileIO;
 use iceberg::scan::FileScanTask;
+use tokio::runtime::Handle;
+use tokio::sync::mpsc;

 use crate::to_datafusion_error;

+/// Channel buffer size for streaming record batches between runtimes.
+///
+/// Mirrors the constant used in `dd-datafusion`'s `IOExec`.
+const CHANNEL_BUFFER_SIZE: usize = 32;
+
 /// A DataFusion [`ExecutionPlan`] that reads one [`FileScanTask`] per partition.
 ///
 /// Display information (projection, predicate) is derived at runtime from the output schema and
@@ -22,11 +32,17 @@ use crate::to_datafusion_error;
 /// all state is already serializable via `FileScanTask`, which simplifies the DataFusion
 /// distributed codec, adding dedicated fields would require encoding them separately in the
 /// protobuf round-trip.
+///
+/// When an IO runtime [`Handle`] is provided via [`IcebergPartitionedScan::with_io_handle`],
+/// `execute()` spawns Parquet reads on that runtime and bridges results back via a channel.
+/// This ensures that opendal / network I/O does not compete with CPU-bound compute threads
+/// when runtime segregation is enabled.
 #[derive(Debug, Clone)]
 pub struct IcebergPartitionedScan {
     tasks: Vec<FileScanTask>,
     file_io: FileIO,
     plan_properties: PlanProperties,
+    io_handle: Option<Handle>,
 }

 impl IcebergPartitionedScan {
@@ -37,9 +53,20 @@ impl IcebergPartitionedScan {
             tasks,
             file_io,
             plan_properties,
+            io_handle: None,
         }
     }

+    /// Attaches an IO runtime handle to this scan.
+    ///
+    /// When set, `execute()` spawns Parquet reads on the given runtime and bridges results
+    /// back to the caller via an mpsc channel, ensuring that opendal / network I/O runs on
+    /// the IO runtime rather than the CPU runtime.
+    pub fn with_io_handle(mut self, handle: Handle) -> Self {
+        self.io_handle = Some(handle);
+        self
+    }
+
     pub fn tasks(&self) -> &[FileScanTask] {
         &self.tasks
     }
@@ -97,23 +124,61 @@ impl ExecutionPlan for IcebergPartitionedScan {
         })?;

         let file_io = self.file_io.clone();
-
-        let fut = async move {
-            let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
-            let record_batch_stream = ArrowReaderBuilder::new(file_io)
-                .build()
-                .read(Box::pin(task_stream))
-                .map_err(to_datafusion_error)?
-                .map_err(to_datafusion_error);
-            Ok::<_, datafusion::error::DataFusionError>(record_batch_stream)
-        };
-
-        let stream = futures::stream::once(fut).try_flatten();
-
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            self.schema(),
-            stream,
-        )))
+        let schema = self.schema();
+
+        match &self.io_handle {
+            None => {
+                let fut = async move {
+                    let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
+                    let record_batch_stream = ArrowReaderBuilder::new(file_io)
+                        .build()
+                        .read(Box::pin(task_stream))
+                        .map_err(to_datafusion_error)?
+                        .map_err(to_datafusion_error);
+                    Ok::<_, datafusion::error::DataFusionError>(record_batch_stream)
+                };
+
+                let stream = futures::stream::once(fut).try_flatten();
+
+                Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+            }
+            Some(io_handle) => {
+                let (tx, rx) = mpsc::channel::<DFResult<RecordBatch>>(CHANNEL_BUFFER_SIZE);
+
+                // The JoinHandle is intentionally dropped (task detached).
+                // Errors from the arrow reader are forwarded via the channel.
+                // If the task panics, `tx` is dropped, the channel closes, and
+                // the consumer sees end-of-stream. This matches the behaviour of
+                // `dd-datafusion`'s `IOExec`.
+                io_handle.spawn(async move {
+                    let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
+                    match ArrowReaderBuilder::new(file_io)
+                        .build()
+                        .read(Box::pin(task_stream))
+                        .map_err(to_datafusion_error)
+                    {
+                        Err(e) => {
+                            // If the receiver is dropped (query cancelled), there is nothing to
+                            // propagate the error to.
+                            // Mirrors `dd-datafusion`'s `IOExec`.
+                            let _ = tx.send(Err(e)).await;
+                        }
+                        Ok(stream) => {
+                            let mut stream = stream.map_err(to_datafusion_error);
+                            while let Some(batch) = stream.next().await {
+                                // If the receiver is dropped, stop processing.
+                                if tx.send(batch).await.is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                });
+
+                let stream = ChannelRecordBatchStream { receiver: rx };
+                Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+            }
+        }
     }
 }

@@ -138,9 +203,14 @@ impl DisplayAs for IcebergPartitionedScan {
             .and_then(|t| t.predicate())
             .map_or(String::new(), |p| format!("{p}"));
         let file_count = self.tasks.len();
+        let io_tag = if self.io_handle.is_some() {
+            " [io-runtime]"
+        } else {
+            ""
+        };
         write!(
             f,
-            "{} projection:[{projection}] predicate:[{predicate}] file_count:[{file_count}]",
+            "{}{io_tag} projection:[{projection}] predicate:[{predicate}] file_count:[{file_count}]",
             self.name()
         )?;
         if self.tasks.len() <= 5 {
@@ -155,3 +225,20 @@
         Ok(())
     }
 }
+
+/// Bridges an mpsc channel into a [`Stream`] of [`RecordBatch`] results.
+///
+/// Used by [`IcebergPartitionedScan::execute`] when an IO runtime handle is configured:
+/// the Parquet read runs on the IO runtime and pushes batches through this channel to the
+/// CPU runtime that is polling the stream.
+struct ChannelRecordBatchStream {
+    receiver: mpsc::Receiver<DFResult<RecordBatch>>,
+}
+
+impl Stream for ChannelRecordBatchStream {
+    type Item = DFResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.receiver).poll_recv(cx)
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/partitioned.rs b/crates/integrations/datafusion/src/table/partitioned.rs
index e94f46eefb..8a51f7cb65 100644
--- a/crates/integrations/datafusion/src/table/partitioned.rs
+++ b/crates/integrations/datafusion/src/table/partitioned.rs
@@ -11,7 +11,11 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use futures::TryStreamExt;
 use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::expr::Predicate;
+use iceberg::io::FileIO;
+use iceberg::scan::FileScanTask;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
+use tokio::runtime::Handle;

 use crate::error::to_datafusion_error;
 use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate;
@@ -22,6 +26,7 @@ pub struct IcebergPartitionedTableProvider {
     catalog: Arc<dyn Catalog>,
     table_ident: TableIdent,
     schema: ArrowSchemaRef,
+    io_handle: Option<Handle>,
 }

 impl IcebergPartitionedTableProvider {
@@ -39,8 +44,55 @@ impl IcebergPartitionedTableProvider {
             catalog,
             table_ident,
             schema,
+            io_handle: None,
         })
     }
+
+    /// Attaches an IO runtime handle to this provider.
+    ///
+    /// When set, every [`IcebergPartitionedScan`] produced by [`scan()`](Self::scan) will have
+    /// the handle injected via [`IcebergPartitionedScan::with_io_handle`], ensuring that Parquet
+    /// reads via opendal run on the IO runtime rather than the CPU runtime.
+    ///
+    /// Additionally, the network I/O performed during `scan()` itself (`load_table` and
+    /// `plan_files`) is spawned on this handle, preventing DNS / HTTP calls from running
+    /// on the CPU runtime during DataFusion's physical planning phase.
+    pub fn with_io_handle(mut self, handle: Handle) -> Self {
+        self.io_handle = Some(handle);
+        self
+    }
+
+    /// Fetches the file scan tasks for this table.
+    ///
+    /// Performs all network I/O: `load_table` (REST catalog) and `plan_files` (manifest reads).
+    /// Extracted as an associated function so it can be spawned on the IO runtime via
+    /// `io_handle.spawn()` when runtime segregation is enabled.
+    async fn fetch_tasks(
+        catalog: Arc<dyn Catalog>,
+        table_ident: TableIdent,
+        col_names: Option<Vec<String>>,
+        predicate: Option<Predicate>,
+    ) -> Result<(FileIO, Vec<FileScanTask>)> {
+        let table = catalog.load_table(&table_ident).await?;
+
+        let mut builder = table.scan();
+        builder = match col_names {
+            Some(names) => builder.select(names),
+            None => builder.select_all(),
+        };
+        if let Some(pred) = predicate {
+            builder = builder.with_filter(pred);
+        }
+
+        let tasks = builder
+            .build()?
+            .plan_files()
+            .await?
+            .try_collect::<Vec<FileScanTask>>()
+            .await?;
+
+        Ok((table.file_io().clone(), tasks))
+    }
 }

 #[async_trait]
@@ -64,13 +116,6 @@ impl TableProvider for IcebergPartitionedTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Second load: fetch the latest snapshot so scans always reflect current table state.
-        let table = self
-            .catalog
-            .load_table(&self.table_ident)
-            .await
-            .map_err(to_datafusion_error)?;
-
         // Projection indices are resolved against self.schema (captured at try_new time),
         // same as IcebergTableProvider / IcebergTableScan.
         let col_names = projection.map(|indices| {
@@ -82,24 +127,31 @@

         let predicate = convert_filters_to_predicate(filters);

-        let mut builder = table.scan();
-        builder = match col_names {
-            Some(names) => builder.select(names),
-            None => builder.select_all(),
+        // `load_table` (REST catalog) and `plan_files` (manifest reads) both trigger network I/O.
+        // When runtime segregation is enabled, DataFusion calls `scan()` on the CPU runtime during
+        // physical planning. Spawning on the IO handle prevents DNS / HTTP calls from running on
+        // the CPU runtime and causing a panic.
+        let catalog = Arc::clone(&self.catalog);
+        let table_ident = self.table_ident.clone();
+        let (file_io, tasks) = match &self.io_handle {
+            Some(h) => h
+                .spawn(Self::fetch_tasks(
+                    catalog,
+                    table_ident,
+                    col_names,
+                    predicate,
+                ))
+                .await
+                .map_err(|e| {
+                    DataFusionError::Internal(format!(
+                        "IcebergPartitionedScan: IO task panicked: {e}"
+                    ))
+                })?
+                .map_err(to_datafusion_error)?,
+            None => Self::fetch_tasks(catalog, table_ident, col_names, predicate)
+                .await
+                .map_err(to_datafusion_error)?,
         };
-        if let Some(pred) = predicate {
-            builder = builder.with_filter(pred);
-        }
-
-        let tasks = builder
-            .build()
-            .map_err(to_datafusion_error)?
-            .plan_files()
-            .await
-            .map_err(to_datafusion_error)?
-            .try_collect::<Vec<FileScanTask>>()
-            .await
-            .map_err(to_datafusion_error)?;

         let output_schema = match projection {
             None => self.schema.clone(),
@@ -108,11 +160,12 @@
             })?),
         };

-        Ok(Arc::new(IcebergPartitionedScan::new(
-            tasks,
-            table.file_io().clone(),
-            output_schema,
-        )))
+        let mut scan = IcebergPartitionedScan::new(tasks, file_io, output_schema);
+        if let Some(h) = &self.io_handle {
+            scan = scan.with_io_handle(h.clone());
+        }
+
+        Ok(Arc::new(scan))
     }

     fn supports_filters_pushdown(
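
Usage sketch (not part of the diff): how a caller would wire the new `with_io_handle` hook end to end. `build_catalog()` is a hypothetical helper, and the `try_new(catalog, table_ident)` signature is assumed from the constructor context above; the two-runtime setup is the pattern this change targets.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergPartitionedTableProvider;
use tokio::runtime::Builder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Segregated runtimes: `cpu` drives DataFusion operators, `io` owns all
    // opendal / network traffic. A stricter setup could build `cpu` without
    // `enable_io()` so stray network calls fail loudly instead of silently
    // competing with compute.
    let io = Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("iceberg-io")
        .enable_all()
        .build()?;
    let cpu = Builder::new_multi_thread().enable_all().build()?;

    let io_handle = io.handle().clone();
    cpu.block_on(async move {
        let catalog = build_catalog().await?; // hypothetical: returns Arc<dyn iceberg::Catalog>
        let provider =
            IcebergPartitionedTableProvider::try_new(catalog, TableIdent::from_strs(["ns", "t"])?)
                .await?
                // The hook added by this PR: load_table / plan_files / Parquet
                // reads all run on the IO runtime from here on.
                .with_io_handle(io_handle);

        let ctx = SessionContext::new();
        ctx.register_table("t", Arc::new(provider))?;
        ctx.sql("SELECT count(*) FROM t").await?.show().await?;
        Ok::<_, Box<dyn std::error::Error>>(())
    })
}
```

Both runtimes are dropped in the synchronous `main`, which sidesteps the "cannot drop a runtime in async context" panic a naive `#[tokio::main]` version would risk.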
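Design note: the hand-rolled `ChannelRecordBatchStream` keeps the crate's dependency set unchanged. If adding `tokio-stream` were acceptable, the same receiver-to-stream bridge exists off the shelf — a sketch, assuming that crate as a new dependency:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DFResult;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// `ReceiverStream` wraps an `mpsc::Receiver<T>` and implements
// `Stream<Item = T>` by delegating to `poll_recv` — exactly what
// `ChannelRecordBatchStream` does by hand.
fn bridge(
    schema: SchemaRef,
    rx: mpsc::Receiver<DFResult<RecordBatch>>,
) -> SendableRecordBatchStream {
    Box::pin(RecordBatchStreamAdapter::new(schema, ReceiverStream::new(rx)))
}
```

Keeping the local struct avoids the extra dependency; the runtime behaviour is identical either way.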
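The cancellation path the comments in `execute()` describe ("if the receiver is dropped, stop processing") can be observed in isolation with a toy producer — a standalone sketch, not code from this PR:

```rust
use std::time::Duration;

use tokio::runtime::Builder;
use tokio::sync::mpsc;

fn main() {
    let io = Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();
    let (tx, rx) = mpsc::channel::<u64>(32);

    // Detached producer, as in `execute()`: the JoinHandle is dropped and the
    // task's only link to the consumer is `tx`.
    io.spawn(async move {
        for i in 0u64.. {
            if tx.send(i).await.is_err() {
                // Receiver dropped (query cancelled): stop instead of
                // producing batches nobody will read.
                println!("receiver gone, stopped at batch {i}");
                return;
            }
        }
    });

    drop(rx); // simulate DataFusion dropping the stream mid-query
    std::thread::sleep(Duration::from_millis(100));
}
```

The bounded channel also provides backpressure: once `CHANNEL_BUFFER_SIZE` batches are queued, the IO task blocks in `send().await` until the CPU side polls, so a slow consumer cannot cause unbounded buffering.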