Skip to content

Commit 6521aa2

Browse files
committed
refine append action
1 parent ab20fa7 commit 6521aa2

File tree

4 files changed

+96
-44
lines changed

4 files changed

+96
-44
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
resolver = "2"
2020
members = [
2121
"crates/catalog/*",
22+
"crates/e2e_test",
2223
"crates/examples",
2324
"crates/iceberg",
24-
"crates/e2e_test",
2525
"crates/test_utils",
2626
]
2727

crates/e2e_test/tests/append_data_file_test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,9 @@ async fn test_append_data_file() {
171171

172172
// commit result
173173
let tx = Transaction::new(&table);
174-
let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap();
175-
merge_action.add_data_files(data_file.clone()).unwrap();
176-
let tx = merge_action.apply().await.unwrap();
174+
let mut append_action = tx.fast_append(None, vec![]).unwrap();
175+
append_action.add_data_files(data_file.clone()).unwrap();
176+
let tx = append_action.apply().await.unwrap();
177177
let table = tx.commit(&fixture.rest_catalog).await.unwrap();
178178

179179
// check result
@@ -191,9 +191,9 @@ async fn test_append_data_file() {
191191

192192
// commit result again
193193
let tx = Transaction::new(&table);
194-
let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap();
195-
merge_action.add_data_files(data_file.clone()).unwrap();
196-
let tx = merge_action.apply().await.unwrap();
194+
let mut append_action = tx.fast_append(None, vec![]).unwrap();
195+
append_action.add_data_files(data_file.clone()).unwrap();
196+
let tx = append_action.apply().await.unwrap();
197197
let table = tx.commit(&fixture.rest_catalog).await.unwrap();
198198

199199
// check result again

crates/e2e_test/tests/conflict_commit_test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,13 @@ async fn test_append_data_file_conflict() {
171171

172172
// start two transaction and commit one of them
173173
let tx1 = Transaction::new(&table);
174-
let mut merge_action = tx1.merge_snapshot(None, vec![]).unwrap();
175-
merge_action.add_data_files(data_file.clone()).unwrap();
176-
let tx1 = merge_action.apply().await.unwrap();
174+
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
175+
append_action.add_data_files(data_file.clone()).unwrap();
176+
let tx1 = append_action.apply().await.unwrap();
177177
let tx2 = Transaction::new(&table);
178-
let mut merge_action = tx2.merge_snapshot(None, vec![]).unwrap();
179-
merge_action.add_data_files(data_file.clone()).unwrap();
180-
let tx2 = merge_action.apply().await.unwrap();
178+
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
179+
append_action.add_data_files(data_file.clone()).unwrap();
180+
let tx2 = append_action.apply().await.unwrap();
181181
let table = tx2.commit(&fixture.rest_catalog).await.unwrap();
182182

183183
// check result

crates/iceberg/src/transaction.rs

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ impl<'a> Transaction<'a> {
104104
Ok(self)
105105
}
106106

107-
/// Creates a merge snapshot action.
108-
pub fn merge_snapshot(
107+
/// Creates a fast append action.
108+
pub fn fast_append(
109109
self,
110110
commit_uuid: Option<String>,
111111
key_metadata: Vec<u8>,
112-
) -> Result<MergeSnapshotAction<'a>> {
112+
) -> Result<FastAppendAction<'a>> {
113113
let parent_snapshot_id = self
114114
.table
115115
.metadata()
@@ -127,7 +127,7 @@ impl<'a> Transaction<'a> {
127127
.unwrap_or_default();
128128
let commit_uuid = commit_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
129129

130-
MergeSnapshotAction::new(
130+
FastAppendAction::new(
131131
self,
132132
parent_snapshot_id,
133133
snapshot_id,
@@ -166,8 +166,8 @@ impl<'a> Transaction<'a> {
166166
}
167167
}
168168

169-
/// Transaction action for merging snapshot.
170-
pub struct MergeSnapshotAction<'a> {
169+
/// FastAppendAction is a transaction action for fast append data files to the table.
170+
pub struct FastAppendAction<'a> {
171171
tx: Transaction<'a>,
172172

173173
parent_snapshot_id: Option<i64>,
@@ -184,7 +184,7 @@ pub struct MergeSnapshotAction<'a> {
184184
appended_data_files: Vec<DataFile>,
185185
}
186186

187-
impl<'a> MergeSnapshotAction<'a> {
187+
impl<'a> FastAppendAction<'a> {
188188
#[allow(clippy::too_many_arguments)]
189189
pub(crate) fn new(
190190
tx: Transaction<'a>,
@@ -234,21 +234,6 @@ impl<'a> MergeSnapshotAction<'a> {
234234
)
235235
}
236236

237-
fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
238-
format!(
239-
"{}/{}/snap-{}-{}-{}.{}",
240-
self.tx.table.metadata().location(),
241-
META_ROOT_PATH,
242-
self.snapshot_id,
243-
next_seq_num,
244-
self.commit_uuid,
245-
DataFileFormat::Avro
246-
)
247-
}
248-
249-
// # TODO:
250-
// This method act like fast append now, because we don't support overwrite and partial overwrite.
251-
// In the future, this method should be modify when we support them.
252237
async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
253238
if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
254239
let manifest_list = snapshot
@@ -315,31 +300,98 @@ impl<'a> MergeSnapshotAction<'a> {
315300

316301
/// Finished building the action and apply it to the transaction.
317302
pub async fn apply(mut self) -> Result<Transaction<'a>> {
303+
let summary = self.summary();
304+
let manifest = self.manifest_for_data_file().await?;
305+
let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
306+
307+
let snapshot_produce_action = SnapshotProduceAction::new(
308+
self.tx,
309+
self.snapshot_id,
310+
self.parent_snapshot_id,
311+
self.schema_id,
312+
self.format_version,
313+
self.commit_uuid,
314+
)?;
315+
316+
snapshot_produce_action
317+
.apply(
318+
vec![manifest]
319+
.into_iter()
320+
.chain(existing_manifest_files.into_iter()),
321+
summary,
322+
)
323+
.await
324+
}
325+
}
326+
327+
struct SnapshotProduceAction<'a> {
328+
tx: Transaction<'a>,
329+
330+
parent_snapshot_id: Option<i64>,
331+
snapshot_id: i64,
332+
schema_id: i32,
333+
format_version: FormatVersion,
334+
335+
commit_uuid: String,
336+
}
337+
338+
impl<'a> SnapshotProduceAction<'a> {
339+
pub(crate) fn new(
340+
tx: Transaction<'a>,
341+
snapshot_id: i64,
342+
parent_snapshot_id: Option<i64>,
343+
schema_id: i32,
344+
format_version: FormatVersion,
345+
commit_uuid: String,
346+
) -> Result<Self> {
347+
Ok(Self {
348+
tx,
349+
parent_snapshot_id,
350+
snapshot_id,
351+
schema_id,
352+
format_version,
353+
commit_uuid,
354+
})
355+
}
356+
357+
fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
358+
format!(
359+
"{}/{}/snap-{}-{}-{}.{}",
360+
self.tx.table.metadata().location(),
361+
META_ROOT_PATH,
362+
self.snapshot_id,
363+
next_seq_num,
364+
self.commit_uuid,
365+
DataFileFormat::Avro
366+
)
367+
}
368+
369+
/// Finished building the action and apply it to the transaction.
370+
pub async fn apply(
371+
mut self,
372+
manifest_files: impl IntoIterator<Item = ManifestFile>,
373+
summary: Summary,
374+
) -> Result<Transaction<'a>> {
318375
let next_seq_num = if self.format_version as u8 > 1u8 {
319376
self.tx.table.metadata().last_sequence_number() + 1
320377
} else {
321378
INITIAL_SEQUENCE_NUMBER
322379
};
323380
let commit_ts = chrono::Utc::now().timestamp_millis();
324-
let summary = self.summary();
325-
326-
let manifest = self.manifest_for_data_file().await?;
327-
328381
let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
382+
329383
let mut manifest_list_writer = ManifestListWriter::v2(
330384
self.tx
331385
.table
332386
.file_io()
333-
.new_output(self.generate_manifest_list_file_path(next_seq_num))?,
387+
.new_output(manifest_list_path.clone())?,
334388
self.snapshot_id,
335389
// # TODO
336390
// Should we use `0` here for default parent snapshot id?
337391
self.parent_snapshot_id.unwrap_or_default(),
338392
next_seq_num,
339393
);
340-
manifest_list_writer
341-
.add_manifests(self.manifest_from_parent_snapshot().await?.into_iter())?;
342-
manifest_list_writer.add_manifests(vec![manifest].into_iter())?;
394+
manifest_list_writer.add_manifests(manifest_files.into_iter())?;
343395
manifest_list_writer.close().await?;
344396

345397
let new_snapshot = Snapshot::builder()
@@ -644,7 +696,7 @@ mod tests {
644696
.partition(Struct::from_iter([Some(Literal::long(300))]))
645697
.build()
646698
.unwrap();
647-
let mut action = tx.merge_snapshot(None, vec![]).unwrap();
699+
let mut action = tx.fast_append(None, vec![]).unwrap();
648700
action.add_data_files(vec![data_file.clone()]).unwrap();
649701
let tx = action.apply().await.unwrap();
650702

0 commit comments

Comments
 (0)