Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 71 additions & 10 deletions crates/integrations/datafusion/src/physical_plan/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion::arrow::datatypes::{
};
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::dml::InsertOp;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand All @@ -38,14 +39,16 @@ use iceberg::transaction::{ApplyTransactionAction, Transaction};
use crate::physical_plan::DATA_FILES_COL_NAME;
use crate::to_datafusion_error;

/// Execution plan node that collects the data files written by its input and
/// commits them to the Iceberg table through a [`Transaction`].
///
/// The transaction action is selected from the stored [`InsertOp`]:
/// [`Transaction::fast_append`] for `INSERT INTO` and `overwrite` for
/// `INSERT OVERWRITE`.
#[derive(Debug)]
pub(crate) struct IcebergCommitExec {
/// Table the transaction is created against and committed to.
table: Table,
/// Catalog used to commit the transaction.
catalog: Arc<dyn Catalog>,
/// Child plan whose output streams are consumed during the commit.
input: Arc<dyn ExecutionPlan>,
/// Arrow schema supplied by the caller.
/// NOTE(review): presumably the Arrow form of the table schema — confirm against the table provider.
schema: ArrowSchemaRef,
/// Insert mode requested by the query; decides which transaction action is applied.
insert_op: InsertOp,
/// Schema built by `make_count_schema`, used for the row-count batch this node emits.
count_schema: ArrowSchemaRef,
/// Plan properties built when the node is constructed.
plan_properties: PlanProperties,
}
Expand All @@ -56,6 +59,7 @@ impl IcebergCommitExec {
catalog: Arc<dyn Catalog>,
input: Arc<dyn ExecutionPlan>,
schema: ArrowSchemaRef,
insert_op: InsertOp,
) -> Self {
let count_schema = Self::make_count_schema();

Expand All @@ -66,6 +70,7 @@ impl IcebergCommitExec {
catalog,
input,
schema,
insert_op,
count_schema,
plan_properties,
}
Expand Down Expand Up @@ -165,6 +170,7 @@ impl ExecutionPlan for IcebergCommitExec {
self.catalog.clone(),
children[0].clone(),
self.schema.clone(),
self.insert_op,
)))
}

Expand All @@ -190,6 +196,7 @@ impl ExecutionPlan for IcebergCommitExec {
let current_schema = self.table.metadata().current_schema().clone();

let catalog = Arc::clone(&self.catalog);
let insert_op = self.insert_op;

// Process the input streams from all partitions and commit the data files
let stream = futures::stream::once(async move {
Expand Down Expand Up @@ -245,14 +252,34 @@ impl ExecutionPlan for IcebergCommitExec {
return Ok(RecordBatch::new_empty(count_schema));
}

// Create a transaction and commit the data files
// Create a transaction and commit the data files using the appropriate action
let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(data_files);
let tx = match insert_op {
InsertOp::Append => tx
.fast_append()
.add_data_files(data_files)
.apply(tx)
.map_err(to_datafusion_error)?,
InsertOp::Overwrite => {
// Collect all existing live data files to delete them
let existing_files =
collect_existing_data_files(&table).await.map_err(to_datafusion_error)?;

tx.overwrite()
.add_data_files(data_files)
.delete_data_files(existing_files)
.apply(tx)
.map_err(to_datafusion_error)?
}
InsertOp::Replace => {
return Err(to_datafusion_error(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"INSERT REPLACE is not supported for Iceberg tables",
)));
}
};

// Apply the action and commit the transaction
let _updated_table = action
.apply(tx)
.map_err(to_datafusion_error)?
let _updated_table = tx
.commit(catalog.as_ref())
.await
.map_err(to_datafusion_error)?;
Expand All @@ -268,6 +295,35 @@ impl ExecutionPlan for IcebergCommitExec {
}
}

/// Gather every live data file reachable from the table's current snapshot.
///
/// `INSERT OVERWRITE` calls this to learn which files must be removed when the
/// new data is committed. A table without a current snapshot yields an empty
/// list.
///
/// # Errors
///
/// Propagates any error raised while loading the manifest list or one of its
/// manifests.
///
/// NOTE(review): every manifest in the list is considered, with no filtering on
/// manifest content type — confirm that delete manifests either cannot appear
/// here or are meant to be included.
async fn collect_existing_data_files(table: &Table) -> iceberg::Result<Vec<DataFile>> {
    let snapshot = match table.metadata().current_snapshot() {
        Some(current) => current,
        None => return Ok(Vec::new()),
    };

    let file_io = table.file_io();
    let manifest_list = snapshot
        .load_manifest_list(file_io, &table.metadata_ref())
        .await?;

    let mut live_files = Vec::new();
    for manifest_file in manifest_list.entries() {
        // A manifest with neither added nor existing files cannot contribute
        // live entries, so skip it without loading.
        let may_hold_live_entries =
            manifest_file.has_added_files() || manifest_file.has_existing_files();
        if !may_hold_live_entries {
            continue;
        }

        let manifest = manifest_file.load_manifest(file_io).await?;
        live_files.extend(
            manifest
                .entries()
                .iter()
                .filter(|entry| entry.is_alive())
                .map(|entry| entry.data_file().clone()),
        );
    }

    Ok(live_files)
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down Expand Up @@ -470,8 +526,13 @@ mod tests {
false,
)]));

