Skip to content

feat: support append data file and add e2e test #349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Nov 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -18,11 +18,12 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integration_tests",
"crates/integrations/*",
"crates/test_utils",
]
exclude = ["bindings/python"]

2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1376,7 +1376,7 @@ mod tests {
.with_schema_id(0)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter([
additional_properties: HashMap::from_iter([
("spark.app.id", "local-1646787004168"),
("added-data-files", "1"),
("added-records", "1"),
4 changes: 2 additions & 2 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -1423,7 +1423,7 @@ mod tests {
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
additional_properties: HashMap::default(),
})
.build(),
};
@@ -1457,7 +1457,7 @@ mod tests {
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
additional_properties: HashMap::default(),
})
.build(),
};
6 changes: 2 additions & 4 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ mod tests {
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
ManifestWriter, Struct, TableMetadata,
};
use crate::table::Table;
use crate::TableIdent;
@@ -293,9 +293,7 @@ mod tests {
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
5 changes: 1 addition & 4 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
@@ -977,7 +977,6 @@ mod tests {
DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
EMPTY_SNAPSHOT_ID,
};
use crate::table::Table;
use crate::TableIdent;
@@ -1124,9 +1123,7 @@ mod tests {
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
8 changes: 7 additions & 1 deletion crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
@@ -900,6 +900,12 @@ impl ManifestEntry {
}
}

/// Snapshot id
#[inline]
pub fn snapshot_id(&self) -> Option<i64> {
self.snapshot_id
}

/// Data sequence number.
#[inline]
pub fn sequence_number(&self) -> Option<i64> {
@@ -1328,7 +1334,7 @@ mod _serde {
Ok(Self {
content: value.content as i32,
file_path: value.file_path,
file_format: value.file_format.to_string(),
file_format: value.file_format.to_string().to_ascii_uppercase(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pyiceberg fail to read if this file format is not uppercase. BTW, we can add test using pyiceberg to ensure the write is valid for other SDK later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I've created an issue to get that fixed: apache/iceberg-python#1340

partition: RawLiteral::try_from(
Literal::Struct(value.partition),
&Type::Struct(partition_type.clone()),
50 changes: 33 additions & 17 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {

impl ManifestListWriter {
/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
let metadata = HashMap::from_iter([
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
),
("format-version".to_string(), "1".to_string()),
]);
if let Some(parent_snapshot_id) = parent_snapshot_id {
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
);
}
Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
}

/// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v2(
output_file: OutputFile,
snapshot_id: i64,
parent_snapshot_id: i64,
parent_snapshot_id: Option<i64>,
sequence_number: i64,
) -> Self {
let metadata = HashMap::from_iter([
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
),
("sequence-number".to_string(), sequence_number.to_string()),
("format-version".to_string(), "2".to_string()),
]);
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id
.map(|v| v.to_string())
.unwrap_or("null".to_string()),
);
Self::new(
FormatVersion::V2,
output_file,
@@ -580,6 +584,18 @@ pub struct ManifestFile {
pub key_metadata: Vec<u8>,
}

impl ManifestFile {
/// Checks if the manifest file has any added files.
pub fn has_added_files(&self) -> bool {
self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
}

/// Checks if the manifest file has any existed files.
pub fn has_existing_files(&self) -> bool {
self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
}
}

/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum ManifestContentType {
@@ -1146,7 +1162,7 @@ mod test {
let mut writer = ManifestListWriter::v1(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
1646658105718557341,
Some(1646658105718557341),
);

writer
@@ -1213,7 +1229,7 @@ mod test {
let mut writer = ManifestListWriter::v2(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
1646658105718557341,
Some(1646658105718557341),
1,
);

@@ -1335,7 +1351,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
@@ -1391,7 +1407,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
@@ -1445,7 +1461,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
21 changes: 18 additions & 3 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ pub struct Summary {
pub operation: Operation,
/// Other summary data.
#[serde(flatten)]
pub other: HashMap<String, String>,
pub additional_properties: HashMap<String, String>,
}

impl Default for Operation {
@@ -291,7 +291,7 @@ pub(super) mod _serde {
},
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
additional_properties: HashMap::new(),
}),
schema_id: v1.schema_id,
})
@@ -372,6 +372,21 @@ pub enum SnapshotRetention {
},
}

impl SnapshotRetention {
/// Create a new branch retention policy
pub fn branch(
min_snapshots_to_keep: Option<i32>,
max_snapshot_age_ms: Option<i64>,
max_ref_age_ms: Option<i64>,
) -> Self {
SnapshotRetention::Branch {
min_snapshots_to_keep,
max_snapshot_age_ms,
max_ref_age_ms,
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -408,7 +423,7 @@ mod tests {
assert_eq!(
Summary {
operation: Operation::Append,
other: HashMap::new()
additional_properties: HashMap::new()
},
*result.summary()
);
18 changes: 15 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
@@ -201,6 +201,18 @@ impl TableMetadata {
self.last_sequence_number
}

/// Returns the next sequence number for the table.
///
/// For format version 1, it always returns the initial sequence number.
/// For other versions, it returns the last sequence number incremented by 1.
#[inline]
pub fn next_sequence_number(&self) -> i64 {
match self.format_version {
FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
_ => self.last_sequence_number + 1,
}
}

/// Returns last updated time.
#[inline]
pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
@@ -1476,7 +1488,7 @@ mod tests {
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build();

let expected = TableMetadata {
@@ -1895,7 +1907,7 @@ mod tests {
.with_manifest_list("s3://a/b/1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

@@ -1908,7 +1920,7 @@ mod tests {
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

14 changes: 7 additions & 7 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
@@ -1818,7 +1818,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
@@ -1881,7 +1881,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
@@ -1901,7 +1901,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
@@ -1949,7 +1949,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

@@ -1994,7 +1994,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
@@ -2114,7 +2114,7 @@ mod tests {
.with_manifest_list("/snap-1")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

@@ -2140,7 +2140,7 @@ mod tests {
.with_parent_snapshot_id(Some(1))
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Loading
Loading