diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index f876908ae6..57b3a48366 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -25,6 +25,7 @@ use datafusion::arrow::datatypes::{ }; use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::logical_expr::dml::InsertOp; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -38,14 +39,16 @@ use iceberg::transaction::{ApplyTransactionAction, Transaction}; use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; -/// IcebergCommitExec is responsible for collecting the files written and use -/// [`Transaction::fast_append`] to commit the data files written. +/// IcebergCommitExec is responsible for collecting the files written and +/// committing them to the table via the appropriate transaction action +/// (`fast_append` for `INSERT INTO`, `overwrite` for `INSERT OVERWRITE`). #[derive(Debug)] pub(crate) struct IcebergCommitExec { table: Table, catalog: Arc, input: Arc, schema: ArrowSchemaRef, + insert_op: InsertOp, count_schema: ArrowSchemaRef, plan_properties: PlanProperties, } @@ -56,6 +59,7 @@ impl IcebergCommitExec { catalog: Arc, input: Arc, schema: ArrowSchemaRef, + insert_op: InsertOp, ) -> Self { let count_schema = Self::make_count_schema(); @@ -66,6 +70,7 @@ impl IcebergCommitExec { catalog, input, schema, + insert_op, count_schema, plan_properties, } @@ -165,6 +170,7 @@ impl ExecutionPlan for IcebergCommitExec { self.catalog.clone(), children[0].clone(), self.schema.clone(), + self.insert_op, ))) } @@ -190,6 +196,7 @@ impl ExecutionPlan for IcebergCommitExec { let current_schema = self.table.metadata().current_schema().clone(); let catalog = Arc::clone(&self.catalog); + let insert_op = self.insert_op; // Process the input streams from all partitions and commit the data files let stream = futures::stream::once(async move { @@ -245,14 +252,34 @@ impl ExecutionPlan for IcebergCommitExec { return Ok(RecordBatch::new_empty(count_schema)); } - // Create a transaction and commit the data files + // Create a transaction and commit the data files using the appropriate action let tx = Transaction::new(&table); - let action = tx.fast_append().add_data_files(data_files); + let tx = match insert_op { + InsertOp::Append => tx + .fast_append() + .add_data_files(data_files) + .apply(tx) + .map_err(to_datafusion_error)?, + InsertOp::Overwrite => { + // Collect all existing live data files to delete them + let existing_files = + collect_existing_data_files(&table).await.map_err(to_datafusion_error)?; + + tx.overwrite() + .add_data_files(data_files) + .delete_data_files(existing_files) + .apply(tx) + .map_err(to_datafusion_error)? + } + InsertOp::Replace => { + return Err(to_datafusion_error(iceberg::Error::new( + iceberg::ErrorKind::FeatureUnsupported, + "INSERT REPLACE is not supported for Iceberg tables", + ))); + } + }; - // Apply the action and commit the transaction - let _updated_table = action - .apply(tx) - .map_err(to_datafusion_error)? + let _updated_table = tx .commit(catalog.as_ref()) .await .map_err(to_datafusion_error)?; @@ -268,6 +295,35 @@ impl ExecutionPlan for IcebergCommitExec { } } +/// Collect all live data files from the table's current snapshot. +/// +/// Used by `INSERT OVERWRITE` to enumerate files that need to be deleted +/// before the new data is added. +async fn collect_existing_data_files(table: &Table) -> iceberg::Result> { + let Some(snapshot) = table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + + let mut existing_files = Vec::new(); + for manifest_file in manifest_list.entries() { + if !manifest_file.has_added_files() && !manifest_file.has_existing_files() { + continue; + } + let manifest = manifest_file.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + if entry.is_alive() { + existing_files.push(entry.data_file().clone()); + } + } + } + + Ok(existing_files) +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -470,8 +526,13 @@ mod tests { false, )])); - let commit_exec = - IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema); + let commit_exec = IcebergCommitExec::new( + table.clone(), + catalog.clone(), + input_exec, + arrow_schema, + InsertOp::Append, + ); // Verify Execution Plan schema matches the count schema assert_eq!(commit_exec.schema(), IcebergCommitExec::make_count_schema()); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index ae87342fa5..25833aeafd 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -159,7 +159,7 @@ impl TableProvider for IcebergTableProvider { &self, state: &dyn Session, input: Arc, - _insert_op: InsertOp, + insert_op: InsertOp, ) -> DFResult> { // Load fresh table metadata from catalog let table = self @@ -231,6 +231,7 @@ impl TableProvider for IcebergTableProvider { self.catalog.clone(), coalesce_partitions, self.schema.clone(), + insert_op, ))) } } diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 6f8898abb8..eedb58eb6d 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -943,3 +943,113 @@ async fn test_insert_into_partitioned() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_insert_overwrite() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog().await; + let namespace = NamespaceIdent::new("test_insert_overwrite".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let creation = get_table_creation(temp_path(), "my_table", None)?; + iceberg_catalog.create_table(&namespace, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + + // Step 1: Insert initial data + let df = ctx + .sql("INSERT INTO catalog.test_insert_overwrite.my_table VALUES (1, 'alice'), (2, 'bob')") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let rows_inserted = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(rows_inserted.value(0), 2); + + // Step 2: Verify initial data is present + let df = ctx + .sql("SELECT * FROM catalog.test_insert_overwrite.my_table ORDER BY foo1") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + check_record_batches( + batches, + expect![[r#" + Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} }, + Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]], + expect![[r#" + foo1: PrimitiveArray + [ + 1, + 2, + ], + foo2: StringArray + [ + "alice", + "bob", + ]"#]], + &[], + Some("foo1"), + ); + + // Step 3: Overwrite with new data + let df = ctx + .sql( + "INSERT OVERWRITE catalog.test_insert_overwrite.my_table VALUES (10, 'charlie'), (20, 'diana'), (30, 'eve')", + ) + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let rows_inserted = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(rows_inserted.value(0), 3); + + // Step 4: Verify only the new data is present (old data replaced) + let df = ctx + .sql("SELECT * FROM catalog.test_insert_overwrite.my_table ORDER BY foo1") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + check_record_batches( + batches, + expect![[r#" + Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} }, + Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]], + expect![[r#" + foo1: PrimitiveArray + [ + 10, + 20, + 30, + ], + foo2: StringArray + [ + "charlie", + "diana", + "eve", + ]"#]], + &[], + Some("foo1"), + ); + + // Step 5: Verify the snapshot is recorded as an overwrite operation + let table_ident = TableIdent::new(namespace.clone(), "my_table".to_string()); + let table = client.load_table(&table_ident).await?; + let current_snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!( + current_snapshot.summary().operation, + iceberg::spec::Operation::Overwrite, + "The latest snapshot should be an overwrite operation" + ); + + Ok(()) +}