diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..bdef13e97b 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -122,7 +122,7 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> Result<Vec<ManifestFile>> { let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index cb2ff7cf37..4257ac8ab5 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod row_delta; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::row_delta::RowDeltaAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -141,6 +143,16 @@ impl Transaction { FastAppendAction::new() } + /// Creates a row delta action for row-level modifications. + /// + /// RowDelta supports: + /// - Adding new data files (inserts) + /// - Removing data files (deletes in Copy-on-Write (COW) mode) + /// - Both operations in a single transaction (updates/merges) + pub fn row_delta(&self) -> RowDeltaAction { + RowDeltaAction::new() + } + /// Creates replace sort order action. 
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/row_delta.rs b/crates/iceberg/src/transaction/row_delta.rs new file mode 100644 index 0000000000..b36750e34d --- /dev/null +++ b/crates/iceberg/src/transaction/row_delta.rs @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{DataFile, ManifestContentType, ManifestEntry, ManifestFile, Operation}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; + +/// Transaction action for Copy-on-Write row-level modifications (UPDATE, DELETE, MERGE INTO). +/// +/// Corresponds to `org.apache.iceberg.RowDelta` in the Java implementation. +pub struct RowDeltaAction { + added_data_files: Vec<DataFile>, + removed_data_files: Vec<DataFile>, + /// Reserved for future Merge-on-Read support; calling `add_delete_files` currently errors. 
+ added_delete_files: Vec<DataFile>, + commit_uuid: Option<Uuid>, + snapshot_properties: HashMap<String, String>, + starting_snapshot_id: Option<i64>, +} + +impl RowDeltaAction { + pub(crate) fn new() -> Self { + Self { + added_data_files: vec![], + removed_data_files: vec![], + added_delete_files: vec![], + commit_uuid: None, + snapshot_properties: HashMap::default(), + starting_snapshot_id: None, + } + } + + /// Add new data files (INSERT rows or Copy-on-Write rewritten files). + pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self { + self.added_data_files.extend(data_files); + self + } + + /// Mark existing data files as deleted (Copy-on-Write mode). + /// + /// Corresponds to `removeRows(DataFile)` in the Java implementation. + pub fn remove_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self { + self.removed_data_files.extend(data_files); + self + } + + /// Reserved for future Merge-on-Read support — currently returns an error on commit. + /// + /// Once MoR is implemented, this will write position/equality delete files instead of + /// rewriting data files. + pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = DataFile>) -> Self { + self.added_delete_files.extend(delete_files); + self + } + + /// Set the commit UUID used for manifest file naming. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Attach custom key/value metadata to the snapshot summary. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self { + self.snapshot_properties = snapshot_properties; + self + } + + /// Reject the commit if the table has advanced past `snapshot_id` (optimistic concurrency). 
+ pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self { + self.starting_snapshot_id = Some(snapshot_id); + self + } +} + +#[async_trait] +impl TransactionAction for RowDeltaAction { + async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { + if !self.added_delete_files.is_empty() { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + "add_delete_files is not yet implemented; Merge-on-Read support is pending. \ + Use remove_data_files for Copy-on-Write deletes instead.", + )); + } + + if let Some(expected_snapshot_id) = self.starting_snapshot_id + && table.metadata().current_snapshot_id() != Some(expected_snapshot_id) + { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + format!( + "Cannot commit RowDelta based on stale snapshot. Expected: {}, Current: {:?}", + expected_snapshot_id, + table.metadata().current_snapshot_id() + ), + )); + } + + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + None, + self.snapshot_properties.clone(), + self.added_data_files.clone(), + ); + + // Validate newly added data files (partition value type-checks, etc.). + // removed_data_files are not re-validated: they are existing table files that were + // already validated when originally committed. This matches Java's MergingSnapshotProducer. + snapshot_producer.validate_added_data_files()?; + + let operation = RowDeltaOperation { + removed_data_files: self.removed_data_files.clone(), + }; + + snapshot_producer + .commit(operation, DefaultManifestProcess) + .await + } +} + +struct RowDeltaOperation { + removed_data_files: Vec<DataFile>, +} + +impl SnapshotProduceOperation for RowDeltaOperation { + /// Operation type based on Java `BaseRowDelta.operation()`: + /// - No removes → `Append` + /// - Any removes → `Overwrite` + /// + /// `Operation::Delete` (MoR-only delete files, no data file changes) is deferred until + /// Merge-on-Read is wired up. 
+ fn operation(&self) -> Operation { + if self.removed_data_files.is_empty() { + Operation::Append + } else { + Operation::Overwrite + } + } + + /// Delete entries are handled inside `existing_manifest` by rewriting the manifest. + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result<Vec<ManifestEntry>> { + Ok(vec![]) + } + + /// Returns manifest files for the new snapshot. + /// + /// For each manifest in the previous snapshot: + /// - If it contains any file being removed: rewrite it with DELETED entries for removed files + /// and EXISTING entries for survivors, preserving original sequence numbers. + /// - Otherwise: carry it forward unchanged. + /// + /// This matches Java's `ManifestFilterManager.filterManifestWithDeletedFiles` logic. + async fn existing_manifest( + &self, + snapshot_produce: &mut SnapshotProducer<'_>, + ) -> Result<Vec<ManifestFile>> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + let deleted_paths: HashSet<&str> = self + .removed_data_files + .iter() + .map(|f| f.file_path()) + .collect(); + + let mut result = Vec::new(); + for manifest_file in manifest_list.entries() { + if !manifest_file.has_added_files() && !manifest_file.has_existing_files() { + continue; + } + + let manifest = manifest_file + .load_manifest(snapshot_produce.table.file_io()) + .await?; + + let needs_rewrite = manifest + .entries() + .iter() + .any(|e| e.is_alive() && deleted_paths.contains(e.data_file().file_path())); + + if !needs_rewrite { + result.push(manifest_file.clone()); + continue; + } + + // Rewrite: deleted files → DELETED (new snapshot_id, original seq nums preserved), + // surviving files → EXISTING (all original fields preserved). 
+ let mut writer = + snapshot_produce.new_manifest_writer(ManifestContentType::Data)?; + for entry in manifest.entries() { + if deleted_paths.contains(entry.data_file().file_path()) { + writer.add_delete_entry((**entry).clone())?; + } else { + writer.add_existing_entry((**entry).clone())?; + } + } + result.push(writer.write_manifest_file().await?); + } + + Ok(result) + } + + fn removed_data_files(&self) -> &[DataFile] { + &self.removed_data_files + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, + ManifestStatus, Struct, TableMetadataBuilder, + }; + use crate::table::Table; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + use crate::{TableIdent, TableUpdate}; + + fn make_data_file(table: &Table, path: &str, size: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(size) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap() + } + + /// Build a table that has `snapshot` as its current snapshot, backed by the same FileIO. 
+ async fn table_with_snapshot(base: &Table, snapshot: crate::spec::Snapshot) -> Table { + let updated_metadata = TableMetadataBuilder::new_from_metadata( + base.metadata_ref().as_ref().clone(), + None, + ) + .set_branch_snapshot(snapshot, MAIN_BRANCH) + .unwrap() + .build() + .unwrap() + .metadata; + + Table::builder() + .metadata(updated_metadata) + .metadata_location("s3://bucket/test/location/metadata/v2.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(base.file_io().clone()) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_row_delta_add_only() { + let table = make_v2_minimal_table(); + let data_file = make_data_file(&table, "test/1.parquet", 100); + let action = Transaction::new(&table) + .row_delta() + .add_data_files(vec![data_file]); + + let mut commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = commit.take_updates(); + + if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + assert_eq!(snapshot.summary().operation, crate::spec::Operation::Append); + } else { + panic!("expected AddSnapshot"); + } + } + + #[tokio::test] + async fn test_row_delta_with_snapshot_properties() { + let table = make_v2_minimal_table(); + let data_file = make_data_file(&table, "test/1.parquet", 100); + let mut props = std::collections::HashMap::new(); + props.insert("key".to_string(), "value".to_string()); + let action = Transaction::new(&table) + .row_delta() + .set_snapshot_properties(props) + .add_data_files(vec![data_file]); + + let mut commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = commit.take_updates(); + + if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + assert_eq!( + snapshot.summary().additional_properties.get("key").unwrap(), + "value" + ); + } else { + panic!("expected AddSnapshot"); + } + } + + #[tokio::test] + async fn test_row_delta_validate_from_snapshot() { + let table = make_v2_minimal_table(); + let data_file = make_data_file(&table, 
"test/1.parquet", 100); + let action = Transaction::new(&table) + .row_delta() + .validate_from_snapshot(99999) + .add_data_files(vec![data_file]); + + let result = Arc::new(action).commit(&table).await; + match result { + Ok(_) => panic!("expected DataInvalid error for stale snapshot"), + Err(e) => assert_eq!(e.kind(), crate::ErrorKind::DataInvalid), + } + } + + #[tokio::test] + async fn test_row_delta_empty_action() { + let table = make_v2_minimal_table(); + assert!( + Arc::new(Transaction::new(&table).row_delta()) + .commit(&table) + .await + .is_err() + ); + } + + #[tokio::test] + async fn test_row_delta_incompatible_partition_value() { + let table = make_v2_minimal_table(); + let bad_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/bad.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::string("wrong"))])) + .build() + .unwrap(); + let action = Transaction::new(&table) + .row_delta() + .add_data_files(vec![bad_file]); + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_row_delta_add_delete_files_errors() { + let table = make_v2_minimal_table(); + let file = make_data_file(&table, "test/delete.parquet", 100); + let action = Transaction::new(&table) + .row_delta() + .add_delete_files(vec![file]); + let result = Arc::new(action).commit(&table).await; + match result { + Ok(_) => panic!("expected FeatureUnsupported"), + Err(e) => assert_eq!(e.kind(), crate::ErrorKind::FeatureUnsupported), + } + } + + /// End-to-end CoW test: append two files, then remove one via RowDelta. + /// + /// Verifies: + /// - The removed file appears as DELETED with correct sequence numbers. + /// - The surviving file appears as EXISTING with correct sequence numbers. + /// - The new file appears as ADDED. 
+ /// - The snapshot summary counts `deleted-data-files = 1`. + #[tokio::test] + async fn test_row_delta_cow_manifest_rewrite() { + let base_table = make_v2_minimal_table(); + + // --- S1: append file-A and file-B --- + let file_a = make_data_file(&base_table, "test/a.parquet", 100); + let file_b = make_data_file(&base_table, "test/b.parquet", 200); + + let action1 = Transaction::new(&base_table) + .fast_append() + .add_data_files(vec![file_a.clone(), file_b.clone()]); + let mut commit1 = Arc::new(action1).commit(&base_table).await.unwrap(); + let updates1 = commit1.take_updates(); + + let snapshot_s1 = if let TableUpdate::AddSnapshot { snapshot } = updates1.into_iter().next().unwrap() { + snapshot + } else { + panic!("expected AddSnapshot"); + }; + + let table_s1 = table_with_snapshot(&base_table, snapshot_s1).await; + + // --- S2: remove file-A (CoW), add file-C --- + let file_c = make_data_file(&table_s1, "test/c.parquet", 300); + let action2 = Transaction::new(&table_s1) + .row_delta() + .remove_data_files(vec![file_a.clone()]) + .add_data_files(vec![file_c.clone()]); + let mut commit2 = Arc::new(action2).commit(&table_s1).await.unwrap(); + let updates2 = commit2.take_updates(); + + let snapshot_s2 = if let TableUpdate::AddSnapshot { ref snapshot } = updates2[0] { + snapshot + } else { + panic!("expected AddSnapshot"); + }; + + assert_eq!( + snapshot_s2.summary().operation, + crate::spec::Operation::Overwrite + ); + + // Verify snapshot summary metrics + let props = &snapshot_s2.summary().additional_properties; + assert_eq!( + props.get("deleted-data-files").map(String::as_str), + Some("1"), + "summary should count 1 deleted file" + ); + + // Scan all manifest entries in S2 + let manifest_list = snapshot_s2 + .load_manifest_list(table_s1.file_io(), table_s1.metadata()) + .await + .unwrap(); + + let mut found_deleted_a = false; + let mut found_existing_b = false; + let mut found_added_c = false; + + for manifest_file in manifest_list.entries() { + let manifest = 
manifest_file + .load_manifest(table_s1.file_io()) + .await + .unwrap(); + for entry in manifest.entries() { + match entry.data_file().file_path() { + "test/a.parquet" => { + assert_eq!( + entry.status(), + ManifestStatus::Deleted, + "file-A must be DELETED" + ); + assert!( + entry.sequence_number().is_some(), + "DELETED entry must have sequence number" + ); + assert!( + entry.file_sequence_number.is_some(), + "DELETED entry must have file sequence number" + ); + found_deleted_a = true; + } + "test/b.parquet" => { + assert_eq!( + entry.status(), + ManifestStatus::Existing, + "file-B must be EXISTING" + ); + assert!( + entry.sequence_number().is_some(), + "EXISTING entry must have sequence number" + ); + found_existing_b = true; + } + "test/c.parquet" => { + found_added_c = true; + } + other => panic!("unexpected file in S2 manifests: {other}"), + } + } + } + + assert!(found_deleted_a, "file-A should have a DELETED entry in S2"); + assert!(found_existing_b, "file-B should have an EXISTING entry in S2"); + assert!(found_added_c, "file-C should have an ADDED entry in S2"); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..76c761fb27 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -60,10 +60,6 @@ const META_ROOT_PATH: &str = "metadata"; /// 3. **Delete Entry Processing**: The `delete_entries()` method is intended for future delete /// operations to specify which manifest entries should be marked as deleted. pub(crate) trait SnapshotProduceOperation: Send + Sync { - /// Returns the operation type that will be recorded in the snapshot summary. - /// - /// This determines what kind of operation is being performed (e.g., `Append`, `Overwrite`), - /// which is stored in the snapshot metadata for tracking and auditing purposes. 
fn operation(&self) -> Operation; /// Returns manifest entries that should be marked as deleted in the new snapshot. @@ -73,18 +69,21 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { snapshot_produce: &SnapshotProducer, ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send; - /// Returns existing manifest files that should be included in the new snapshot. - /// - /// This method determines which manifest files from the current snapshot should be - /// carried forward to the new snapshot. The selection depends on the operation type: + /// Returns existing manifest files to carry forward (or rewrite) into the new snapshot. /// - /// - **Append operations**: Typically include all existing manifests - /// - **Overwrite operations**: May exclude manifests for partitions being overwritten - /// - **Delete operations**: May exclude manifests for partitions being deleted + /// Implementations that need to delete specific files within a manifest should rewrite that + /// manifest (DELETED + EXISTING entries) and return the rewritten `ManifestFile` here. + /// `&mut SnapshotProducer` is provided so that implementations can call + /// `snapshot_produce.new_manifest_writer()` to produce the rewritten manifest. fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send; + + /// Data files being removed in this operation (used for snapshot summary metrics). 
+ fn removed_data_files(&self) -> &[DataFile] { + &[] + } } pub(crate) struct DefaultManifestProcess; @@ -223,7 +222,7 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> { + pub(crate) fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -319,20 +318,51 @@ impl<'a> SnapshotProducer<'a> { writer.write_manifest_file().await } + // Write a data manifest containing DELETED-status entries and return the ManifestFile. + // Note: this is NOT an Iceberg "delete manifest" (content=Deletes for MoR delete files). + // It is a data manifest (content=Data) whose entries carry ManifestStatus::Deleted to + // record which data files were removed in Copy-on-Write mode. + async fn write_manifest_with_deleted_entries( + &mut self, + delete_entries: Vec<ManifestEntry>, + ) -> Result<ManifestFile> { + if delete_entries.is_empty() { + return Err(Error::new( + ErrorKind::PreconditionFailed, + "No delete entries found when writing a delete manifest file", + )); + } + + let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; + for entry in delete_entries { + // Use add_delete_entry() to preserve Deleted status instead of add_entry() + // which always overwrites status to Added + writer.add_delete_entry(entry)?; + } + writer.write_manifest_file().await + } + async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>( &mut self, snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result<Vec<ManifestFile>> { + // Check if there's any content to add to the new snapshot + let delete_entries = snapshot_produce_operation.delete_entries(self).await?; + let has_delete_entries = !delete_entries.is_empty(); + // Assert current snapshot producer contains new content to add to new snapshot. // // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. 
// For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + if self.added_data_files.is_empty() + && self.snapshot_properties.is_empty() + && !has_delete_entries + { return Err(Error::new( ErrorKind::PreconditionFailed, - "No added data files or added snapshot properties found when write a manifest file", + "No added data files, delete entries, or snapshot properties found when write a manifest file", )); } @@ -345,8 +375,11 @@ impl<'a> SnapshotProducer<'a> { manifest_files.push(added_manifest); } - // # TODO - // Support process delete entries. + // Process delete entries. + if has_delete_entries { + let delete_manifest = self.write_manifest_with_deleted_entries(delete_entries).await?; + manifest_files.push(delete_manifest); + } let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) @@ -383,6 +416,14 @@ impl<'a> SnapshotProducer<'a> { ); } + for data_file in snapshot_produce_operation.removed_data_files() { + summary_collector.remove_file( + data_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + let previous_snapshot = table_metadata .snapshot_by_id(self.snapshot_id) .and_then(|snapshot| snapshot.parent_snapshot_id())