214 changes: 199 additions & 15 deletions crates/integrations/datafusion/src/table/bucketing.rs
@@ -23,6 +23,8 @@ use datafusion::arrow::array::{
};
use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
use datafusion::common::hash_utils::create_hashes;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
use iceberg::scan::FileScanTask;
use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
@@ -96,34 +98,186 @@ fn is_supported_dtype(dt: &DataType) -> bool {
)
}

- /// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
- /// describes a non-empty, hashable identity key, each task is hashed on
- /// that key using DataFusion's repartition hash so the resulting partitioning
- /// matches what `RepartitionExec` would produce on the same data. Tasks
- /// missing partition data fall back to hashing `data_file_path`, which still
- /// distributes evenly but breaks the `Hash` contract — the second tuple
- /// element flags whether every task supplied a full identity key.
/// Spec field with `Transform::Bucket(_)`. The source column must be in the
/// output projection so we can reference it via `Column` in `Partitioning::Hash`.
/// We don't need the Arrow type because the partition tuple slot for a bucket
/// transform is always `Int32` (the spec-defined `result_type`).
pub(super) struct BucketCol {
pub(super) name: String,
/// Position of this column in the *output* schema (after projection).
pub(super) output_idx: usize,
/// Position of this column inside the partition spec's `fields()` slice,
/// matching the slot order of `FileScanTask::partition`.
pub(super) spec_field_idx: usize,
/// `N` parameter of the `Transform::Bucket(N)`. Used by [`bucket_tasks`]
/// to linearise multi-column bucket tuples deterministically before
/// projecting them on `n_partitions` via modulo.
pub(super) bucket_n: u32,
}

/// Inspect the table's default partition spec and return the bucket columns
/// usable for a [`Partitioning::Hash`] declaration. The spec must be
/// *purely* bucketed (every field is a `Transform::Bucket(_)`); only the
/// fields whose source column is present in the output projection are
/// retained. Returns `None` on mixed transforms, spec evolution, empty
/// spec, or when no bucket source column survives the projection.
///
/// Why a strict subset is still correct: `bucket[N]` is deterministic on
/// the source value, so file-level co-location on any subset of bucket
/// dimensions implies row-level co-location on that same subset. The
/// positional linearisation in [`bucket_linear_index`] iterates over the
/// retained `cols` only, indexing into `task.partition` via
/// `spec_field_idx`, so dropping unused dimensions changes which slots are
/// read but preserves "same key tuple → same partition".
///
/// This deliberately rejects mixed identity+bucket specs: those are handled
/// by [`compute_identity_cols`] which retains only the identity fields.
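///
/// As a purely illustrative example (column names are hypothetical): a spec
/// `[bucket(16, user_id), bucket(8, device_id)]` scanned with a projection
/// that keeps only `user_id` retains a single `BucketCol` with
/// `spec_field_idx = 0` and `bucket_n = 16`; rows sharing a `user_id` still
/// share that bucket index, so the retained subset remains a valid
/// co-location key.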
pub(super) fn compute_bucket_cols(
table: &Table,
output_schema: &ArrowSchema,
) -> Option<Vec<BucketCol>> {
let metadata = table.metadata();
if metadata.partition_specs_iter().len() > 1 {
return None;
}
let spec = metadata.default_partition_spec();
let fields = spec.fields();
if fields.is_empty() {
return None;
}
let table_schema = metadata.current_schema();

let mut cols = Vec::with_capacity(fields.len());
for (spec_field_idx, pf) in fields.iter().enumerate() {
let bucket_n = match pf.transform {
Transform::Bucket(n) => n,
_ => return None,
};
let source_field = table_schema.field_by_id(pf.source_id)?;
let Ok(output_idx) = output_schema.index_of(source_field.name.as_str()) else {
continue;
};
cols.push(BucketCol {
name: source_field.name.clone(),
output_idx,
spec_field_idx,
bucket_n,
});
}
if cols.is_empty() {
return None;
}
Some(cols)
}

/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
/// `IcebergTableProvider::scan` to drive both task distribution and the
/// `Partitioning::Hash` declaration.
pub(super) enum PartitionKeys {
Identity(Vec<IdentityCol>),
Bucket(Vec<BucketCol>),
}

impl PartitionKeys {
/// `Column` exprs (one per key column) referencing the *output* schema,
/// suitable for `Partitioning::Hash`.
pub(super) fn column_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
match self {
PartitionKeys::Identity(cols) => cols
.iter()
.map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
.collect(),
PartitionKeys::Bucket(cols) => cols
.iter()
.map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
.collect(),
}
}
}

/// Try identity detection first (preserves the existing behaviour, including
/// extracting identity-only keys from mixed identity+bucket specs). If no
/// identity columns exist, fall back to *pure* bucket detection.
///
/// Why declaring `Hash` is correct for a pure-bucket spec even though the
/// hash function differs from DataFusion's: DataFusion checks
/// `Partitioning::Hash` against `Distribution::HashPartitioned` purely by
/// expression equality, not by the underlying hash function (see
/// `datafusion_physical_expr::Partitioning::satisfy`). The contract to
/// honour is "rows with the same key tuple end up in the same partition",
/// which Iceberg `bucket[N]` already guarantees at the file level (same
/// source value implies same bucket index, hence same files). Our task
/// distribution preserves that property at the partition level by sending
/// every unique bucket index (or bucket-index tuple, for multi-column
/// specs) to a single DataFusion partition.
///
/// Crucially, this property does **not** require rehashing the bucket
/// index: co-location is already a consequence of `bucket[N]` being
/// deterministic on the source value. The scan therefore distributes tasks
/// by linearising the bucket-index tuple and taking `% n_partitions`
/// directly, which avoids birthday-paradox collisions in the common
/// `N ≈ n_partitions` regime.
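///
/// A minimal sketch of what this enables, assuming `scan()` wires it this
/// way: declare `Partitioning::Hash(keys.column_exprs(), n_partitions)` when
/// [`bucket_tasks`] reports a full key for every task, and fall back to
/// `Partitioning::UnknownPartitioning(n_partitions)` otherwise.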
pub(super) fn compute_partition_keys(
table: &Table,
output_schema: &ArrowSchema,
) -> Option<PartitionKeys> {
if let Some(cols) = compute_identity_cols(table, output_schema)
&& !cols.is_empty()
{
return Some(PartitionKeys::Identity(cols));
}
compute_bucket_cols(table, output_schema).map(PartitionKeys::Bucket)
}

/// Distribute `tasks` across `n_partitions` slots. Two regimes coexist,
/// driven by `keys`:
///
/// * `PartitionKeys::Identity` — each task is hashed on its identity
/// partition values using [`REPARTITION_RANDOM_STATE`] and projected by
/// `% n_partitions`. This is strictly aligned with what a downstream
/// `RepartitionExec(Hash([id_col], n))` would compute, so the
/// `Partitioning::Hash` annotation produced by `scan()` is honest at the
/// row level and the planner can elide such a repartition.
///
/// * `PartitionKeys::Bucket` — each task is placed via positional
/// linearisation of its bucket-index tuple, then `% n_partitions`. No
/// hash is applied to the indices: Iceberg's `bucket[N]` is itself
/// deterministic on the source value, so co-location of same-key rows
/// is already guaranteed at the file level. Rehashing the indices would
/// only add a uniform-spread layer that, in the common
/// `N_total ≈ n_partitions` regime, produces birthday-paradox empty
/// partitions. The deterministic modulo distributes the `N_1 * … * N_k`
/// logical buckets evenly, so per-partition bucket counts differ by at most one.
///
/// Tasks missing partition data fall back to hashing `data_file_path`; the
/// boolean returned alongside the buckets flags whether every task could
/// supply a full key. A `false` flag forces the caller to drop to
/// `UnknownPartitioning`.
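///
/// Worked example (hypothetical numbers): a single `bucket(16)` spec scanned
/// into `n_partitions = 16` sends bucket index `i` straight to partition
/// `i % 16 = i`, so no partition is empty as long as every bucket holds at
/// least one file. Rehashing the 16 indices into 16 partitions instead would,
/// under a uniform-hash model, leave roughly `(1 - 1/16)^16 ≈ 36%` of
/// partitions empty in expectation, which is the birthday-paradox effect
/// noted above.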
pub(super) fn bucket_tasks(
tasks: Vec<FileScanTask>,
n_partitions: usize,
- identity_cols: Option<&[IdentityCol]>,
keys: Option<&PartitionKeys>,
) -> (Vec<Vec<FileScanTask>>, bool) {
if n_partitions == 0 {
return (Vec::new(), tasks.is_empty());
}
let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
let mut all_full_key = true;
- let cols = identity_cols.unwrap_or(&[]);

for task in tasks {
- let bucket_idx = match identity_hash(&task, cols) {
- Some(h) => (h % n_partitions as u64) as usize,
- None => {
- all_full_key = false;
- fallback_hash(&task) as usize % n_partitions
let dest = match keys {
Some(PartitionKeys::Identity(cols)) => {
identity_hash(&task, cols).map(|h| (h % n_partitions as u64) as usize)
}
Some(PartitionKeys::Bucket(cols)) => bucket_linear_index(&task, cols)
.map(|linear| (linear % n_partitions as u64) as usize),
None => None,
};
- buckets[bucket_idx].push(task);
let dest = dest.unwrap_or_else(|| {
all_full_key = false;
fallback_hash(&task) as usize % n_partitions
});
buckets[dest].push(task);
}
(buckets, all_full_key)
}
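
// Illustrative call site (hypothetical), mirroring the scan-time flow the
// docs above describe. `keys` here is whatever `compute_partition_keys`
// returned for this table and projection.
//
// let (buckets, all_full_key) = bucket_tasks(tasks, n_partitions, keys.as_ref());
// if !all_full_key { /* declare UnknownPartitioning instead of Hash */ }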
@@ -152,6 +306,36 @@ fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
Some(hashes[0])
}

/// Linearise the bucket-index tuple of `task` into a single `u64` using a
/// mixed-radix positional encoding
/// `linear = idx_1 * (N_2 * … * N_k) + idx_2 * (N_3 * … * N_k) + … + idx_k`.
/// The slot for a `Transform::Bucket(_)` field is always `Int32` per the
/// Iceberg spec; a missing slot or non-`Int` literal returns `None`,
/// driving the caller's `all_full_key` flag to `false`.
///
/// For a single-column spec the formula reduces to `linear = idx_1`, so
/// `dest = idx_1 % n_partitions` — the natural Iceberg distribution.
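///
/// Worked example (hypothetical spec): for `[bucket(4, a), bucket(8, b)]` and
/// a task whose partition tuple is `(a_bucket = 3, b_bucket = 5)`,
/// `linear = 3 * 8 + 5 = 29`; with `n_partitions = 4` the task lands in
/// partition `29 % 4 = 1`.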
fn bucket_linear_index(task: &FileScanTask, cols: &[BucketCol]) -> Option<u64> {
if cols.is_empty() {
return None;
}
let partition = task.partition.as_ref()?;
let mut linear: u64 = 0;
for col in cols {
let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
let idx = match lit {
Literal::Primitive(PrimitiveLiteral::Int(v)) => *v,
_ => return None,
};
// `idx` is non-negative per the Iceberg spec (result of
// `bucket[N]` is in `[0, N)`); cast through `u32` first to avoid
// sign extension if the file accidentally carries a negative slot.
linear = linear
.wrapping_mul(col.bucket_n as u64)
.wrapping_add(idx as u32 as u64);
}
Some(linear)
}

/// Deterministic per-file fallback used when neither `identity_hash` nor
/// `bucket_linear_index` can produce a destination. The hash function does
/// not need to match DataFusion's because any
/// task taking this path causes the scan to drop to `UnknownPartitioning`.