2 changes: 1 addition & 1 deletion crates/integrations/datafusion/Cargo.toml
@@ -36,7 +36,7 @@ datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
parquet = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt"] }
uuid = { workspace = true }

[dev-dependencies]
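Note on the two new feature flags (illustrative only, not part of the diff): `sync` gates `tokio::sync::mpsc`, used below to bridge record batches between runtimes, and `rt` gates `tokio::runtime::Handle`, used to spawn reads onto the IO runtime. A minimal sketch of the two APIs the flags unlock:

use tokio::runtime::Handle;
use tokio::sync::mpsc;

fn feature_check() {
    // Bounded channel for streaming across runtimes: requires the "sync" feature.
    let (_tx, _rx) = mpsc::channel::<u64>(32);
    // Handle to the ambient runtime, if one is running: requires the "rt" feature.
    let _handle = Handle::try_current().ok();
}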
125 changes: 106 additions & 19 deletions crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs
@@ -1,32 +1,48 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use futures::TryStreamExt;
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIO;
use iceberg::scan::FileScanTask;
use tokio::runtime::Handle;
use tokio::sync::mpsc;

use crate::to_datafusion_error;

/// Channel buffer size for streaming record batches between runtimes.
///
/// Mirrors the constant used in `dd-datafusion`'s `IOExec`.
const CHANNEL_BUFFER_SIZE: usize = 32;

/// A DataFusion [`ExecutionPlan`] that reads one [`FileScanTask`] per partition.
///
/// Display information (projection, predicate) is derived at runtime from the output schema and
/// the tasks rather than stored as dedicated struct fields. This keeps the node self-contained:
/// all state is already serializable via `FileScanTask`, which simplifies the DataFusion
/// distributed codec; adding dedicated fields would require encoding them separately in the
/// protobuf round-trip.
///
/// When an IO runtime [`Handle`] is provided via [`IcebergPartitionedScan::with_io_handle`],
/// `execute()` spawns Parquet reads on that runtime and bridges results back via a channel.
/// This ensures that opendal / network I/O does not compete with CPU-bound compute threads
/// when runtime segregation is enabled.
#[derive(Debug, Clone)]
pub struct IcebergPartitionedScan {
tasks: Vec<FileScanTask>,
file_io: FileIO,
plan_properties: PlanProperties,
io_handle: Option<Handle>,
}

impl IcebergPartitionedScan {
@@ -37,9 +53,20 @@ impl IcebergPartitionedScan {
tasks,
file_io,
plan_properties,
io_handle: None,
}
}

/// Attaches an IO runtime handle to this scan.
///
/// When set, `execute()` spawns Parquet reads on the given runtime and bridges results
/// back to the caller via an mpsc channel, ensuring that opendal / network I/O runs on
/// the IO runtime rather than the CPU runtime.
pub fn with_io_handle(mut self, handle: Handle) -> Self {
self.io_handle = Some(handle);
self
}

pub fn tasks(&self) -> &[FileScanTask] {
&self.tasks
}
@@ -97,23 +124,61 @@ impl ExecutionPlan for IcebergPartitionedScan {
})?;

let file_io = self.file_io.clone();

let fut = async move {
let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
let record_batch_stream = ArrowReaderBuilder::new(file_io)
.build()
.read(Box::pin(task_stream))
.map_err(to_datafusion_error)?
.map_err(to_datafusion_error);
Ok::<_, datafusion::error::DataFusionError>(record_batch_stream)
};

let stream = futures::stream::once(fut).try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
stream,
)))
let schema = self.schema();

match &self.io_handle {
None => {
let fut = async move {
let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
let record_batch_stream = ArrowReaderBuilder::new(file_io)
.build()
.read(Box::pin(task_stream))
.map_err(to_datafusion_error)?
.map_err(to_datafusion_error);
Ok::<_, datafusion::error::DataFusionError>(record_batch_stream)
};

let stream = futures::stream::once(fut).try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
Some(io_handle) => {
let (tx, rx) = mpsc::channel::<DFResult<RecordBatch>>(CHANNEL_BUFFER_SIZE);

// The JoinHandle is intentionally dropped (task detached).
// Errors from the arrow reader are forwarded via the channel.
// If the task panics, `tx` is dropped, the channel closes, and
// the consumer sees end-of-stream. This matches the behaviour of
// `dd-datafusion`'s `IOExec`.
io_handle.spawn(async move {
let task_stream = futures::stream::once(futures::future::ready(Ok(task)));
match ArrowReaderBuilder::new(file_io)
.build()
.read(Box::pin(task_stream))
.map_err(to_datafusion_error)
{
Err(e) => {
// If the receiver is dropped (query cancelled), there is nothing to
// propagate the error to.
// Mirrors `dd-datafusion`'s `IOExec`.
let _ = tx.send(Err(e)).await;
}
Ok(stream) => {
let mut stream = stream.map_err(to_datafusion_error);
while let Some(batch) = stream.next().await {
// If the receiver is dropped, stop processing.
if tx.send(batch).await.is_err() {
break;
}
}
}
}
});

let stream = ChannelRecordBatchStream { receiver: rx };
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
}
}

@@ -138,9 +203,14 @@ impl DisplayAs for IcebergPartitionedScan {
.and_then(|t| t.predicate())
.map_or(String::new(), |p| format!("{p}"));
let file_count = self.tasks.len();
let io_tag = if self.io_handle.is_some() {
" [io-runtime]"
} else {
""
};
write!(
f,
"{} projection:[{projection}] predicate:[{predicate}] file_count:[{file_count}]",
"{}{io_tag} projection:[{projection}] predicate:[{predicate}] file_count:[{file_count}]",
self.name()
)?;
if self.tasks.len() <= 5 {
@@ -155,3 +225,20 @@ impl DisplayAs for IcebergPartitionedScan {
Ok(())
}
}

/// Bridges an mpsc channel into a [`Stream`] of [`RecordBatch`] results.
///
/// Used by [`IcebergPartitionedScan::execute`] when an IO runtime handle is configured:
/// the Parquet read runs on the IO runtime and pushes batches through this channel to the
/// CPU runtime that is polling the stream.
struct ChannelRecordBatchStream {
receiver: mpsc::Receiver<DFResult<RecordBatch>>,
}

impl Stream for ChannelRecordBatchStream {
type Item = DFResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_recv(cx)
}
}
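For orientation, a sketch of how a caller could construct the dedicated IO runtime and hand its handle to the scan; the runtime configuration here is an assumption, only `with_io_handle` comes from this change:

use tokio::runtime::{Builder, Runtime};

fn build_io_runtime() -> std::io::Result<Runtime> {
    Builder::new_multi_thread()
        .worker_threads(4) // sized for network-bound work, not CPU count
        .thread_name("iceberg-io")
        .enable_all() // I/O + timer drivers needed by opendal / HTTP clients
        .build()
}

// Attaching the handle to a scan (hypothetical surrounding code):
// let scan = IcebergPartitionedScan::new(tasks, file_io, output_schema)
//     .with_io_handle(io_runtime.handle().clone());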
110 changes: 81 additions & 29 deletions crates/integrations/datafusion/src/table/partitioned.rs
@@ -11,7 +11,10 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::io::FileIO;
use iceberg::scan::FileScanTask;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use tokio::runtime::Handle;

use crate::error::to_datafusion_error;
use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate;
@@ -22,6 +25,7 @@ pub struct IcebergPartitionedTableProvider {
catalog: Arc<dyn Catalog>,
table_ident: TableIdent,
schema: ArrowSchemaRef,
io_handle: Option<Handle>,
}

impl IcebergPartitionedTableProvider {
@@ -39,8 +43,55 @@ impl IcebergPartitionedTableProvider {
catalog,
table_ident,
schema,
io_handle: None,
})
}

/// Attaches an IO runtime handle to this provider.
///
/// When set, every [`IcebergPartitionedScan`] produced by [`scan()`](Self::scan) will have
/// the handle injected via [`IcebergPartitionedScan::with_io_handle`], ensuring that Parquet
/// reads via opendal run on the IO runtime rather than the CPU runtime.
///
/// Additionally, the network I/O performed during `scan()` itself (`load_table` and
/// `plan_files`) is spawned on this handle, preventing DNS / HTTP calls from running
/// on the CPU runtime during DataFusion's physical planning phase.
pub fn with_io_handle(mut self, handle: Handle) -> Self {
self.io_handle = Some(handle);
self
}

/// Fetches the file scan tasks for this table.
///
/// Performs all network I/O: `load_table` (REST catalog) and `plan_files` (manifest reads).
/// Extracted as an associated function (no `&self` borrow) so the returned future is
/// `'static` and can be spawned on the IO runtime via `io_handle.spawn()` when runtime
/// segregation is enabled.
async fn fetch_tasks(
catalog: Arc<dyn Catalog>,
table_ident: TableIdent,
col_names: Option<Vec<String>>,
predicate: Option<iceberg::expr::Predicate>,
) -> Result<(FileIO, Vec<FileScanTask>)> {
let table = catalog.load_table(&table_ident).await?;

let mut builder = table.scan();
builder = match col_names {
Some(names) => builder.select(names),
None => builder.select_all(),
};
if let Some(pred) = predicate {
builder = builder.with_filter(pred);
}

let tasks = builder
.build()?
.plan_files()
.await?
.try_collect::<Vec<_>>()
.await?;

Ok((table.file_io().clone(), tasks))
}
}
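The `scan()` implementation below awaits a task spawned from `fetch_tasks`, which yields two layers of `Result` (a `JoinError` if the task panicked, then the application error). A minimal sketch of that shape, with hypothetical names:

use tokio::runtime::Handle;

async fn run_on_io(handle: &Handle) -> Result<u64, String> {
    handle
        .spawn(async { Ok::<u64, String>(42) }) // inner, app-level Result
        .await // outer Result: Err(JoinError) if the spawned task panicked
        .map_err(|join_err| format!("IO task panicked: {join_err}"))?
}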

#[async_trait]
@@ -64,13 +115,6 @@
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Second load: fetch the latest snapshot so scans always reflect current table state.
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;

// Projection indices are resolved against self.schema (captured at try_new time),
// same as IcebergTableProvider / IcebergTableScan.
let col_names = projection.map(|indices| {
@@ -82,24 +126,31 @@

let predicate = convert_filters_to_predicate(filters);

let mut builder = table.scan();
builder = match col_names {
Some(names) => builder.select(names),
None => builder.select_all(),
// `load_table` (REST catalog) and `plan_files` (manifest reads) both trigger network I/O.
// When runtime segregation is enabled, DataFusion calls `scan()` on the CPU runtime during
// physical planning. Spawning on the IO handle prevents DNS / HTTP calls from running on
// the CPU runtime and causing a panic.
let catalog = Arc::clone(&self.catalog);
let table_ident = self.table_ident.clone();
let (file_io, tasks) = match &self.io_handle {
Some(h) => h
.spawn(Self::fetch_tasks(
catalog,
table_ident,
col_names,
predicate,
))
.await
.map_err(|e| {
DataFusionError::Internal(format!(
"IcebergPartitionedScan: IO task panicked: {e}"
))
})?
.map_err(to_datafusion_error)?,
None => Self::fetch_tasks(catalog, table_ident, col_names, predicate)
.await
.map_err(to_datafusion_error)?,
};
if let Some(pred) = predicate {
builder = builder.with_filter(pred);
}

let tasks = builder
.build()
.map_err(to_datafusion_error)?
.plan_files()
.await
.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;

let output_schema = match projection {
None => self.schema.clone(),
@@ -108,11 +159,12 @@
})?),
};

Ok(Arc::new(IcebergPartitionedScan::new(
tasks,
table.file_io().clone(),
output_schema,
)))
let mut scan = IcebergPartitionedScan::new(tasks, file_io, output_schema);
if let Some(h) = &self.io_handle {
scan = scan.with_io_handle(h.clone());
}

Ok(Arc::new(scan))
}

fn supports_filters_pushdown(
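End to end, the intended call pattern looks roughly like the following; `try_new`'s exact signature is assumed from the struct fields above, and the DataFusion registration is illustrative:

// Hypothetical wiring; only `with_io_handle` is defined in this diff.
// let io_runtime = build_io_runtime()?; // see the sketch in partitioned_scan.rs above
// let provider = IcebergPartitionedTableProvider::try_new(catalog, table_ident)
//     .await?
//     .with_io_handle(io_runtime.handle().clone());
// ctx.register_table("t", Arc::new(provider))?;
// let batches = ctx.sql("SELECT * FROM t WHERE id > 10").await?.collect().await?;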