Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions crates/integrations/datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use datafusion::catalog::{CatalogProvider, SchemaProvider};
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};
use iceberg::{Catalog, NamespaceIdent, Result, Runtime};

use crate::schema::IcebergSchemaProvider;

Expand All @@ -47,6 +47,14 @@ impl IcebergCatalogProvider {
/// attempts to create a schema provider for each namespace, and
/// collects these providers into a `HashMap`.
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
Self::try_new_with_runtime(client, None).await
}

/// Like [`Self::try_new`], propagating `runtime` to all child providers.
pub async fn try_new_with_runtime(
client: Arc<dyn Catalog>,
runtime: Option<Runtime>,
) -> Result<Self> {
// TODO:
// Schemas and providers should be cached and evicted based on time
// As of right now; schemas might become stale.
Expand All @@ -61,9 +69,10 @@ impl IcebergCatalogProvider {
schema_names
.iter()
.map(|name| {
IcebergSchemaProvider::try_new(
IcebergSchemaProvider::try_new_with_runtime(
client.clone(),
NamespaceIdent::new(name.clone()),
runtime.clone(),
)
})
.collect::<Vec<_>>(),
Expand Down
30 changes: 22 additions & 8 deletions crates/integrations/datafusion/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use futures::StreamExt;
use futures::future::try_join_all;
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
use iceberg::inspect::MetadataTableType;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg::{
Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableCreation, TableIdent,
};

use crate::table::IcebergTableProvider;
use crate::to_datafusion_error;
Expand All @@ -47,19 +49,21 @@ pub(crate) struct IcebergSchemaProvider {
/// [`TableProvider`] trait.
/// Wrapped in Arc to allow sharing across async boundaries in register_table.
tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
/// Propagated to every [`IcebergTableProvider`] created by this provider.
runtime: Option<Runtime>,
}

impl IcebergSchemaProvider {
/// Asynchronously tries to construct a new [`IcebergSchemaProvider`]
/// using the given client to fetch and initialize table providers for
/// the provided namespace in the Iceberg [`Catalog`].
///
/// This method retrieves a list of table names
/// attempts to create a table provider for each table name, and
/// collects these providers into a `HashMap`.
pub(crate) async fn try_new(
/// `runtime` is propagated to every [`IcebergTableProvider`] created by
/// this schema provider.
pub(crate) async fn try_new_with_runtime(
client: Arc<dyn Catalog>,
namespace: NamespaceIdent,
runtime: Option<Runtime>,
) -> Result<Self> {
// TODO:
// Tables and providers should be cached based on table_name
Expand All @@ -75,7 +79,14 @@ impl IcebergSchemaProvider {
let providers = try_join_all(
table_names
.iter()
.map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
.map(|name| {
IcebergTableProvider::try_new_with_runtime(
client.clone(),
namespace.clone(),
name,
runtime.clone(),
)
})
.collect::<Vec<_>>(),
)
.await?;
Expand All @@ -89,6 +100,7 @@ impl IcebergSchemaProvider {
catalog: client,
namespace,
tables,
runtime,
})
}
}
Expand Down Expand Up @@ -173,6 +185,7 @@ impl SchemaProvider for IcebergSchemaProvider {
let namespace = self.namespace.clone();
let tables = self.tables.clone();
let name_clone = name.clone();
let runtime = self.runtime.clone();

// Use tokio's spawn_blocking to handle the async work on a blocking thread pool
let result = tokio::task::spawn_blocking(move || {
Expand All @@ -190,10 +203,11 @@ impl SchemaProvider for IcebergSchemaProvider {
.map_err(to_datafusion_error)?;

// Create a new table provider using the catalog reference
let table_provider = IcebergTableProvider::try_new(
let table_provider = IcebergTableProvider::try_new_with_runtime(
catalog.clone(),
namespace.clone(),
name_clone.clone(),
runtime,
)
.await
.map_err(to_datafusion_error)?;
Expand Down Expand Up @@ -315,7 +329,7 @@ mod tests {
.await
.unwrap();

let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace)
let provider = IcebergSchemaProvider::try_new_with_runtime(Arc::new(catalog), namespace, None)
.await
.unwrap();

Expand Down
132 changes: 92 additions & 40 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use iceberg::inspect::MetadataTableType;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableProperties;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::error::to_datafusion_error;
Expand All @@ -71,6 +71,10 @@ use crate::physical_plan::write::IcebergWriteExec;
///
/// For read-only access to a specific snapshot without catalog overhead, use
/// [`IcebergStaticTableProvider`] instead.
///
/// When using a CPU/IO split runtime, pass a [`Runtime`] via
/// [`Self::try_new_with_runtime`] so that catalog IO is dispatched to the IO
/// runtime rather than running on the caller's runtime.
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
/// The catalog that manages this table
Expand All @@ -79,6 +83,8 @@ pub struct IcebergTableProvider {
table_ident: TableIdent,
/// A reference-counted arrow `Schema` (cached at construction)
schema: ArrowSchemaRef,
/// When `Some`, IO in `scan` and `insert_into` is spawned on `runtime.io()`.
runtime: Option<Runtime>,
}

impl IcebergTableProvider {
Expand All @@ -90,6 +96,17 @@ impl IcebergTableProvider {
catalog: Arc<dyn Catalog>,
namespace: NamespaceIdent,
name: impl Into<String>,
) -> Result<Self> {
Self::try_new_with_runtime(catalog, namespace, name, None).await
}

/// Like [`Self::try_new`], but routes catalog IO in `scan` and `insert_into`
/// through `runtime.io()` instead of running inline on the caller's runtime.
pub async fn try_new_with_runtime(
catalog: Arc<dyn Catalog>,
namespace: NamespaceIdent,
name: impl Into<String>,
runtime: Option<Runtime>,
) -> Result<Self> {
let table_ident = TableIdent::new(namespace, name.into());

Expand All @@ -100,9 +117,25 @@ impl IcebergTableProvider {
catalog,
table_ident,
schema,
runtime,
})
}

/// Runs `fut` on `self.runtime.io()` if set, otherwise runs it inline.
async fn run_on_io<F, T>(&self, fut: F) -> DFResult<T>
where
F: std::future::Future<Output = DFResult<T>> + Send + 'static,
T: Send + 'static,
{
match &self.runtime {
Some(rt) => match rt.io().spawn(fut).await {
Ok(inner) => inner,
Err(e) => Err(to_datafusion_error(e)),
},
None => fut.await,
}
}

pub(crate) async fn metadata_table(
&self,
r#type: MetadataTableType,
Expand Down Expand Up @@ -133,43 +166,58 @@ impl TableProvider for IcebergTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Second load: fetch the latest snapshot so scans always reflect current table state.
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;

// Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
// so plan_files() uses the same projection/filters the scan will replay in execute().
let col_names = projection.map(|indices| {
indices
.iter()
.map(|&i| self.schema.field(i).name().clone())
.collect::<Vec<_>>()
});

// Compute the predicate on the caller's runtime (pure CPU, no IO).
let predicate = convert_filters_to_predicate(filters);

let mut builder = table.scan();
builder = match col_names {
Some(names) => builder.select(names),
None => builder.select_all(),
};
if let Some(pred) = predicate {
builder = builder.with_filter(pred);
}

let tasks: Vec<FileScanTask> = builder
.build()
.map_err(to_datafusion_error)?
.plan_files()
.await
.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;
// Capture everything the IO closure needs; Session is not Send.
let target_partitions = state.config().target_partitions();
let projection_owned: Option<Vec<usize>> = projection.cloned();
let catalog = self.catalog.clone();
let table_ident = self.table_ident.clone();
let arrow_schema = self.schema.clone();

// ── IO-bound: reload table + plan files ──────────────────────────────
// Spawned on `self.runtime.io()` when configured, otherwise runs inline.
let (table, tasks) = self
.run_on_io(async move {
// Second load: fetch the latest snapshot so scans always reflect
// current table state.
let table = catalog
.load_table(&table_ident)
.await
.map_err(to_datafusion_error)?;

let col_names = projection_owned.as_ref().map(|indices| {
indices
.iter()
.map(|&i| arrow_schema.field(i).name().clone())
.collect::<Vec<_>>()
});

let mut builder = table.scan();
builder = match col_names {
Some(names) => builder.select(names),
None => builder.select_all(),
};
if let Some(pred) = predicate {
builder = builder.with_filter(pred);
}

let tasks: Vec<FileScanTask> = builder
.build()
.map_err(to_datafusion_error)?
.plan_files()
.await
.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;

DFResult::Ok((table, tasks))
})
.await?;

// ── CPU-bound: schema projection + bucketing ──────────────────────────
// Output schema after projection: column indices in `Hash` exprs and any
// Arrow array we hash must reference this schema, not the full table schema.
let output_schema = match projection {
Expand All @@ -179,7 +227,6 @@ impl TableProvider for IcebergTableProvider {
})?),
};

let target_partitions = state.config().target_partitions();
// Always produce at least 1 partition so that DataFusion can schedule
// the plan normally and callers can safely call execute(0). An empty
// bucket simply yields an empty record-batch stream.
Expand Down Expand Up @@ -232,11 +279,16 @@ impl TableProvider for IcebergTableProvider {
input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let catalog = self.catalog.clone();
let table_ident = self.table_ident.clone();
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;
.run_on_io(async move {
catalog
.load_table(&table_ident)
.await
.map_err(to_datafusion_error)
})
.await?;

let partition_spec = table.metadata().default_partition_spec();

Expand Down