diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs index 42f409b42c..ebfb86daa5 100644 --- a/crates/iceberg/src/expr/mod.rs +++ b/crates/iceberg/src/expr/mod.rs @@ -21,10 +21,12 @@ mod term; use serde::{Deserialize, Serialize}; pub use term::*; pub(crate) mod accessor; +mod partition_filter; mod predicate; pub(crate) mod visitors; use std::fmt::{Display, Formatter}; +pub use partition_filter::{PartitionCoverage, PartitionCoverageFilter}; pub use predicate::*; use crate::spec::SchemaRef; diff --git a/crates/iceberg/src/expr/partition_filter.rs b/crates/iceberg/src/expr/partition_filter.rs new file mode 100644 index 0000000000..68357de010 --- /dev/null +++ b/crates/iceberg/src/expr/partition_filter.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for reasoning about how a predicate relates to the partitioning of a +//! [`DataFile`]. Useful for partition-aligned row-level operations, such as a DELETE +//! that is only permitted when it drops whole files. 
+ +use std::sync::Arc; + +use crate::Result; +use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; +use crate::expr::visitors::inclusive_projection::InclusiveProjection; +use crate::expr::visitors::strict_projection::StrictProjection; +use crate::expr::{Bind, Predicate}; +use crate::spec::{DataFile, PartitionSpecRef, Schema, SchemaRef}; + +/// How a predicate relates to the partition value of a single [`DataFile`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PartitionCoverage { + /// Every row in the file matches the predicate. + /// + /// Produced when the strict projection of the predicate onto the partition spec + /// evaluates to true against the file's partition value. + AllRowsMatch, + /// No row in the file can match the predicate. + /// + /// Produced when the inclusive projection of the predicate onto the partition + /// spec evaluates to false against the file's partition value. + NoRowsMatch, + /// The file straddles the predicate: some rows may match and some may not. + /// + /// Produced when the inclusive projection matches but the strict projection + /// does not. In this case row-level filtering would be required to answer the + /// predicate exactly; the partition value alone is insufficient. + Straddle, +} + +/// Classifies a [`DataFile`] against a predicate by evaluating strict and inclusive +/// projections of the predicate onto a single partition spec. +/// +/// Construct one filter per partition spec the caller may encounter (tables with +/// evolved specs have multiple). See [`DataFile::partition_spec_id`] to route each +/// file to the right filter. +pub struct PartitionCoverageFilter { + inclusive: ExpressionEvaluator, + strict: ExpressionEvaluator, +} + +impl PartitionCoverageFilter { + /// Build a filter for `predicate` against `schema` and `partition_spec`. + /// + /// `case_sensitive` controls column-name binding. 
+ pub fn try_new( + predicate: &Predicate, + schema: SchemaRef, + partition_spec: PartitionSpecRef, + case_sensitive: bool, + ) -> Result { + let bound = predicate + .clone() + .rewrite_not() + .bind(schema.clone(), case_sensitive)?; + + let partition_type = partition_spec.partition_type(&schema)?; + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id()) + .with_fields(partition_type.fields().to_owned()) + .build()?, + ); + + let inclusive = InclusiveProjection::new(partition_spec.clone()) + .project(&bound)? + .rewrite_not() + .bind(partition_schema.clone(), case_sensitive)?; + + let strict = StrictProjection::new(partition_spec) + .strict_project(&bound)? + .rewrite_not() + .bind(partition_schema, case_sensitive)?; + + Ok(Self { + inclusive: ExpressionEvaluator::new(inclusive), + strict: ExpressionEvaluator::new(strict), + }) + } + + /// Classify `data_file`'s partition value against this filter. + pub fn classify(&self, data_file: &DataFile) -> Result { + if self.strict.eval(data_file)? { + return Ok(PartitionCoverage::AllRowsMatch); + } + if !self.inclusive.eval(data_file)? { + return Ok(PartitionCoverage::NoRowsMatch); + } + Ok(PartitionCoverage::Straddle) + } +} diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs index 77bd046f8a..3e98595de2 100644 --- a/crates/iceberg/src/spec/manifest/data_file.rs +++ b/crates/iceberg/src/spec/manifest/data_file.rs @@ -201,6 +201,10 @@ impl DataFile { pub fn partition(&self) -> &Struct { &self.partition } + /// Get the id of the partition spec these partition values were produced under. + pub fn partition_spec_id(&self) -> i32 { + self.partition_spec_id + } /// Get the record count in the data file. 
pub fn record_count(&self) -> u64 { self.record_count diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 1b3b605fd8..775dca1df8 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -345,6 +345,14 @@ impl ManifestWriter { Ok(()) } + /// Add a deleted manifest entry, preserving the original sequence numbers. + pub(crate) fn add_deleted_entry(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + self.add_entry_inner(entry)?; + Ok(()) + } + /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID, /// which were assigned at commit, must be preserved when adding an existing entry. pub fn add_existing_file( diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..cbe9b906be 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -90,6 +90,7 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + vec![], ); // validate added files @@ -138,7 +139,9 @@ impl SnapshotProduceOperation for FastAppendOperation { Ok(manifest_list .entries() .iter() - .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .filter(|entry| { + entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files() + }) .cloned() .collect()) } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index cb2ff7cf37..f7531234b5 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod overwrite; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use 
crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::overwrite::OverwriteAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -141,6 +143,11 @@ impl Transaction { FastAppendAction::new() } + /// Creates an overwrite action. + pub fn overwrite(&self) -> OverwriteAction { + OverwriteAction::new() + } + /// Creates replace sort order action. pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/overwrite.rs b/crates/iceberg/src/transaction/overwrite.rs new file mode 100644 index 0000000000..26f62834dd --- /dev/null +++ b/crates/iceberg/src/transaction/overwrite.rs @@ -0,0 +1,537 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{ + DataFile, FormatVersion, ManifestContentType, ManifestEntry, ManifestFile, + ManifestWriterBuilder, Operation, +}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; + +/// OverwriteAction is a transaction action for overwriting data files in the table. +/// +/// Creates a snapshot with `Operation::Overwrite` semantics — adds new data files and +/// optionally removes existing data files by rewriting affected manifests with those +/// entries marked as `ManifestStatus::Deleted`. +pub struct OverwriteAction { + check_duplicate: bool, + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + added_data_files: Vec, + deleted_data_files: Vec, +} + +impl OverwriteAction { + pub(crate) fn new() -> Self { + Self { + check_duplicate: true, + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + added_data_files: vec![], + deleted_data_files: vec![], + } + } + + /// Set whether to check duplicate files. + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + + /// Add data files to the snapshot. + pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.added_data_files.extend(data_files); + self + } + + /// Specify data files to be removed from the table in this overwrite. + pub fn delete_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.deleted_data_files.extend(data_files); + self + } + + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. 
+ pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self + } +} + +#[async_trait] +impl TransactionAction for OverwriteAction { + async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + self.deleted_data_files.clone(), + ); + + snapshot_producer.validate_added_data_files()?; + + if self.check_duplicate { + snapshot_producer.validate_duplicate_files().await?; + } + + let deleted_file_paths: HashSet = self + .deleted_data_files + .iter() + .map(|f| f.file_path.clone()) + .collect(); + + let snapshot_id = snapshot_producer.snapshot_id(); + snapshot_producer + .commit( + OverwriteOperation { + deleted_file_paths, + snapshot_id, + }, + DefaultManifestProcess, + ) + .await + } +} + +struct OverwriteOperation { + deleted_file_paths: HashSet, + snapshot_id: i64, +} + +impl SnapshotProduceOperation for OverwriteOperation { + fn operation(&self) -> Operation { + Operation::Overwrite + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + if self.deleted_file_paths.is_empty() { + return Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + 
.collect()); + } + + let mut result = Vec::new(); + + for manifest_file in manifest_list.entries() { + if !manifest_file.has_added_files() && !manifest_file.has_existing_files() { + continue; + } + + let manifest = manifest_file + .load_manifest(snapshot_produce.table.file_io()) + .await?; + + let has_deletes = manifest.entries().iter().any(|entry| { + entry.is_alive() && self.deleted_file_paths.contains(entry.file_path()) + }); + + if has_deletes { + let rewritten = self + .rewrite_manifest(snapshot_produce, manifest_file, &manifest) + .await?; + result.push(rewritten); + } else { + result.push(manifest_file.clone()); + } + } + + Ok(result) + } +} + +impl OverwriteOperation { + /// Rewrite a manifest, marking entries whose file paths are in `deleted_file_paths` + /// as `ManifestStatus::Deleted`. + async fn rewrite_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + manifest_file: &ManifestFile, + manifest: &crate::spec::Manifest, + ) -> Result { + let table = snapshot_produce.table; + + let new_manifest_path = format!( + "{}/metadata/{}-m-overwrite.avro", + table.metadata().location(), + Uuid::now_v7(), + ); + let output_file = table.file_io().new_output(&new_manifest_path)?; + let builder = ManifestWriterBuilder::new( + output_file, + Some(self.snapshot_id), + manifest_file.key_metadata.clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), + ); + + let mut writer = match table.metadata().format_version() { + FormatVersion::V1 => builder.build_v1(), + FormatVersion::V2 => match manifest_file.content { + ManifestContentType::Data => builder.build_v2_data(), + ManifestContentType::Deletes => builder.build_v2_deletes(), + }, + FormatVersion::V3 => match manifest_file.content { + ManifestContentType::Data => builder.build_v3_data(), + ManifestContentType::Deletes => builder.build_v3_deletes(), + }, + }; + + for entry in manifest.entries() { + if entry.is_alive() && 
self.deleted_file_paths.contains(entry.file_path()) { + let mut deleted: ManifestEntry = (**entry).clone(); + deleted.snapshot_id = Some(self.snapshot_id); + writer.add_deleted_entry(deleted)?; + } else { + let cloned: ManifestEntry = (**entry).clone(); + writer.add_existing_entry(cloned)?; + } + } + + writer.write_manifest_file().await + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestStatus, + Operation, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + use crate::{TableRequirement, TableUpdate}; + + fn test_data_file(path: &str, partition_spec_id: i32) -> crate::spec::DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(partition_spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_empty_data_overwrite_action() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.overwrite().add_data_files(vec![]); + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_overwrite_snapshot_properties() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let mut snapshot_properties = HashMap::new(); + snapshot_properties.insert("key".to_string(), "val".to_string()); + + let data_file = test_data_file( + "test/1.parquet", + table.metadata().default_partition_spec_id(), + ); + + let action = tx + .overwrite() + .set_snapshot_properties(snapshot_properties) + .add_data_files(vec![data_file]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + 
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + assert_eq!( + new_snapshot + .summary() + .additional_properties + .get("key") + .unwrap(), + "val" + ); + } + + #[tokio::test] + async fn test_overwrite_incompatible_partition_value() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::string("test"))])) + .build() + .unwrap(); + + let action = tx.overwrite().add_data_files(vec![data_file]); + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_overwrite_basic() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = test_data_file( + "test/3.parquet", + table.metadata().default_partition_spec_id(), + ); + + let action = tx.overwrite().add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + assert!( + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id + } + ], + requirements + ); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + 
assert_eq!(new_snapshot.summary().operation, Operation::Overwrite); + + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + + #[tokio::test] + async fn test_overwrite_with_deleted_files() { + use crate::memory::tests::new_memory_catalog; + use crate::transaction::ApplyTransactionAction; + use crate::transaction::tests::make_v3_minimal_table_in_catalog; + + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + let spec_id = table.metadata().default_partition_spec_id(); + + let original_file1 = test_data_file("test/original1.parquet", spec_id); + let original_file2 = test_data_file("test/original2.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx + .fast_append() + .add_data_files(vec![original_file1.clone(), original_file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + + let replacement_file = test_data_file("test/replacement.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx + .overwrite() + .add_data_files(vec![replacement_file.clone()]) 
+ .delete_data_files(vec![original_file1.clone(), original_file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.summary().operation, Operation::Overwrite); + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + assert_eq!(2, manifest_list.entries().len()); + + let mut all_entries = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + for entry in manifest.entries() { + all_entries.push((entry.status(), entry.file_path().to_string())); + } + } + + for original_file in ["test/original1.parquet", "test/original2.parquet"] { + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Deleted + && path == original_file), + "Original file {original_file} should be marked as Deleted, entries: {all_entries:?}", + ); + } + + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Added + && path == "test/replacement.parquet"), + "Replacement file should be marked as Added, entries: {all_entries:?}", + ); + + // Verify snapshot summary reports the deleted file. + assert_eq!( + snapshot + .summary() + .additional_properties + .get("deleted-data-files") + .map(|s| s.as_str()), + Some("2") + ); + assert_eq!( + snapshot + .summary() + .additional_properties + .get("deleted-records") + .map(|s| s.as_str()), + Some("2") + ); + + // Step 3: Fast append after overwrite — delete-only manifest must survive. 
+ let appended_file = test_data_file("test/appended.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![appended_file.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + // 3 manifests: rewritten (deleted entry), overwrite added, fast_append added. + assert_eq!(3, manifest_list.entries().len()); + + let mut all_entries = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + for entry in manifest.entries() { + all_entries.push((entry.status(), entry.file_path().to_string())); + } + } + + // The deleted entry must still be present after fast_append. + for original_file in ["test/original1.parquet", "test/original2.parquet"] { + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Deleted + && path == original_file), + "Deleted entry should survive fast_append, entries: {all_entries:?}", + ); + } + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..7ae52f0d6e 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -114,6 +114,7 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + deleted_data_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). 
@@ -121,12 +122,17 @@ pub(crate) struct SnapshotProducer<'a> { } impl<'a> SnapshotProducer<'a> { + pub(crate) fn snapshot_id(&self) -> i64 { + self.snapshot_id + } + pub(crate) fn new( table: &'a Table, commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + deleted_data_files: Vec, ) -> Self { Self { table, @@ -135,6 +141,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata, snapshot_properties, added_data_files, + deleted_data_files, manifest_counter: (0..), } } @@ -329,7 +336,10 @@ impl<'a> SnapshotProducer<'a> { // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + if self.added_data_files.is_empty() + && self.deleted_data_files.is_empty() + && self.snapshot_properties.is_empty() + { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files or added snapshot properties found when write a manifest file", @@ -383,10 +393,15 @@ impl<'a> SnapshotProducer<'a> { ); } - let previous_snapshot = table_metadata - .snapshot_by_id(self.snapshot_id) - .and_then(|snapshot| snapshot.parent_snapshot_id()) - .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id)); + for data_file in &self.deleted_data_files { + summary_collector.remove_file( + data_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + + let previous_snapshot = table_metadata.current_snapshot(); let mut additional_properties = summary_collector.build(); additional_properties.extend(self.snapshot_properties.clone()); diff --git a/crates/integrations/datafusion/src/physical_plan/delete.rs b/crates/integrations/datafusion/src/physical_plan/delete.rs new file mode 100644 index 0000000000..7902f3120a --- /dev/null +++ 
b/crates/integrations/datafusion/src/physical_plan/delete.rs @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Execution plan for partition-aligned DELETE on an Iceberg table. +//! +//! Only deletes whose filter aligns with partition boundaries are accepted: +//! every data file must be either wholly covered by the predicate (and is +//! dropped) or wholly untouched by it. A file that the predicate straddles +//! produces a hard error — the caller must use a column/row-level delete +//! strategy for that shape of predicate, which this plan does not implement. 
+ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, RecordBatch, UInt64Array}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::{DataFusionError, Result as DFResult}; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::logical_expr::Expr; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::StreamExt; +use iceberg::expr::{PartitionCoverage, PartitionCoverageFilter, Predicate}; +use iceberg::spec::{DataContentType, DataFile, PartitionSpecRef}; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::{Catalog, Error, ErrorKind, TableIdent}; + +use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate_strict; +use crate::to_datafusion_error; + +/// Execution plan for partition-aligned DELETE. Has no child plan — all work +/// happens inside [`execute`](ExecutionPlan::execute): load fresh table state, +/// classify files, commit an [`OverwriteAction`](iceberg::transaction::OverwriteAction) +/// that drops the fully-covered files, and emit a single `count` row. 
+#[derive(Debug)] +pub(crate) struct IcebergDeleteExec { + catalog: Arc, + table_ident: TableIdent, + filters: Vec, + count_schema: ArrowSchemaRef, + plan_properties: Arc, +} + +impl IcebergDeleteExec { + pub fn new(catalog: Arc, table_ident: TableIdent, filters: Vec) -> Self { + let count_schema = Self::make_count_schema(); + let plan_properties = Arc::new(PlanProperties::new( + EquivalenceProperties::new(count_schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + )); + Self { + catalog, + table_ident, + filters, + count_schema, + plan_properties, + } + } + + fn make_count_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![Field::new( + "count", + DataType::UInt64, + false, + )])) + } + + fn make_count_batch(count: u64) -> DFResult { + let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some("Failed to build DELETE count batch".to_string()), + ) + }) + } +} + +impl DisplayAs for IcebergDeleteExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::TreeRender => { + write!(f, "IcebergDeleteExec: table={}", self.table_ident) + } + DisplayFormatType::Verbose => write!( + f, + "IcebergDeleteExec: table={}, filters={:?}", + self.table_ident, self.filters + ), + } + } +} + +impl ExecutionPlan for IcebergDeleteExec { + fn name(&self) -> &str { + "IcebergDeleteExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if !children.is_empty() { + return Err(DataFusionError::Internal(format!( + "IcebergDeleteExec expects no children, got {}", + children.len() + ))); + } + Ok(self) + } + 
+ + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "IcebergDeleteExec only has one partition, got partition {partition}" + ))); + } + + let catalog = self.catalog.clone(); + let table_ident = self.table_ident.clone(); + let filters = self.filters.clone(); + + let stream = futures::stream::once(async move { + let deleted = run_partition_aligned_delete(catalog.as_ref(), &table_ident, &filters) + .await + .map_err(to_datafusion_error)?; + Self::make_count_batch(deleted) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.count_schema.clone(), + stream, + ))) + } +} + +/// Run a partition-aligned DELETE against the table identified by `table_ident`. +/// Returns the summed `record_count` of the data files that were dropped. +async fn run_partition_aligned_delete( + catalog: &dyn Catalog, + table_ident: &TableIdent, + filters: &[Expr], +) -> iceberg::Result { + // 1. Convert DataFusion filters to a single Iceberg predicate. An empty + // filter list means "DELETE everything" (effectively TRUNCATE); any + // unconvertible filter errors out. + let predicate = convert_filters_to_predicate_strict(filters).map_err(|msg| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "DELETE filter cannot be pushed into Iceberg: {msg}. \ + This DELETE would have to be evaluated row-by-row, which \ + partition-aligned delete does not support." + ), + ) + })?; + + // 2. Load fresh table state. + let table = catalog.load_table(table_ident).await?; + let Some(snapshot) = table.metadata().current_snapshot() else { + // Empty table — nothing to delete. + return Ok(0); + }; + let schema = snapshot.schema(table.metadata())?; + + // 3. Classify each data file in the current snapshot. 
+ let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await?; + + let mut filters_by_spec: std::collections::HashMap = + std::collections::HashMap::new(); + let mut to_delete: Vec = Vec::new(); + let mut total_record_count: u64 = 0; + + for manifest_file in manifest_list.entries() { + if manifest_file.content != iceberg::spec::ManifestContentType::Data { + // Skip delete-file manifests — we only touch data files here. + continue; + } + + let manifest = manifest_file.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + if !entry.is_alive() { + continue; + } + if entry.data_file().content_type() != DataContentType::Data { + continue; + } + + let data_file = entry.data_file(); + let spec_id = data_file.partition_spec_id(); + + let spec_filter = match filters_by_spec.get(&spec_id) { + Some(f) => f, + None => { + let partition_spec = table + .metadata() + .partition_spec_by_id(spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Data file {} references partition spec {} which is not in table metadata", + data_file.file_path(), + spec_id + ), + ) + })? + .clone(); + let filter = build_spec_filter(predicate.as_ref(), &schema, partition_spec)?; + filters_by_spec.entry(spec_id).or_insert(filter) + } + }; + + match spec_filter.classify(data_file)? { + PartitionCoverage::AllRowsMatch => { + total_record_count += data_file.record_count(); + to_delete.push(data_file.clone()); + } + PartitionCoverage::NoRowsMatch => {} + PartitionCoverage::Straddle => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "DELETE predicate does not align with partition boundaries: \ + data file {} would need row-level filtering. \ + Partition-aligned delete only supports predicates that \ + drop whole files.", + data_file.file_path() + ), + )); + } + } + } + } + + if to_delete.is_empty() { + return Ok(0); + } + + // 4. Commit the overwrite (with no added files) via the catalog. 
+ let tx = Transaction::new(&table); + let action = tx.overwrite().delete_data_files(to_delete); + action.apply(tx)?.commit(catalog).await?; + + Ok(total_record_count) +} + +/// A [`PartitionCoverageFilter`] paired with metadata that the caller can use +/// to route files to the right instance (tables with evolved partition specs +/// have more than one). +struct SpecFilter { + filter: PartitionCoverageFilter, +} + +impl SpecFilter { + fn classify(&self, data_file: &DataFile) -> iceberg::Result { + self.filter.classify(data_file) + } +} + +fn build_spec_filter( + predicate: Option<&Predicate>, + schema: &iceberg::spec::SchemaRef, + partition_spec: PartitionSpecRef, +) -> iceberg::Result { + match predicate { + // Empty filter: every file is fully covered regardless of spec. + None => Ok(SpecFilter { + filter: PartitionCoverageFilter::try_new( + &Predicate::AlwaysTrue, + schema.clone(), + partition_spec, + true, + )?, + }), + Some(p) => Ok(SpecFilter { + filter: PartitionCoverageFilter::try_new(p, schema.clone(), partition_spec, true)?, + }), + } +} diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs index 17c9416d54..29ed9e7351 100644 --- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs +++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs @@ -49,6 +49,62 @@ pub fn convert_filters_to_predicate(filters: &[Expr]) -> Option { .reduce(Predicate::and) } +/// Strict variant of [`convert_filters_to_predicate`]: returns an error if any +/// filter (or any sub-expression inside it) cannot be represented as an Iceberg +/// predicate. +/// +/// The permissive variant is safe for scan pushdown (unconverted clauses stay in +/// the DataFusion filter). For row-level DML it is not: silently dropping a +/// clause from the predicate would delete/update rows the user didn't ask to +/// touch. 
+pub fn convert_filters_to_predicate_strict(
+    filters: &[Expr],
+) -> std::result::Result<Option<Predicate>, String> {
+    // AND the clauses together one at a time. `?` aborts on the first clause
+    // that has no Iceberg equivalent, so no clause is ever silently dropped.
+    filters.iter().try_fold(None, |combined: Option<Predicate>, expr| {
+        let predicate = convert_filter_to_predicate_strict(expr)?;
+        Ok(Some(match combined {
+            Some(existing) => existing.and(predicate),
+            None => predicate,
+        }))
+    })
+}
+
+fn convert_filter_to_predicate_strict(expr: &Expr) -> std::result::Result<Predicate, String> {
+    // Logical connectives are walked recursively so that silent drops in
+    // `to_iceberg_and_predicate` / `to_iceberg_or_predicate` can't hide a
+    // non-convertible sub-expression.
+    match expr {
+        Expr::BinaryExpr(b) if matches!(b.op, Operator::And) => {
+            let left = convert_filter_to_predicate_strict(&b.left)?;
+            let right = convert_filter_to_predicate_strict(&b.right)?;
+            Ok(left.and(right))
+        }
+        Expr::BinaryExpr(b) if matches!(b.op, Operator::Or) => {
+            let left = convert_filter_to_predicate_strict(&b.left)?;
+            let right = convert_filter_to_predicate_strict(&b.right)?;
+            Ok(left.or(right))
+        }
+        // Negation converts its operand strictly and then inverts the result.
+        Expr::Not(inner) => Ok(!convert_filter_to_predicate_strict(inner)?),
+        // Anything else goes through `to_iceberg_predicate`; results that are
+        // not a usable predicate are rejected instead of being skipped.
+        _ => match to_iceberg_predicate(expr) {
+            TransformedResult::Predicate(p) => Ok(p),
+            // A column on its own is turned into the predicate `column = true`.
+            TransformedResult::Column(column) => Ok(Predicate::Binary(BinaryExpression::new(
+                PredicateOperator::Eq,
+                column,
+                Datum::bool(true),
+            ))),
+            TransformedResult::Literal(_) | TransformedResult::NotTransformed => Err(format!(
+                "Expression `{expr}` cannot be represented as an Iceberg predicate"
+            )),
+        },
+    }
+}
+
 fn convert_filter_to_predicate(expr: &Expr) -> Option<Predicate> {
     match to_iceberg_predicate(expr) {
         TransformedResult::Predicate(predicate) => Some(predicate),
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index aeac30de32..ad1d5dbea8 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
pub(crate) mod commit; +pub(crate) mod delete; pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; pub(crate) mod project; @@ -26,6 +27,6 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; -pub use expr_to_predicate::convert_filters_to_predicate; +pub use expr_to_predicate::{convert_filters_to_predicate, convert_filters_to_predicate_strict}; pub use project::project_with_partition; pub use scan::IcebergTableScan; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..1457898da4 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -51,6 +51,7 @@ use metadata_table::IcebergMetadataTableProvider; use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; +use crate::physical_plan::delete::IcebergDeleteExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; use crate::physical_plan::scan::IcebergTableScan; @@ -233,6 +234,18 @@ impl TableProvider for IcebergTableProvider { self.schema.clone(), ))) } + + async fn delete_from( + &self, + _state: &dyn Session, + filters: Vec, + ) -> DFResult> { + Ok(Arc::new(IcebergDeleteExec::new( + self.catalog.clone(), + self.table_ident.clone(), + filters, + ))) + } } /// Static table provider for read-only snapshot access. @@ -346,6 +359,19 @@ impl TableProvider for IcebergStaticTableProvider { .to_string(), ))) } + + async fn delete_from( + &self, + _state: &dyn Session, + _filters: Vec, + ) -> DFResult> { + Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + "DELETE is not supported on IcebergStaticTableProvider. \ + Use IcebergTableProvider with a catalog for write support." 
+ .to_string(), + ))) + } } #[cfg(test)] @@ -865,4 +891,166 @@ mod tests { "Limit should be None when not specified" ); } + + // --- partition-aligned delete_from tests -------------------------------- + + async fn count_rows(ctx: &SessionContext, table: &str) -> u64 { + use datafusion::arrow::array::Int64Array; + let df = ctx + .sql(&format!("SELECT count(*) FROM {table}")) + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let arr = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + arr.value(0) as u64 + } + + async fn collected_delete_count(df: datafusion::dataframe::DataFrame) -> u64 { + use datafusion::arrow::array::UInt64Array; + let batches = df.collect().await.unwrap(); + let arr = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + arr.value(0) + } + + async fn register_partitioned_table(ctx: &SessionContext) -> (Arc, TempDir) { + let (catalog, namespace, table_name, temp_dir) = + get_partitioned_test_catalog_and_table(Some(true)).await; + let provider = IcebergTableProvider::try_new(catalog.clone(), namespace, table_name) + .await + .unwrap(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + (catalog, temp_dir) + } + + #[tokio::test] + async fn test_delete_partition_aligned_drops_whole_partition() { + let ctx = SessionContext::new(); + let (_catalog, _temp_dir) = register_partitioned_table(&ctx).await; + + // 6 rows across 3 partitions: a (2), b (2), c (2). + ctx.sql( + "INSERT INTO t VALUES \ + (1, 'a'), (2, 'a'), (3, 'b'), (4, 'b'), (5, 'c'), (6, 'c')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(count_rows(&ctx, "t").await, 6); + + let df = ctx.sql("DELETE FROM t WHERE category = 'b'").await.unwrap(); + let deleted = collected_delete_count(df).await; + assert_eq!(deleted, 2, "should report 2 rows deleted"); + + assert_eq!(count_rows(&ctx, "t").await, 4); + + // Surviving partitions' data is untouched. 
+ let df = ctx + .sql("SELECT id FROM t WHERE category = 'a' ORDER BY id") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 2); + } + + #[tokio::test] + async fn test_delete_straddling_predicate_hard_errors() { + let ctx = SessionContext::new(); + let (_catalog, _temp_dir) = register_partitioned_table(&ctx).await; + + ctx.sql("INSERT INTO t VALUES (1, 'a'), (2, 'a'), (3, 'b')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // `id` is not a partition column — the predicate does not align. + let df = ctx.sql("DELETE FROM t WHERE id = 1").await.unwrap(); + let err = df.collect().await.expect_err( + "DELETE on non-partition column must error (predicate straddles data files)", + ); + let msg = err.to_string(); + assert!( + msg.contains("partition") && msg.contains("align"), + "expected alignment error, got: {msg}" + ); + + // State is unchanged — the error must happen before any commit. + assert_eq!(count_rows(&ctx, "t").await, 3); + } + + #[tokio::test] + async fn test_delete_unpartitioned_full_truncate() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let provider = IcebergTableProvider::try_new(catalog.clone(), namespace, table_name) + .await + .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + ctx.sql("INSERT INTO t VALUES (1, 'x'), (2, 'y'), (3, 'z')") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(count_rows(&ctx, "t").await, 3); + + // Unfiltered DELETE on an unpartitioned table is always partition-aligned: + // every file is wholly covered by AlwaysTrue. 
+ let df = ctx.sql("DELETE FROM t").await.unwrap(); + let deleted = collected_delete_count(df).await; + assert_eq!(deleted, 3); + assert_eq!(count_rows(&ctx, "t").await, 0); + } + + #[tokio::test] + async fn test_delete_unpartitioned_with_predicate_errors() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let provider = IcebergTableProvider::try_new(catalog.clone(), namespace, table_name) + .await + .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + ctx.sql("INSERT INTO t VALUES (1, 'x')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Any predicate on an unpartitioned table is a straddle by construction. + let df = ctx.sql("DELETE FROM t WHERE id = 1").await.unwrap(); + let err = df.collect().await.expect_err("must error"); + assert!( + err.to_string().contains("partition"), + "expected alignment error, got: {err}" + ); + assert_eq!(count_rows(&ctx, "t").await, 1); + } + + #[tokio::test] + async fn test_delete_empty_table_returns_zero() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let provider = IcebergTableProvider::try_new(catalog.clone(), namespace, table_name) + .await + .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + let df = ctx.sql("DELETE FROM t").await.unwrap(); + let deleted = collected_delete_count(df).await; + assert_eq!(deleted, 0); + } }