diff --git a/crates/integrations/datafusion/src/table/bucketing.rs b/crates/integrations/datafusion/src/table/bucketing.rs index 55ab23b6fd..272a6ed0e7 100644 --- a/crates/integrations/datafusion/src/table/bucketing.rs +++ b/crates/integrations/datafusion/src/table/bucketing.rs @@ -23,6 +23,8 @@ use datafusion::arrow::array::{ }; use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema}; use datafusion::common::hash_utils::create_hashes; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE; use iceberg::scan::FileScanTask; use iceberg::spec::{Literal, PrimitiveLiteral, Transform}; @@ -96,34 +98,186 @@ fn is_supported_dtype(dt: &DataType) -> bool { ) } -/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols` -/// describes a non-empty, hashable identity key, each task is hashed on -/// that key using DataFusion's repartition hash so the resulting partitioning -/// matches what `RepartitionExec` would produce on the same data. Tasks -/// missing partition data fall back to hashing `data_file_path`, which still -/// distributes evenly but breaks the `Hash` contract — the second tuple -/// element flags whether every task supplied a full identity key. +/// Spec field with `Transform::Bucket(_)`. The source column must be in the +/// output projection so we can reference it via `Column` in `Partitioning::Hash`. +/// We don't need the Arrow type because the partition tuple slot for a bucket +/// transform is always `Int32` (the spec-defined `result_type`). +pub(super) struct BucketCol { + pub(super) name: String, + /// Position of this column in the *output* schema (after projection). + pub(super) output_idx: usize, + /// Position of this column inside the partition spec's `fields()` slice, + /// matching the slot order of `FileScanTask::partition`. + pub(super) spec_field_idx: usize, + /// `N` parameter of the `Transform::Bucket(N)`. Used by [`bucket_tasks`] + /// to linearise multi-column bucket tuples deterministically before + /// projecting them on `n_partitions` via modulo. + pub(super) bucket_n: u32, +} + +/// Inspect the table's default partition spec and return the bucket columns +/// usable for a [`Partitioning::Hash`] declaration. The spec must be +/// *purely* bucketed (every field is a `Transform::Bucket(_)`); only the +/// fields whose source column is present in the output projection are +/// retained. Returns `None` on mixed transforms, spec evolution, empty +/// spec, or when no bucket source column survives the projection. +/// +/// Why a strict subset is still correct: `bucket[N]` is deterministic on +/// the source value, so file-level co-location on any subset of bucket +/// dimensions implies row-level co-location on that same subset. The +/// positional linearisation in [`bucket_linear_index`] iterates over the +/// retained `cols` only, indexing into `task.partition` via +/// `spec_field_idx`, so dropping unused dimensions changes which slots are +/// read but preserves "same key tuple → same partition". +/// +/// This deliberately rejects mixed identity+bucket specs: those are handled +/// by [`compute_identity_cols`] which retains only the identity fields. 
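+///
+/// For illustration (the shape exercised by the partial-projection test in
+/// `mod tests` below): with a spec of `bucket(2, name), bucket(4, id)` and a
+/// projection that keeps only `id`, the returned vector holds a single
+/// `BucketCol` with `spec_field_idx == 1` and `bucket_n == 4`, so
+/// [`bucket_linear_index`] reduces to reading the `id_bucket` slot alone.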
+pub(super) fn compute_bucket_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<BucketCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let fields = spec.fields();
+    if fields.is_empty() {
+        return None;
+    }
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::with_capacity(fields.len());
+    for (spec_field_idx, pf) in fields.iter().enumerate() {
+        let bucket_n = match pf.transform {
+            Transform::Bucket(n) => n,
+            _ => return None,
+        };
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let Ok(output_idx) = output_schema.index_of(source_field.name.as_str()) else {
+            continue;
+        };
+        cols.push(BucketCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            bucket_n,
+        });
+    }
+    if cols.is_empty() {
+        return None;
+    }
+    Some(cols)
+}
+
+/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
+/// `IcebergTableProvider::scan` to drive both task distribution and the
+/// `Partitioning::Hash` declaration.
+pub(super) enum PartitionKeys {
+    Identity(Vec<IdentityCol>),
+    Bucket(Vec<BucketCol>),
+}
+
+impl PartitionKeys {
+    /// `Column` exprs (one per key column) referencing the *output* schema,
+    /// suitable for `Partitioning::Hash`.
+    pub(super) fn column_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        match self {
+            PartitionKeys::Identity(cols) => cols
+                .iter()
+                .map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
+                .collect(),
+            PartitionKeys::Bucket(cols) => cols
+                .iter()
+                .map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
+                .collect(),
+        }
+    }
+}
+
+/// Try identity detection first (preserves the existing behaviour, including
+/// extracting identity-only keys from mixed identity+bucket specs). If no
+/// identity columns exist, fall back to *pure* bucket detection.
+///
+/// Why declaring `Hash` is correct for a pure-bucket spec even though the
+/// hash function differs from DataFusion's: DataFusion checks
+/// `Partitioning::Hash` against `Distribution::HashPartitioned` purely by
+/// expression equality, not by the underlying hash function (see
+/// `datafusion_physical_expr::Partitioning::satisfy`). The contract to
+/// honour is "rows with the same key tuple end up in the same partition",
+/// which Iceberg `bucket[N]` already guarantees at the file level (same
+/// source value implies same bucket index, hence same files). Our task
+/// distribution preserves that property at the partition level by sending
+/// every unique bucket index (or bucket-index tuple, for multi-column
+/// specs) to a single DataFusion partition.
+///
+/// Crucially, this property does **not** require rehashing the bucket
+/// index: co-location is already a consequence of `bucket[N]` being
+/// deterministic on the source value. The scan therefore distributes tasks
+/// by linearising the bucket-index tuple and taking `% n_partitions`
+/// directly, which avoids birthday-paradox collisions in the common
+/// `N ≈ n_partitions` regime.
+pub(super) fn compute_partition_keys(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<PartitionKeys> {
+    if let Some(cols) = compute_identity_cols(table, output_schema)
+        && !cols.is_empty()
+    {
+        return Some(PartitionKeys::Identity(cols));
+    }
+    compute_bucket_cols(table, output_schema).map(PartitionKeys::Bucket)
+}
+
+/// Distribute `tasks` across `n_partitions` slots. Two regimes coexist,
+/// driven by `keys`:
+///
+/// * `PartitionKeys::Identity` — each task is hashed on its identity
+///   partition values using [`REPARTITION_RANDOM_STATE`] and projected by
+///   `% n_partitions`. This is strictly aligned with what a downstream
+///   `RepartitionExec(Hash([id_col], n))` would compute, so the
+///   `Partitioning::Hash` annotation produced by `scan()` is honest at the
+///   row level and the planner can elide such a repartition.
+///
+/// * `PartitionKeys::Bucket` — each task is placed via positional
+///   linearisation of its bucket-index tuple, then `% n_partitions`. No
+///   hash is applied to the indices: Iceberg's `bucket[N]` is itself
+///   deterministic on the source value, so co-location of same-key rows
+///   is already guaranteed at the file level. Rehashing the indices would
+///   only add a uniform-spread layer that, in the common
+///   `N_total ≈ n_partitions` regime, produces birthday-paradox empty
+///   partitions. The deterministic modulo distributes the `N_1 * … * N_k`
+///   logical buckets evenly, with a per-partition skew of at most one
+///   logical bucket.
+///
+/// Tasks missing partition data fall back to hashing `data_file_path`; the
+/// boolean returned alongside the buckets flags whether every task could
+/// supply a full key. A `false` flag forces the caller to drop to
+/// `UnknownPartitioning`.
 pub(super) fn bucket_tasks(
     tasks: Vec<FileScanTask>,
     n_partitions: usize,
-    identity_cols: Option<&[IdentityCol]>,
+    keys: Option<&PartitionKeys>,
 ) -> (Vec<Vec<FileScanTask>>, bool) {
     if n_partitions == 0 {
         return (Vec::new(), tasks.is_empty());
     }
     let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
     let mut all_full_key = true;
-    let cols = identity_cols.unwrap_or(&[]);
     for task in tasks {
-        let bucket_idx = match identity_hash(&task, cols) {
-            Some(h) => (h % n_partitions as u64) as usize,
-            None => {
-                all_full_key = false;
-                fallback_hash(&task) as usize % n_partitions
+        let dest = match keys {
+            Some(PartitionKeys::Identity(cols)) => {
+                identity_hash(&task, cols).map(|h| (h % n_partitions as u64) as usize)
             }
+            Some(PartitionKeys::Bucket(cols)) => bucket_linear_index(&task, cols)
+                .map(|linear| (linear % n_partitions as u64) as usize),
+            None => None,
         };
-        buckets[bucket_idx].push(task);
+        let dest = dest.unwrap_or_else(|| {
+            all_full_key = false;
+            fallback_hash(&task) as usize % n_partitions
+        });
+        buckets[dest].push(task);
     }
     (buckets, all_full_key)
 }
@@ -152,6 +306,36 @@ fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
     Some(hashes[0])
 }
 
+/// Linearise the bucket-index tuple of `task` into a single `u64` using a
+/// mixed-radix positional encoding
+/// `linear = idx_1 * N_2 * … * N_k + idx_2 * N_3 * … * N_k + … + idx_k`.
+/// The slot for a `Transform::Bucket(_)` field is always `Int32` per the
+/// Iceberg spec; a missing slot or non-`Int` literal returns `None`,
+/// driving the caller's `all_full_key` flag to `false`.
+///
+/// For a single-column spec the formula reduces to `linear = idx_1`, so
+/// `dest = idx_1 % n_partitions` — the natural Iceberg distribution.
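+///
+/// Worked example (matching the multi-column test in `mod tests`): for a
+/// spec of `bucket(2, name), bucket(4, id)`, the tuple
+/// `(name_idx, id_idx) = (1, 3)` linearises to `1 * 4 + 3 = 7`, so with
+/// `n_partitions = 4` the task lands in partition `7 % 4 = 3`.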
+fn bucket_linear_index(task: &FileScanTask, cols: &[BucketCol]) -> Option<u64> {
+    if cols.is_empty() {
+        return None;
+    }
+    let partition = task.partition.as_ref()?;
+    let mut linear: u64 = 0;
+    for col in cols {
+        let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
+        let idx = match lit {
+            Literal::Primitive(PrimitiveLiteral::Int(v)) => *v,
+            _ => return None,
+        };
+        // `idx` is non-negative per the Iceberg spec (the result of
+        // `bucket[N]` is in `[0, N)`); cast through `u32` first to avoid
+        // sign extension if the file accidentally carries a negative slot.
+        linear = linear
+            .wrapping_mul(col.bucket_n as u64)
+            .wrapping_add(idx as u32 as u64);
+    }
+    Some(linear)
+}
+
 /// Deterministic per-file fallback used when `identity_hash` cannot produce a
 /// bucket. The hash function does not need to match DataFusion's because any
 /// task taking this path causes the scan to drop to `UnknownPartitioning`.
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 08d7dfc29e..60525b56b4 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -41,8 +41,6 @@ use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
-use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use futures::TryStreamExt;
@@ -232,23 +230,20 @@ impl TableProvider for IcebergTableProvider {
         // bucket simply yields an empty record-batch stream.
         let n_partitions = target_partitions.min(tasks.len()).max(1);
 
-        // identity_cols is Some(non-empty) iff every condition for declaring
-        // Partitioning::Hash is met: the table's default spec has identity-transform
-        // fields, every such source column is present in the output projection, and
-        // every column type is supported by literal_to_array. Any miss collapses to
-        // None, which forces UnknownPartitioning regardless of bucketing strategy.
-        let identity_cols = bucketing::compute_identity_cols(&table, &output_schema);
+        // `keys` is `Some` iff the table's default spec is hash-declarable:
+        // either pure-identity (or mixed identity+bucket, in which case only
+        // the identity columns become the key) or pure-bucket. Any other
+        // shape (spec evolution, missing source column, mixed bucket+other
+        // transform, unsupported identity dtype) collapses to `None`, which
+        // forces `UnknownPartitioning` regardless of bucketing strategy.
+        let keys = bucketing::compute_partition_keys(&table, &output_schema);
         let (buckets, all_had_full_key) =
-            bucketing::bucket_tasks(tasks, n_partitions, identity_cols.as_deref());
-
-        let partitioning = match identity_cols {
-            Some(cols) if !cols.is_empty() && all_had_full_key && n_partitions > 0 => {
-                let exprs: Vec<Arc<dyn PhysicalExpr>> = cols
-                    .iter()
-                    .map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
-                    .collect();
-                Partitioning::Hash(exprs, n_partitions)
+            bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref());
+
+        let partitioning = match &keys {
+            Some(keys) if all_had_full_key && n_partitions > 0 => {
+                Partitioning::Hash(keys.column_exprs(), n_partitions)
             }
             _ => Partitioning::UnknownPartitioning(n_partitions),
         };
@@ -1363,4 +1358,896 @@ mod tests {
             Partitioning::UnknownPartitioning(_)
         ));
     }
+
+    // ── Bucket-transform partitioning tests ─────────────────────────────────
+
+    /// Build a table partitioned by `bucket[N](name)`; `n_buckets` is the `N`
+    /// passed to `Transform::Bucket`. Mixed-spec tables for the negative
+    /// tests are built inline in their own test bodies.
+    async fn make_bucket_partitioned_catalog_and_table_for_bucketing(
+        n_buckets: u32,
+    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+        use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+        use iceberg::spec::{
+            NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec,
+        };
+        use iceberg::{CatalogBuilder, TableCreation};
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    std::collections::HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        warehouse.clone(),
+                    )]),
+                )
+                .await
+                .unwrap(),
+        );
+
+        let namespace = NamespaceIdent::new("ns".to_string());
+        catalog
+            .create_namespace(&namespace, std::collections::HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = UnboundPartitionSpec::builder()
+            .with_spec_id(0)
+            .add_partition_field(2, "name_bucket", Transform::Bucket(n_buckets))
+            .unwrap()
+            .build();
+
+        catalog
+            .create_table(
+                &namespace,
+                TableCreation::builder()
+                    .name("t".to_string())
+                    .location(format!("{warehouse}/t"))
+                    .schema(schema)
+                    .partition_spec(partition_spec)
+                    .properties(std::collections::HashMap::new())
+                    .build(),
+            )
+            .await
+            .unwrap();
+
+        (catalog, namespace, "t".to_string(), temp_dir)
+    }
+
+    /// Append synthetic data files whose partition slot carries a bucket
+    /// *index* (an `i32`), matching what Iceberg writes for `Transform::Bucket`.
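+    ///
+    /// For example, `vec![Some(0), None, Some(2)]` appends three files: two
+    /// carrying bucket indices `0` and `2`, and one whose partition slot is
+    /// null (used by the fallback-path test below).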
+ async fn append_bucket_partitioned_fake_data_files( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + bucket_indices: Vec>, + ) { + use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct}; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), table_name.to_string())) + .await + .unwrap(); + + let data_files = bucket_indices + .iter() + .enumerate() + .map(|(i, idx)| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![idx.map(Literal::int)])) + .build() + .unwrap() + }) + .collect::>(); + + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(data_files); + action + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + } + + /// Pure `Bucket[N]` spec with the source column in the projection: scan + /// must declare `Partitioning::Hash` referencing that source column. + #[tokio::test] + async fn test_bucket_partitioned_declares_hash() { + use datafusion::physical_expr::expressions::Column; + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_bucket_partitioned_catalog_and_table_for_bucketing(8).await; + append_bucket_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + Some(0), + Some(1), + Some(2), + Some(0), + Some(7), + Some(7), + ]) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(4).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum(); + assert_eq!(total_files, 6); + + match &scan.properties().partitioning { + Partitioning::Hash(exprs, n) => { + assert_eq!(*n, 4); + assert_eq!(exprs.len(), 1); + let col = exprs[0] + .as_any() + .downcast_ref::() + .expect("expected Column expr"); + assert_eq!(col.name(), "name"); + } + other => panic!("expected Partitioning::Hash, got {other:?}"), + } + } + + /// Single-column bucket spec where the projection excludes the *only* + /// bucket source column: after filtering, `compute_bucket_cols` has zero + /// surviving columns and returns `None`. Scan must declare + /// `UnknownPartitioning`. This is the empty-intersection corner case of + /// the partial-projection logic exercised by + /// [`test_bucket_multi_column_partial_projection_declares_hash_on_subset`]. + #[tokio::test] + async fn test_bucket_projection_drops_all_bucket_sources_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_bucket_partitioned_catalog_and_table_for_bucketing(4).await; + append_bucket_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + Some(0), + Some(1), + ]) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + // Project only "id" (idx 0); the bucket source "name" (idx 1) is excluded. 
+ let projection = vec![0_usize]; + let plan = provider + .scan( + &ctx_with_target_partitions(3).state(), + Some(&projection), + &[], + None, + ) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(_) + )); + } + + /// A `None` partition slot makes `bucket_hash` return `None`, so the + /// task takes the fallback path. Even a single such task forces the + /// whole scan to drop to `UnknownPartitioning`. + #[tokio::test] + async fn test_bucket_with_null_partition_value_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_bucket_partitioned_catalog_and_table_for_bucketing(4).await; + append_bucket_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + Some(0), + None, + Some(2), + ]) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(_) + )); + } + + /// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects + /// it because not every field is a bucket transform. Identity detection + /// also yields zero columns. Final declaration is `UnknownPartitioning`. + #[tokio::test] + async fn test_mixed_bucket_and_other_transform_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, NestedField, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionSpec, + }; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + let catalog: Arc = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "name_bucket", Transform::Bucket(8)) + .unwrap() + .add_partition_field(2, "name_trunc", Transform::Truncate(4)) + .unwrap() + .build(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .partition_spec(partition_spec) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), "t".to_string())) + .await + .unwrap(); + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/data/fake.parquet", table.metadata().location())) 
+ .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![ + Some(Literal::int(0)), + Some(Literal::string("aaaa")), + ])) + .build() + .unwrap(); + let tx = Transaction::new(&table); + tx.fast_append() + .add_data_files(vec![data_file]) + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + + let provider = IcebergTableProvider::try_new(catalog, namespace, "t".to_string()) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(_) + )); + } + + /// Mixed `Identity + Bucket` spec must keep the existing behaviour: + /// only the identity column is exposed as the hash key, bucket fields + /// are ignored. Locks the V1 contract that mixed specs aren't promoted + /// to multi-key hashes. + #[tokio::test] + async fn test_mixed_identity_and_bucket_keeps_identity_only_hash() { + use datafusion::physical_expr::expressions::Column; + use datafusion::physical_plan::Partitioning; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, NestedField, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionSpec, + }; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + let catalog: Arc = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "country", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "customer_id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "country", Transform::Identity) + .unwrap() + .add_partition_field(2, "customer_bucket", Transform::Bucket(10)) + .unwrap() + .build(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .partition_spec(partition_spec) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), "t".to_string())) + .await + .unwrap(); + let data_files = vec![ + (Some("us"), Some(1)), + (Some("us"), Some(2)), + (Some("fr"), Some(3)), + ] + .into_iter() + .enumerate() + .map(|(i, (country, bucket_idx))| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![ + 
country.map(Literal::string), + bucket_idx.map(Literal::int), + ])) + .build() + .unwrap() + }) + .collect::>(); + let tx = Transaction::new(&table); + tx.fast_append() + .add_data_files(data_files) + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + + let provider = IcebergTableProvider::try_new(catalog, namespace, "t".to_string()) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + match &scan.properties().partitioning { + Partitioning::Hash(exprs, n) => { + assert_eq!(*n, 3); + assert_eq!( + exprs.len(), + 1, + "only the identity column should be retained" + ); + let col = exprs[0] + .as_any() + .downcast_ref::() + .expect("expected Column expr"); + assert_eq!(col.name(), "country"); + } + other => panic!("expected Partitioning::Hash, got {other:?}"), + } + } + + /// Pure `Bucket[N]` with `target_partitions == N`: tasks must land + /// *deterministically* at `bucket_idx % n_partitions = bucket_idx`, + /// so every scan partition holds exactly one file and none is empty. + /// This is the regression test for the birthday-paradox empty + /// partitions observed in the original investigation + /// (`dd-notes/iceberg-bucket-rehash-investigation.md`). + #[tokio::test] + async fn test_bucket_n_eq_target_partitions_is_balanced() { + let (catalog, namespace, table_name, _temp_dir) = + make_bucket_partitioned_catalog_and_table_for_bucketing(8).await; + // One file per bucket index 0..=7. + append_bucket_partitioned_fake_data_files( + &catalog, + &namespace, + &table_name, + (0..8).map(Some).collect(), + ) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(8).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets(); + + assert_eq!(buckets.len(), 8); + for (partition_idx, files) in buckets.iter().enumerate() { + assert_eq!( + files.len(), + 1, + "partition {partition_idx} should hold exactly one file" + ); + // Each file's stored partition slot must equal its scan-partition + // index — the deterministic identity mapping when `N == n`. + let slot = files[0] + .partition + .as_ref() + .and_then(|s| s.fields().first().and_then(|f| f.clone())) + .expect("partition slot must be present"); + let stored_idx = match slot { + iceberg::spec::Literal::Primitive(iceberg::spec::PrimitiveLiteral::Int(v)) => v, + other => panic!("unexpected slot literal: {other:?}"), + }; + assert_eq!( + stored_idx as usize, partition_idx, + "bucket idx must map to its own scan partition" + ); + } + } + + /// Pure `Bucket[N]` with `target_partitions < N`: tasks land at + /// `bucket_idx % target_partitions` deterministically. 
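+    /// Worked example (mirroring the `expected` array below): with bucket
+    /// indices `0..=7` and `target_partitions = 3`, the grouping is
+    /// `{0, 3, 6} → p0`, `{1, 4, 7} → p1`, `{2, 5} → p2`.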
+ #[tokio::test] + async fn test_bucket_n_gt_target_partitions_modulo_grouping() { + let (catalog, namespace, table_name, _temp_dir) = + make_bucket_partitioned_catalog_and_table_for_bucketing(8).await; + append_bucket_partitioned_fake_data_files( + &catalog, + &namespace, + &table_name, + (0..8).map(Some).collect(), + ) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets(); + + assert_eq!(buckets.len(), 3); + // Expected grouping by `idx % 3`: {0,3,6} → p0, {1,4,7} → p1, {2,5} → p2. + let expected: [&[i32]; 3] = [&[0, 3, 6], &[1, 4, 7], &[2, 5]]; + for (partition_idx, files) in buckets.iter().enumerate() { + let mut got: Vec = files + .iter() + .map(|t| { + match t + .partition + .as_ref() + .and_then(|s| s.fields().first().and_then(|f| f.clone())) + { + Some(iceberg::spec::Literal::Primitive( + iceberg::spec::PrimitiveLiteral::Int(v), + )) => v, + other => panic!("unexpected slot: {other:?}"), + } + }) + .collect(); + got.sort_unstable(); + assert_eq!( + got, + expected[partition_idx].to_vec(), + "partition {partition_idx} grouping mismatch" + ); + } + } + + /// Multi-column pure-bucket spec: tasks are placed via the positional + /// linearisation `(idx_1 * N_2 + idx_2) % n_partitions`. Spec is + /// `bucket(2, name), bucket(4, id)`, target_partitions = 4. + #[tokio::test] + async fn test_bucket_multi_column_linearised_placement() { + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, NestedField, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionSpec, + }; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + let catalog: Arc = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + // First spec field buckets `name` into 2 slots, second buckets `id` + // into 4 slots. The linearisation is `idx_name * 4 + idx_id`. 
+ let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "name_bucket", Transform::Bucket(2)) + .unwrap() + .add_partition_field(1, "id_bucket", Transform::Bucket(4)) + .unwrap() + .build(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .partition_spec(partition_spec) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), "t".to_string())) + .await + .unwrap(); + + // (name_idx, id_idx) → linear = name_idx * 4 + id_idx → dest = linear % 4 + // (0,0) → 0 % 4 = 0 + // (0,1) → 1 % 4 = 1 + // (1,0) → 4 % 4 = 0 + // (1,3) → 7 % 4 = 3 + let tuples: [(i32, i32); 4] = [(0, 0), (0, 1), (1, 0), (1, 3)]; + let data_files = tuples + .iter() + .enumerate() + .map(|(i, &(name_idx, id_idx))| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![ + Some(Literal::int(name_idx)), + Some(Literal::int(id_idx)), + ])) + .build() + .unwrap() + }) + .collect::>(); + let tx = Transaction::new(&table); + tx.fast_append() + .add_data_files(data_files) + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + + let provider = IcebergTableProvider::try_new(catalog, namespace, "t".to_string()) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(4).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets(); + + assert_eq!(buckets.len(), 4); + + let read_tuple = |task: &iceberg::scan::FileScanTask| -> (i32, i32) { + let part = task.partition.as_ref().expect("partition tuple"); + let name_idx = match part.fields()[0].as_ref().expect("name slot") { + iceberg::spec::Literal::Primitive(iceberg::spec::PrimitiveLiteral::Int(v)) => *v, + _ => panic!("unexpected name slot"), + }; + let id_idx = match part.fields()[1].as_ref().expect("id slot") { + iceberg::spec::Literal::Primitive(iceberg::spec::PrimitiveLiteral::Int(v)) => *v, + _ => panic!("unexpected id slot"), + }; + (name_idx, id_idx) + }; + + let mut p0: Vec<(i32, i32)> = buckets[0].iter().map(read_tuple).collect(); + let p1: Vec<(i32, i32)> = buckets[1].iter().map(read_tuple).collect(); + let p2: Vec<(i32, i32)> = buckets[2].iter().map(read_tuple).collect(); + let p3: Vec<(i32, i32)> = buckets[3].iter().map(read_tuple).collect(); + + p0.sort_unstable(); + assert_eq!(p0, vec![(0, 0), (1, 0)], "partition 0"); + assert_eq!(p1, vec![(0, 1)], "partition 1"); + assert!(p2.is_empty(), "partition 2 should be empty"); + assert_eq!(p3, vec![(1, 3)], "partition 3"); + } + + /// Multi-column pure-bucket spec `(bucket(2, name), bucket(4, id))`, but + /// the projection only retains `id`. `compute_bucket_cols` now skips the + /// missing `name` source and returns a single `BucketCol` for `id`. The + /// scan must declare `Partitioning::Hash([Column("id")], n_partitions)`, + /// and tasks must be distributed according to `idx_id % n_partitions` + /// alone — so two files with the same `id_bucket` slot end up in the + /// same DataFusion partition regardless of their `name_bucket` slot. 
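+    /// Concretely (mirroring the assertions below), `dest = idx_id % 4`:
+    /// `(name=0, id=0)` and `(name=1, id=0)` both land in partition 0,
+    /// `(name=0, id=1)` in partition 1, `(name=1, id=3)` in partition 3,
+    /// and partition 2 stays empty.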
+ #[tokio::test] + async fn test_bucket_multi_column_partial_projection_declares_hash_on_subset() { + use datafusion::physical_expr::expressions::Column; + use datafusion::physical_plan::Partitioning; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, NestedField, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionSpec, + }; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + let catalog: Arc = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "name_bucket", Transform::Bucket(2)) + .unwrap() + .add_partition_field(1, "id_bucket", Transform::Bucket(4)) + .unwrap() + .build(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .partition_spec(partition_spec) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), "t".to_string())) + .await + .unwrap(); + + // Two files share `id_bucket = 0` but differ on `name_bucket`; they + // must co-locate after partial-projection on `id`. A third file with + // `id_bucket = 3` lands in a different partition. The fourth file + // brings the task count up to `target_partitions=4` so that + // `n_partitions = min(target, tasks.len()) = 4` and we can assert on + // the full 4-way layout. + let tuples: [(i32, i32); 4] = [(0, 0), (1, 0), (1, 3), (0, 1)]; + let data_files = tuples + .iter() + .enumerate() + .map(|(i, &(name_idx, id_idx))| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![ + Some(Literal::int(name_idx)), + Some(Literal::int(id_idx)), + ])) + .build() + .unwrap() + }) + .collect::>(); + let tx = Transaction::new(&table); + tx.fast_append() + .add_data_files(data_files) + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + + let provider = IcebergTableProvider::try_new(catalog, namespace, "t".to_string()) + .await + .unwrap(); + // Project only "id" (idx 0); "name" (idx 1) is excluded. 
+ let projection = vec![0_usize]; + let plan = provider + .scan( + &ctx_with_target_partitions(4).state(), + Some(&projection), + &[], + None, + ) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + match &scan.properties().partitioning { + Partitioning::Hash(exprs, n) => { + assert_eq!(*n, 4); + assert_eq!(exprs.len(), 1, "only `id` survives the projection"); + let col = exprs[0] + .as_any() + .downcast_ref::() + .expect("expected Column expr"); + assert_eq!(col.name(), "id"); + } + other => panic!("expected Partitioning::Hash, got {other:?}"), + } + + // dest = idx_id % 4 (linearisation reduces to `idx_id` alone). So: + // (name=0, id=0) → 0 + // (name=1, id=0) → 0 ← co-located with the previous despite + // differing `name_bucket` + // (name=1, id=3) → 3 + // (name=0, id=1) → 1 + let buckets = scan.buckets(); + assert_eq!(buckets.len(), 4); + + let read_tuple = |task: &iceberg::scan::FileScanTask| -> (i32, i32) { + let part = task.partition.as_ref().expect("partition tuple"); + let name_idx = match part.fields()[0].as_ref().expect("name slot") { + iceberg::spec::Literal::Primitive(iceberg::spec::PrimitiveLiteral::Int(v)) => *v, + _ => panic!("unexpected name slot"), + }; + let id_idx = match part.fields()[1].as_ref().expect("id slot") { + iceberg::spec::Literal::Primitive(iceberg::spec::PrimitiveLiteral::Int(v)) => *v, + _ => panic!("unexpected id slot"), + }; + (name_idx, id_idx) + }; + + let mut p0: Vec<(i32, i32)> = buckets[0].iter().map(read_tuple).collect(); + let p1: Vec<(i32, i32)> = buckets[1].iter().map(read_tuple).collect(); + let p3: Vec<(i32, i32)> = buckets[3].iter().map(read_tuple).collect(); + p0.sort_unstable(); + assert_eq!(p0, vec![(0, 0), (1, 0)], "id_bucket=0 co-located"); + assert_eq!(p1, vec![(0, 1)], "id_bucket=1 alone"); + assert!(buckets[2].is_empty(), "partition 2 should be empty"); + assert_eq!(p3, vec![(1, 3)], "id_bucket=3 alone"); + } }