diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 9a84832d88..4a41940fad 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -24,7 +24,9 @@ pub use error::*;
 pub mod physical_plan;
 mod schema;
 pub mod table;
+pub use physical_plan::IcebergBucketScan;
 pub use physical_plan::IcebergPartitionedScan;
+pub use table::bucket::IcebergBucketTableProvider;
 pub use table::partitioned::IcebergPartitionedTableProvider;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
diff --git a/crates/integrations/datafusion/src/physical_plan/bucket_scan.rs b/crates/integrations/datafusion/src/physical_plan/bucket_scan.rs
new file mode 100644
index 0000000000..bbc9c6d1d2
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/bucket_scan.rs
@@ -0,0 +1,255 @@
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result as DFResult;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
+use futures::{Stream, StreamExt, TryStreamExt};
+use iceberg::arrow::ArrowReaderBuilder;
+use iceberg::io::FileIO;
+use iceberg::scan::FileScanTask;
+use tokio::runtime::Handle;
+use tokio::sync::mpsc;
+
+use crate::to_datafusion_error;
+
+const CHANNEL_BUFFER_SIZE: usize = 32;
+
+/// A DataFusion [`ExecutionPlan`] that maps Iceberg bucket partitions to DataFusion partitions.
+///
+/// Each DataFusion partition corresponds to one Iceberg bucket value (0..N). A bucket may
+/// contain zero, one, or multiple data files; `execute(i)` streams all RecordBatches for
+/// bucket `i` sequentially via a single [`ArrowReaderBuilder`] pass.
+///
+/// Reports `Partitioning::Hash([source_col_expr], N)`, so DataFusion can eliminate `Exchange`
+/// nodes for aggregations and joins on the bucketed column.
+///
+/// When an IO runtime [`Handle`] is provided via [`IcebergBucketScan::with_io_handle`],
+/// Parquet reads are spawned on that runtime and bridged back via an mpsc channel, the same
+/// pattern as [`IcebergPartitionedScan`](crate::physical_plan::partitioned_scan::IcebergPartitionedScan).
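+///
+/// # Example
+///
+/// An illustrative construction sketch (marked `ignore`, not compiled): `tasks_by_bucket`,
+/// `file_io`, and `arrow_schema` are assumed to be in scope, for a table bucketed 16 ways
+/// on a column `id` at index 0 of the output schema.
+///
+/// ```ignore
+/// use std::sync::Arc;
+///
+/// use datafusion::physical_expr::PhysicalExpr;
+/// use datafusion::physical_plan::expressions::Column;
+///
+/// let source_col = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+/// let scan = IcebergBucketScan::new(tasks_by_bucket, file_io, arrow_schema, source_col, 16);
+/// assert_eq!(scan.bucket_count(), 16);
+/// ```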
+#[derive(Debug, Clone)]
+pub struct IcebergBucketScan {
+    /// Non-empty groups of FileScanTasks, one group per active Iceberg bucket.
+    /// `len()` equals the number of populated buckets K (≤ N total bucket count).
+    /// Empty buckets are excluded: only buckets with at least one data file after
+    /// Iceberg predicate pruning become DataFusion partitions.
+    tasks_by_bucket: Vec<Vec<FileScanTask>>,
+    file_io: FileIO,
+    plan_properties: PlanProperties,
+    io_handle: Option<Handle>,
+    /// Total bucket count N from the partition spec. Stored for display and codec purposes;
+    /// may differ from `tasks_by_bucket.len()` when some buckets are empty or pruned.
+    bucket_count: usize,
+}
+
+impl IcebergBucketScan {
+    pub fn new(
+        tasks_by_bucket: Vec<Vec<FileScanTask>>,
+        file_io: FileIO,
+        schema: ArrowSchemaRef,
+        source_col_expr: Arc<dyn PhysicalExpr>,
+        bucket_count: usize,
+    ) -> Self {
+        let active_buckets = tasks_by_bucket.len();
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::Hash(vec![source_col_expr], active_buckets),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            tasks_by_bucket,
+            file_io,
+            plan_properties,
+            io_handle: None,
+            bucket_count,
+        }
+    }
+
+    /// Attaches an IO runtime handle.
+    ///
+    /// When set, `execute()` spawns Parquet reads on the given runtime and bridges results
+    /// back via an mpsc channel, preventing opendal / network I/O from running on the CPU
+    /// runtime.
+    pub fn with_io_handle(mut self, handle: Handle) -> Self {
+        self.io_handle = Some(handle);
+        self
+    }
+
+    /// Returns the total bucket count N from the partition spec.
+    ///
+    /// This may differ from `tasks_by_bucket().len()` when some buckets are empty or pruned
+    /// by predicate pushdown.
+    pub fn bucket_count(&self) -> usize {
+        self.bucket_count
+    }
+
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
+
+    pub fn tasks_by_bucket(&self) -> &[Vec<FileScanTask>] {
+        &self.tasks_by_bucket
+    }
+}
+
+impl ExecutionPlan for IcebergBucketScan {
+    fn name(&self) -> &str {
+        "IcebergBucketScan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let tasks = self
+            .tasks_by_bucket
+            .get(partition)
+            .ok_or_else(|| {
+                datafusion::error::DataFusionError::Internal(format!(
+                    "IcebergBucketScan: partition index {partition} out of bounds \
+                     (bucket count: {})",
+                    self.tasks_by_bucket.len()
+                ))
+            })?
+            .clone();
+
+        let file_io = self.file_io.clone();
+        let schema = self.schema();
+
+        if tasks.is_empty() {
+            let empty = futures::stream::empty::<DFResult<RecordBatch>>();
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(schema, empty)));
+        }
+
+        match &self.io_handle {
+            None => {
+                let fut = async move {
+                    let task_stream =
+                        futures::stream::iter(tasks.into_iter().map(Ok::<_, iceberg::Error>));
+                    let record_batch_stream = ArrowReaderBuilder::new(file_io)
+                        .build()
+                        .read(Box::pin(task_stream))
+                        .map_err(to_datafusion_error)?
+                        .map_err(to_datafusion_error);
+                    Ok::<_, datafusion::error::DataFusionError>(record_batch_stream)
+                };
+                let stream = futures::stream::once(fut).try_flatten();
+                Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+            }
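+            // With a dedicated IO runtime, the read stream is driven on that runtime's
+            // threads; batches (or a single setup error) cross back to the caller
+            // through a bounded mpsc channel, capping in-flight batches per partition
+            // at CHANNEL_BUFFER_SIZE.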
+            Some(io_handle) => {
+                let (tx, rx) = mpsc::channel::<DFResult<RecordBatch>>(CHANNEL_BUFFER_SIZE);
+
+                io_handle.spawn(async move {
+                    let task_stream =
+                        futures::stream::iter(tasks.into_iter().map(Ok::<_, iceberg::Error>));
+                    match ArrowReaderBuilder::new(file_io)
+                        .build()
+                        .read(Box::pin(task_stream))
+                        .map_err(to_datafusion_error)
+                    {
+                        Err(e) => {
+                            let _ = tx.send(Err(e)).await;
+                        }
+                        Ok(stream) => {
+                            let mut stream = stream.map_err(to_datafusion_error);
+                            while let Some(batch) = stream.next().await {
+                                if tx.send(batch).await.is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                });
+
+                let stream = BucketChannelStream { receiver: rx };
+                Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+            }
+        }
+    }
+}
+
+impl DisplayAs for IcebergBucketScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        let projection = self
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect::<Vec<_>>()
+            .join(",");
+        let predicate = self
+            .tasks_by_bucket
+            .iter()
+            .flatten()
+            .next()
+            .and_then(|t| t.predicate())
+            .map_or(String::new(), |p| format!("{p}"));
+        let total_files: usize = self.tasks_by_bucket.iter().map(|g| g.len()).sum();
+        let active_buckets = self.tasks_by_bucket.len();
+        let bucket_count = self.bucket_count;
+        let io_tag = if self.io_handle.is_some() {
+            " [io-runtime]"
+        } else {
+            ""
+        };
+        let partition_spec = match self.properties().output_partitioning() {
+            Partitioning::Hash(exprs, _) => exprs
+                .iter()
+                .map(|e| format!("bucket({bucket_count}, {e})"))
+                .collect::<Vec<_>>()
+                .join(", "),
+            other => format!("{other:?}"),
+        };
+        write!(
+            f,
+            "{}{io_tag} projection:[{projection}] predicate:[{predicate}] \
+             partition_spec:[{partition_spec}] \
+             active_buckets:[{active_buckets}/{bucket_count}] total_files:[{total_files}]",
+            self.name()
+        )
+    }
+}
+
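+/// Adapts the receiver half of the mpsc bridge into a [`Stream`] of record batches.
+///
+/// Yields `None` (end of stream) once the sender side is dropped, i.e. when the
+/// spawned read task on the IO runtime has finished or bailed out on error.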
+struct BucketChannelStream {
+    receiver: mpsc::Receiver<DFResult<RecordBatch>>,
+}
+
+impl Stream for BucketChannelStream {
+    type Item = DFResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.receiver).poll_recv(cx)
+    }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index ed9f46f30f..83299c9dc1 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub(crate) mod bucket_scan;
 pub(crate) mod commit;
 pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
@@ -27,6 +28,7 @@ pub(crate) mod write;
 
 pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
 
+pub use bucket_scan::IcebergBucketScan;
 pub use partitioned_scan::IcebergPartitionedScan;
 pub use project::project_with_partition;
 pub use scan::IcebergTableScan;
diff --git a/crates/integrations/datafusion/src/table/bucket.rs b/crates/integrations/datafusion/src/table/bucket.rs
new file mode 100644
index 0000000000..24e83ac46d
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/bucket.rs
@@ -0,0 +1,278 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::common::DataFusionError;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::expressions::Column;
+use futures::TryStreamExt;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::expr::Predicate;
+use iceberg::io::FileIO;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PartitionSpec, PrimitiveLiteral, Schema, Transform};
+use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
+use tokio::runtime::Handle;
+
+use crate::error::to_datafusion_error;
+use crate::physical_plan::bucket_scan::IcebergBucketScan;
+use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate;
+
+/// A DataFusion [`TableProvider`] that exposes Iceberg bucket partitions as native DataFusion
+/// partitions, enabling DataFusion to eliminate shuffle (`Exchange`) nodes for aggregations and
+/// joins on the bucketed column.
+///
+/// Unlike [`IcebergPartitionedTableProvider`](crate::table::partitioned::IcebergPartitionedTableProvider),
+/// which maps one DataFusion partition per data file, this provider maps one DataFusion partition
+/// per Iceberg bucket. All files sharing the same bucket value are read within a single partition,
+/// and the plan reports `Partitioning::Hash([source_col], N)`.
+///
+/// Returns an error from `scan()` if the table has no bucket partition field or if the bucketed
+/// source column was projected out.
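+///
+/// # Example
+///
+/// A minimal usage sketch (marked `ignore`, not compiled). It assumes `catalog` is an
+/// `Arc<dyn Catalog>`, `io_runtime` is a dedicated tokio runtime in scope, and the table
+/// `db.events` is partitioned by `bucket(16, id)`; all names are illustrative:
+///
+/// ```ignore
+/// use std::sync::Arc;
+///
+/// use datafusion::prelude::SessionContext;
+/// use iceberg::NamespaceIdent;
+///
+/// let provider = IcebergBucketTableProvider::try_new(
+///     catalog,
+///     NamespaceIdent::new("db".to_string()),
+///     "events",
+/// )
+/// .await?
+/// .with_io_handle(io_runtime.handle().clone());
+///
+/// let ctx = SessionContext::new();
+/// ctx.register_table("events", Arc::new(provider))?;
+/// // Aggregating on the bucketed column can now skip the hash repartition.
+/// let df = ctx.sql("SELECT id, count(*) FROM events GROUP BY id").await?;
+/// ```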
+#[derive(Debug, Clone)]
+pub struct IcebergBucketTableProvider {
+    catalog: Arc<dyn Catalog>,
+    table_ident: TableIdent,
+    schema: ArrowSchemaRef,
+    io_handle: Option<Handle>,
+}
+
+impl IcebergBucketTableProvider {
+    pub async fn try_new(
+        catalog: Arc<dyn Catalog>,
+        namespace: NamespaceIdent,
+        name: impl Into<String>,
+    ) -> Result<Self> {
+        let table_ident = TableIdent::new(namespace, name.into());
+        let table = catalog.load_table(&table_ident).await?;
+        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+        Ok(Self {
+            catalog,
+            table_ident,
+            schema,
+            io_handle: None,
+        })
+    }
+
+    /// Attaches an IO runtime handle.
+    ///
+    /// When set, the network I/O performed during `scan()` (`load_table`, `plan_files`) is
+    /// spawned on this handle, and each resulting [`IcebergBucketScan`] will have the handle
+    /// injected for Parquet reads.
+    pub fn with_io_handle(mut self, handle: Handle) -> Self {
+        self.io_handle = Some(handle);
+        self
+    }
+
+    async fn fetch_tasks(
+        catalog: Arc<dyn Catalog>,
+        table_ident: TableIdent,
+        col_names: Option<Vec<String>>,
+        predicate: Option<Predicate>,
+    ) -> Result<(FileIO, Vec<FileScanTask>, Arc<PartitionSpec>, Arc<Schema>)> {
+        let table = catalog.load_table(&table_ident).await?;
+        let partition_spec = table.metadata().default_partition_spec().clone();
+        let iceberg_schema = table.metadata().current_schema().clone();
+
+        let mut builder = table.scan();
+        builder = match col_names {
+            Some(names) => builder.select(names),
+            None => builder.select_all(),
+        };
+        if let Some(pred) = predicate {
+            builder = builder.with_filter(pred);
+        }
+
+        let tasks = builder
+            .build()?
+            .plan_files()
+            .await?
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        Ok((
+            table.file_io().clone(),
+            tasks,
+            partition_spec,
+            iceberg_schema,
+        ))
+    }
+}
+
+#[async_trait]
+impl TableProvider for IcebergBucketTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<_>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);
+
+        let catalog = Arc::clone(&self.catalog);
+        let table_ident = self.table_ident.clone();
+        let (file_io, tasks, partition_spec, iceberg_schema) = match &self.io_handle {
+            Some(h) => h
+                .spawn(Self::fetch_tasks(
+                    catalog,
+                    table_ident,
+                    col_names,
+                    predicate,
+                ))
+                .await
+                .map_err(|e| {
+                    DataFusionError::Internal(format!("IcebergBucketScan: IO task panicked: {e}"))
+                })?
+                .map_err(to_datafusion_error)?,
+            None => Self::fetch_tasks(catalog, table_ident, col_names, predicate)
+                .await
+                .map_err(to_datafusion_error)?,
+        };
+
+        let output_schema: ArrowSchemaRef = match projection {
+            None => self.schema.clone(),
+            Some(indices) => Arc::new(self.schema.project(indices).map_err(|e| {
+                DataFusionError::Internal(format!("schema projection failed: {e}"))
+            })?),
+        };
+
+        let info = detect_bucket_field(&partition_spec, &iceberg_schema, &output_schema)
+            .ok_or_else(|| {
+                DataFusionError::Plan(
+                    "IcebergBucketTableProvider: table has no bucket partition field, \
+                     or the bucket source column was projected out"
+                        .to_string(),
+                )
+            })?;
+
+        let source_col_expr = Arc::new(Column::new(&info.source_col_name, info.source_col_idx))
+            as Arc<dyn PhysicalExpr>;
+
+        let tasks_by_bucket = group_tasks_by_bucket(tasks, &info);
+
+        let mut scan = IcebergBucketScan::new(
+            tasks_by_bucket,
+            file_io,
+            output_schema,
+            source_col_expr,
+            info.bucket_count as usize,
+        );
+        if let Some(h) = &self.io_handle {
+            scan = scan.with_io_handle(h.clone());
+        }
+
+        Ok(Arc::new(scan))
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+}
+
+struct BucketFieldInfo {
+    /// Index of the bucket field within `PartitionSpec.fields()`.
+    field_idx: usize,
+    /// Name of the source column in the Arrow output schema.
+    source_col_name: String,
+    /// Index of the source column in the Arrow output schema.
+    source_col_idx: usize,
+    /// Number of buckets (N in `bucket(N, col)`).
+    bucket_count: u32,
+}
+
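+// Background, per the Iceberg spec: bucket(N, v) = (murmur3_x86_32(v) & i32::MAX) % N,
+// so equal source values always land in the same bucket. That determinism is what keeps
+// `Partitioning::Hash` over the source column sound even after empty buckets are dropped.
+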
+/// Finds the first `Transform::Bucket(N)` field in the partition spec and resolves its source
+/// column in the output Arrow schema.
+///
+/// Returns `None` if no bucket field exists or if the source column was projected out.
+fn detect_bucket_field(
+    partition_spec: &PartitionSpec,
+    iceberg_schema: &Schema,
+    output_schema: &datafusion::arrow::datatypes::Schema,
+) -> Option<BucketFieldInfo> {
+    partition_spec
+        .fields()
+        .iter()
+        .enumerate()
+        .find_map(|(field_idx, pf)| {
+            let Transform::Bucket(n) = pf.transform else {
+                return None;
+            };
+            let source_name = iceberg_schema.field_by_id(pf.source_id)?.name.clone();
+            let col_idx = output_schema.index_of(&source_name).ok()?;
+            Some(BucketFieldInfo {
+                field_idx,
+                source_col_name: source_name,
+                source_col_idx: col_idx,
+                bucket_count: n,
+            })
+        })
+}
+
+/// Groups `FileScanTask`s by their Iceberg bucket value, retaining only non-empty groups.
+///
+/// Tasks are distributed into `bucket_count` slots by bucket value. Empty slots are then
+/// discarded, so the returned `Vec` contains only groups with at least one task. Its length
+/// equals the number of distinct populated buckets (≤ `info.bucket_count`).
+///
+/// This ensures `IcebergBucketScan` creates DataFusion partitions only for buckets that
+/// actually have data, including after Iceberg predicate pruning, where `plan_files()` may
+/// already have reduced the task list to a single bucket.
+///
+/// Tasks whose partition value is absent or out of range are silently ignored.
+fn group_tasks_by_bucket(
+    tasks: Vec<FileScanTask>,
+    info: &BucketFieldInfo,
+) -> Vec<Vec<FileScanTask>> {
+    let mut groups: Vec<Vec<FileScanTask>> = vec![vec![]; info.bucket_count as usize];
+    for task in tasks {
+        if let Some(bucket) = extract_bucket_value(&task, info.field_idx) {
+            let b = bucket as usize;
+            if b < groups.len() {
+                groups[b].push(task);
+            }
+        }
+    }
+    // Drop empty groups: only populated buckets become DataFusion partitions.
+    // Partitioning::Hash([source_col], K) remains correct: same source_col value →
+    // same Iceberg bucket (deterministic) = same surviving partition.
+    groups.into_iter().filter(|g| !g.is_empty()).collect()
+}
+
+/// Extracts the integer bucket value stored in `FileScanTask.partition` at `field_idx`.
+///
+/// Bucket values are stored as `Literal::Primitive(PrimitiveLiteral::Int(i32))` in the
+/// partition `Struct`, pre-computed when the manifest was written.
+fn extract_bucket_value(task: &FileScanTask, field_idx: usize) -> Option<i32> {
+    match task.partition.as_ref()?.fields().get(field_idx)?.as_ref()? {
+        Literal::Primitive(PrimitiveLiteral::Int(v)) => Some(*v),
+        _ => None,
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index c98c4f3d99..c72d298536 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -25,6 +25,7 @@
 //! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
 //!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
 
+pub mod bucket;
 pub mod metadata_table;
 pub mod partitioned;
 pub mod table_provider_factory;