diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b4..c9cfa2022e 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use datafusion::catalog::{CatalogProvider, SchemaProvider}; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, Runtime}; use crate::schema::IcebergSchemaProvider; @@ -47,6 +47,14 @@ impl IcebergCatalogProvider { /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { + Self::try_new_with_runtime(client, None).await + } + + /// Like [`Self::try_new`], propagating `runtime` to all child providers. + pub async fn try_new_with_runtime( + client: Arc, + runtime: Option, + ) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. @@ -61,9 +69,10 @@ impl IcebergCatalogProvider { schema_names .iter() .map(|name| { - IcebergSchemaProvider::try_new( + IcebergSchemaProvider::try_new_with_runtime( client.clone(), NamespaceIdent::new(name.clone()), + runtime.clone(), ) }) .collect::>(), diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..fb24af3eef 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,7 +29,9 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; +use iceberg::{ + Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableCreation, TableIdent, +}; use crate::table::IcebergTableProvider; use crate::to_datafusion_error; @@ -47,6 +49,8 @@ pub(crate) struct IcebergSchemaProvider { /// [`TableProvider`] trait. /// Wrapped in Arc to allow sharing across async boundaries in register_table. tables: Arc>>, + /// Propagated to every [`IcebergTableProvider`] created by this provider. + runtime: Option, } impl IcebergSchemaProvider { @@ -54,12 +58,12 @@ impl IcebergSchemaProvider { /// using the given client to fetch and initialize table providers for /// the provided namespace in the Iceberg [`Catalog`]. /// - /// This method retrieves a list of table names - /// attempts to create a table provider for each table name, and - /// collects these providers into a `HashMap`. - pub(crate) async fn try_new( + /// `runtime` is propagated to every [`IcebergTableProvider`] created by + /// this schema provider. + pub(crate) async fn try_new_with_runtime( client: Arc, namespace: NamespaceIdent, + runtime: Option, ) -> Result { // TODO: // Tables and providers should be cached based on table_name @@ -75,7 +79,14 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + IcebergTableProvider::try_new_with_runtime( + client.clone(), + namespace.clone(), + name, + runtime.clone(), + ) + }) .collect::>(), ) .await?; @@ -89,6 +100,7 @@ impl IcebergSchemaProvider { catalog: client, namespace, tables, + runtime, }) } } @@ -173,6 +185,7 @@ impl SchemaProvider for IcebergSchemaProvider { let namespace = self.namespace.clone(); let tables = self.tables.clone(); let name_clone = name.clone(); + let runtime = self.runtime.clone(); // Use tokio's spawn_blocking to handle the async work on a blocking thread pool let result = tokio::task::spawn_blocking(move || { @@ -190,10 +203,11 @@ impl SchemaProvider for IcebergSchemaProvider { .map_err(to_datafusion_error)?; // Create a new table provider using the catalog reference - let table_provider = IcebergTableProvider::try_new( + let table_provider = IcebergTableProvider::try_new_with_runtime( catalog.clone(), namespace.clone(), name_clone.clone(), + runtime, ) .await .map_err(to_datafusion_error)?; @@ -315,7 +329,7 @@ mod tests { .await .unwrap(); - let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace) + let provider = IcebergSchemaProvider::try_new_with_runtime(Arc::new(catalog), namespace, None) .await .unwrap(); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 873217ff82..08d7dfc29e 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -51,7 +51,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::scan::FileScanTask; use iceberg::spec::TableProperties; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableIdent}; use metadata_table::IcebergMetadataTableProvider; use crate::error::to_datafusion_error; @@ -71,6 +71,10 @@ use crate::physical_plan::write::IcebergWriteExec; /// /// For read-only access to a specific snapshot without catalog overhead, use /// [`IcebergStaticTableProvider`] instead. +/// +/// When using a CPU/IO split runtime, pass a [`Runtime`] via +/// [`Self::try_new_with_runtime`] so that catalog IO is dispatched to the IO +/// runtime rather than running on the caller's runtime. #[derive(Debug, Clone)] pub struct IcebergTableProvider { /// The catalog that manages this table @@ -79,6 +83,8 @@ pub struct IcebergTableProvider { table_ident: TableIdent, /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, + /// When `Some`, IO in `scan` and `insert_into` is spawned on `runtime.io()`. + runtime: Option, } impl IcebergTableProvider { @@ -90,6 +96,17 @@ impl IcebergTableProvider { catalog: Arc, namespace: NamespaceIdent, name: impl Into, + ) -> Result { + Self::try_new_with_runtime(catalog, namespace, name, None).await + } + + /// Like [`Self::try_new`], but routes catalog IO in `scan` and `insert_into` + /// through `runtime.io()` instead of running inline on the caller's runtime. + pub async fn try_new_with_runtime( + catalog: Arc, + namespace: NamespaceIdent, + name: impl Into, + runtime: Option, ) -> Result { let table_ident = TableIdent::new(namespace, name.into()); @@ -100,9 +117,25 @@ impl IcebergTableProvider { catalog, table_ident, schema, + runtime, }) } + /// Runs `fut` on `self.runtime.io()` if set, otherwise runs it inline. + async fn run_on_io(&self, fut: F) -> DFResult + where + F: std::future::Future> + Send + 'static, + T: Send + 'static, + { + match &self.runtime { + Some(rt) => match rt.io().spawn(fut).await { + Ok(inner) => inner, + Err(e) => Err(to_datafusion_error(e)), + }, + None => fut.await, + } + } + pub(crate) async fn metadata_table( &self, r#type: MetadataTableType, @@ -133,43 +166,58 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - // Second load: fetch the latest snapshot so scans always reflect current table state. - let table = self - .catalog - .load_table(&self.table_ident) - .await - .map_err(to_datafusion_error)?; - - // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan, - // so plan_files() uses the same projection/filters the scan will replay in execute(). - let col_names = projection.map(|indices| { - indices - .iter() - .map(|&i| self.schema.field(i).name().clone()) - .collect::>() - }); - + // Compute the predicate on the caller's runtime (pure CPU, no IO). let predicate = convert_filters_to_predicate(filters); - let mut builder = table.scan(); - builder = match col_names { - Some(names) => builder.select(names), - None => builder.select_all(), - }; - if let Some(pred) = predicate { - builder = builder.with_filter(pred); - } - - let tasks: Vec = builder - .build() - .map_err(to_datafusion_error)? - .plan_files() - .await - .map_err(to_datafusion_error)? - .try_collect::>() - .await - .map_err(to_datafusion_error)?; + // Capture everything the IO closure needs; Session is not Send. + let target_partitions = state.config().target_partitions(); + let projection_owned: Option> = projection.cloned(); + let catalog = self.catalog.clone(); + let table_ident = self.table_ident.clone(); + let arrow_schema = self.schema.clone(); + + // ── IO-bound: reload table + plan files ────────────────────────────── + // Spawned on `self.runtime.io()` when configured, otherwise runs inline. + let (table, tasks) = self + .run_on_io(async move { + // Second load: fetch the latest snapshot so scans always reflect + // current table state. + let table = catalog + .load_table(&table_ident) + .await + .map_err(to_datafusion_error)?; + + let col_names = projection_owned.as_ref().map(|indices| { + indices + .iter() + .map(|&i| arrow_schema.field(i).name().clone()) + .collect::>() + }); + + let mut builder = table.scan(); + builder = match col_names { + Some(names) => builder.select(names), + None => builder.select_all(), + }; + if let Some(pred) = predicate { + builder = builder.with_filter(pred); + } + + let tasks: Vec = builder + .build() + .map_err(to_datafusion_error)? + .plan_files() + .await + .map_err(to_datafusion_error)? + .try_collect::>() + .await + .map_err(to_datafusion_error)?; + + DFResult::Ok((table, tasks)) + }) + .await?; + // ── CPU-bound: schema projection + bucketing ────────────────────────── // Output schema after projection: column indices in `Hash` exprs and any // Arrow array we hash must reference this schema, not the full table schema. let output_schema = match projection { @@ -179,7 +227,6 @@ impl TableProvider for IcebergTableProvider { })?), }; - let target_partitions = state.config().target_partitions(); // Always produce at least 1 partition so that DataFusion can schedule // the plan normally and callers can safely call execute(0). An empty // bucket simply yields an empty record-batch stream. @@ -232,11 +279,16 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { + let catalog = self.catalog.clone(); + let table_ident = self.table_ident.clone(); let table = self - .catalog - .load_table(&self.table_ident) - .await - .map_err(to_datafusion_error)?; + .run_on_io(async move { + catalog + .load_table(&table_ident) + .await + .map_err(to_datafusion_error) + }) + .await?; let partition_spec = table.metadata().default_partition_spec();