let commit_exec =
IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema);
let commit_exec = IcebergCommitExec::new(
table.clone(),
catalog.clone(),
input_exec,
arrow_schema,
InsertOp::Append,
);

// Verify Execution Plan schema matches the count schema
assert_eq!(commit_exec.schema(), IcebergCommitExec::make_count_schema());
Expand Down
3 changes: 2 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl TableProvider for IcebergTableProvider {
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Load fresh table metadata from catalog
let table = self
Expand Down Expand Up @@ -231,6 +231,7 @@ impl TableProvider for IcebergTableProvider {
self.catalog.clone(),
coalesce_partitions,
self.schema.clone(),
insert_op,
)))
}
}
Expand Down
110 changes: 110 additions & 0 deletions crates/integrations/datafusion/tests/integration_datafusion_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,3 +943,113 @@ async fn test_insert_into_partitioned() -> Result<()> {

Ok(())
}

/// End-to-end check that `INSERT OVERWRITE` replaces a table's contents.
///
/// The test inserts two rows, confirms they are readable, overwrites the table
/// with three different rows, confirms only the new rows remain, and finally
/// asserts that the latest snapshot's summary records an overwrite operation.
#[tokio::test]
async fn test_insert_overwrite() -> Result<()> {
// Set up a dedicated namespace and table for this test.
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_insert_overwrite".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

let creation = get_table_creation(temp_path(), "my_table", None)?;
iceberg_catalog.create_table(&namespace, creation).await?;

// Keep a handle on the underlying catalog client so the table can be
// reloaded directly at the end of the test.
let client = Arc::new(iceberg_catalog);
let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);

let ctx = SessionContext::new();
ctx.register_catalog("catalog", catalog);

// Step 1: Insert initial data
let df = ctx
.sql("INSERT INTO catalog.test_insert_overwrite.my_table VALUES (1, 'alice'), (2, 'bob')")
.await
.unwrap();
let batches = df.collect().await.unwrap();
// The insert result is read as a UInt64 count from the first column of the
// first batch.
let rows_inserted = batches[0]
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(rows_inserted.value(0), 2);

// Step 2: Verify initial data is present
let df = ctx
.sql("SELECT * FROM catalog.test_insert_overwrite.my_table ORDER BY foo1")
.await
.unwrap();
let batches = df.collect().await.unwrap();
check_record_batches(
batches,
expect![[r#"
Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} },
Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]],
expect![[r#"
foo1: PrimitiveArray<Int32>
[
1,
2,
],
foo2: StringArray
[
"alice",
"bob",
]"#]],
&[],
Some("foo1"),
);

// Step 3: Overwrite with new data
let df = ctx
.sql(
"INSERT OVERWRITE catalog.test_insert_overwrite.my_table VALUES (10, 'charlie'), (20, 'diana'), (30, 'eve')",
)
.await
.unwrap();
let batches = df.collect().await.unwrap();
// The reported count covers only the newly written rows.
let rows_inserted = batches[0]
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(rows_inserted.value(0), 3);

// Step 4: Verify only the new data is present (old data replaced)
let df = ctx
.sql("SELECT * FROM catalog.test_insert_overwrite.my_table ORDER BY foo1")
.await
.unwrap();
let batches = df.collect().await.unwrap();
check_record_batches(
batches,
expect![[r#"
Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} },
Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]],
expect![[r#"
foo1: PrimitiveArray<Int32>
[
10,
20,
30,
],
foo2: StringArray
[
"charlie",
"diana",
"eve",
]"#]],
&[],
Some("foo1"),
);

// Step 5: Verify the snapshot is recorded as an overwrite operation
// The table is reloaded through the catalog client so the assertion sees
// the metadata produced by the overwrite commit.
let table_ident = TableIdent::new(namespace.clone(), "my_table".to_string());
let table = client.load_table(&table_ident).await?;
let current_snapshot = table.metadata().current_snapshot().unwrap();
assert_eq!(
current_snapshot.summary().operation,
iceberg::spec::Operation::Overwrite,
"The latest snapshot should be an overwrite operation"
);

Ok(())
}