Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,39 @@ mod tests {
.unwrap()
}

pub(crate) async fn make_v2_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
let table_ident =
TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
.unwrap();

catalog
.create_namespace(table_ident.namespace(), HashMap::new())
.await
.unwrap();

let file = File::open(format!(
"{}/testdata/table_metadata/{}",
env!("CARGO_MANIFEST_DIR"),
"TableMetadataV2ValidMinimal.json"
))
.unwrap();
let reader = BufReader::new(file);
let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

let table_creation = TableCreation::builder()
.schema((**base_metadata.current_schema()).clone())
.partition_spec((**base_metadata.default_partition_spec()).clone())
.sort_order((**base_metadata.default_sort_order()).clone())
.name(table_ident.name().to_string())
.format_version(crate::spec::FormatVersion::V2)
.build();

catalog
.create_table(table_ident.namespace(), table_creation)
.await
.unwrap()
}

/// Helper function to create a test table with retry properties
pub(super) fn setup_test_table(num_retries: &str) -> Table {
let table = make_v2_table();
Expand Down Expand Up @@ -584,3 +617,131 @@ mod test_row_lineage {
assert_eq!(manifest_file.first_row_id, Some(30));
}
}

#[cfg(test)]
mod test_commit_against_memory_catalog {
//! End-to-end tests for transaction APIs against an in-process `MemoryCatalog`.
//!
//! Each test constructs a table inside a fresh memory catalog, builds a
//! transaction, commits it, and then asserts that the catalog's view of the
//! table reflects the action's intended metadata change. This complements
//! the action-level unit tests in the per-action modules (which only inspect
//! the produced `ActionCommit`) and the mock-catalog tests in this file
//! (which exercise the retry loop).

use crate::Catalog;
use crate::memory::tests::new_memory_catalog;
use crate::transaction::tests::make_v2_minimal_table_in_catalog;
use crate::transaction::{ApplyTransactionAction, Transaction};

#[tokio::test]
async fn test_update_properties_commit_round_trip() {
let catalog = new_memory_catalog().await;
let table = make_v2_minimal_table_in_catalog(&catalog).await;

// Sanity: the keys we are about to set are not already present.
assert!(table.metadata().properties().get("owner").is_none());
assert!(table.metadata().properties().get("team").is_none());

let tx = Transaction::new(&table);
let tx = tx
.update_table_properties()
.set("owner".to_string(), "iceberg-rust".to_string())
.set("team".to_string(), "storage".to_string())
.apply(tx)
.unwrap();

let committed = tx.commit(&catalog).await.unwrap();

// The returned table reflects the commit.
assert_eq!(
committed.metadata().properties().get("owner"),
Some(&"iceberg-rust".to_string())
);
assert_eq!(
committed.metadata().properties().get("team"),
Some(&"storage".to_string())
);

// A fresh load from the catalog also sees the properties, confirming
// the commit was persisted rather than only mutated in-memory.
let reloaded = catalog.load_table(committed.identifier()).await.unwrap();
assert_eq!(
reloaded.metadata().properties().get("owner"),
Some(&"iceberg-rust".to_string())
);
assert_eq!(
reloaded.metadata().properties().get("team"),
Some(&"storage".to_string())
);

// The metadata location must have advanced.
assert_ne!(
committed.metadata_location(),
table.metadata_location(),
"commit should advance the metadata pointer"
);
}

#[tokio::test]
async fn test_update_location_commit_round_trip() {
let catalog = new_memory_catalog().await;
let table = make_v2_minimal_table_in_catalog(&catalog).await;

let original_location = table.metadata().location().to_string();
let new_location = format!("{original_location}/relocated");

let tx = Transaction::new(&table);
let tx = tx
.update_location()
.set_location(new_location.clone())
.apply(tx)
.unwrap();

let committed = tx.commit(&catalog).await.unwrap();

assert_eq!(committed.metadata().location(), new_location);

let reloaded = catalog.load_table(committed.identifier()).await.unwrap();
assert_eq!(reloaded.metadata().location(), new_location);
}

#[tokio::test]
async fn test_chained_actions_single_commit() {
// A transaction can carry multiple actions. Committing once should
// apply all of them atomically and produce a single updated table.
let catalog = new_memory_catalog().await;
let table = make_v2_minimal_table_in_catalog(&catalog).await;

let original_location = table.metadata().location().to_string();
let new_location = format!("{original_location}/relocated");

let tx = Transaction::new(&table);
let tx = tx
.update_table_properties()
.set("owner".to_string(), "iceberg-rust".to_string())
.apply(tx)
.unwrap();
let tx = tx
.update_location()
.set_location(new_location.clone())
.apply(tx)
.unwrap();

let committed = tx.commit(&catalog).await.unwrap();

assert_eq!(
committed.metadata().properties().get("owner"),
Some(&"iceberg-rust".to_string())
);
assert_eq!(committed.metadata().location(), new_location);

// And the catalog agrees.
let reloaded = catalog.load_table(committed.identifier()).await.unwrap();
assert_eq!(
reloaded.metadata().properties().get("owner"),
Some(&"iceberg-rust".to_string())
);
assert_eq!(reloaded.metadata().location(), new_location);
}
}
Loading