From 1f542939ad00e1c7589008cebd9dfb641b80d90b Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 22 Jan 2025 20:54:36 -0500 Subject: [PATCH 1/3] feat(puffin): Add PuffinReader --- crates/iceberg/src/puffin/blob.rs | 38 +++++++ crates/iceberg/src/puffin/mod.rs | 2 + crates/iceberg/src/puffin/reader.rs | 126 ++++++++++++++++++++++++ crates/iceberg/src/puffin/test_utils.rs | 26 +++++ 4 files changed, 192 insertions(+) create mode 100644 crates/iceberg/src/puffin/blob.rs create mode 100644 crates/iceberg/src/puffin/reader.rs diff --git a/crates/iceberg/src/puffin/blob.rs b/crates/iceberg/src/puffin/blob.rs new file mode 100644 index 000000000..a08fd9417 --- /dev/null +++ b/crates/iceberg/src/puffin/blob.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library. +pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1"; + +/// The blob +#[derive(Debug, PartialEq, Clone)] +pub(crate) struct Blob { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + pub(crate) fields: Vec, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The uncompressed blob data + pub(crate) data: Vec, + /// Arbitrary meta-information about the blob + pub(crate) properties: HashMap, +} diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 91bdf125f..ee8c8077a 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -21,8 +21,10 @@ // Temporarily allowing this while crate is under active development #![allow(dead_code)] +mod blob; mod compression; mod metadata; +mod reader; #[cfg(test)] mod test_utils; diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs new file mode 100644 index 000000000..1ab1b4dd5 --- /dev/null +++ b/crates/iceberg/src/puffin/reader.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::{FileRead, InputFile}; +use crate::puffin::blob::Blob; +use crate::puffin::metadata::{BlobMetadata, FileMetadata}; +use crate::Result; + +/// Puffin reader +pub(crate) struct PuffinReader { + input_file: InputFile, + file_metadata: Option, +} + +impl PuffinReader { + /// Returns a new Puffin reader + pub(crate) fn new(input_file: InputFile) -> Self { + Self { + input_file, + file_metadata: None, + } + } + + /// Returns file metadata + pub(crate) async fn file_metadata(&mut self) -> Result<&FileMetadata> { + if let Some(ref file_metadata) = self.file_metadata { + Ok(file_metadata) + } else { + let file_metadata = FileMetadata::read(&self.input_file).await?; + Ok(self.file_metadata.insert(file_metadata)) + } + } + + /// Returns blob + pub(crate) async fn blob(&self, blob_metadata: &BlobMetadata) -> Result { + let file_read = self.input_file.reader().await?; + let start = blob_metadata.offset; + let end = start + blob_metadata.length; + let bytes = file_read.read(start..end).await?.to_vec(); + let data = blob_metadata.compression_codec.decompress(bytes)?; + + Ok(Blob { + r#type: blob_metadata.r#type.clone(), + fields: blob_metadata.fields.clone(), + snapshot_id: blob_metadata.snapshot_id, + sequence_number: blob_metadata.sequence_number, + data, + properties: blob_metadata.properties.clone(), + }) + } +} + +#[cfg(test)] +mod tests { + + use crate::puffin::reader::PuffinReader; + use crate::puffin::test_utils::{ + blob_0, blob_1, java_uncompressed_metric_input_file, + java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + + #[tokio::test] + async fn test_puffin_reader_uncompressed_metric_data() { + let input_file = java_uncompressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, uncompressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap()) + .await + .unwrap(), + blob_1(), + ) + } + + #[tokio::test] + async fn test_puffin_reader_zstd_compressed_metric_data() { + let input_file = java_zstd_compressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap()) + .await + .unwrap(), + blob_1(), + ) + } +} diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index e49e51d50..8efb13b9a 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; +use super::blob::Blob; use crate::io::{FileIOBuilder, InputFile}; use crate::puffin::compression::CompressionCodec; use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; @@ -68,6 +69,7 @@ pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob"; pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1]; pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2; pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi"; pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { BlobMetadata { @@ -95,10 +97,23 @@ pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata { } } +pub(crate) fn blob_0() -> Blob { + Blob { + r#type: METRIC_BLOB_0_TYPE.to_string(), + fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + data: METRIC_BLOB_0_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob"; pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2]; pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2; pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_1_DATA: &str = + "some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?"; pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { BlobMetadata { @@ -126,6 +141,17 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { } } +pub(crate) fn blob_1() -> Blob { + Blob { + r#type: METRIC_BLOB_1_TYPE.to_string(), + fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + data: METRIC_BLOB_1_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234"; pub(crate) fn file_properties() -> HashMap { From 8752ce4d6bb37cdb18479cc1e33ff9e0fd776434 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 25 Jan 2025 05:57:50 -0500 Subject: [PATCH 2/3] Initialize FileMetadata using OnceCell --- crates/iceberg/Cargo.toml | 5 ++--- crates/iceberg/src/puffin/reader.rs | 21 ++++++++++----------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 626ca15ef..c9de665eb 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,7 +29,7 @@ license = { workspace = true } keywords = ["iceberg"] [features] -default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] +default = ["storage-memory", "storage-fs", "storage-s3"] storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] storage-memory = ["opendal/services-memory"] @@ -38,7 +38,6 @@ storage-s3 = ["opendal/services-s3"] storage-gcs = ["opendal/services-gcs"] async-std = ["dep:async-std"] -tokio = ["dep:tokio"] [dependencies] anyhow = { workspace = true } @@ -78,7 +77,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } -tokio = { workspace = true, optional = true } +tokio = { workspace = true, features = ["sync"] } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index 1ab1b4dd5..1114d29e9 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use tokio::sync::OnceCell; + use crate::io::{FileRead, InputFile}; use crate::puffin::blob::Blob; use crate::puffin::metadata::{BlobMetadata, FileMetadata}; @@ -23,7 +25,7 @@ use crate::Result; /// Puffin reader pub(crate) struct PuffinReader { input_file: InputFile, - file_metadata: Option, + file_metadata: OnceCell, } impl PuffinReader { @@ -31,18 +33,15 @@ impl PuffinReader { pub(crate) fn new(input_file: InputFile) -> Self { Self { input_file, - file_metadata: None, + file_metadata: OnceCell::new(), } } /// Returns file metadata - pub(crate) async fn file_metadata(&mut self) -> Result<&FileMetadata> { - if let Some(ref file_metadata) = self.file_metadata { - Ok(file_metadata) - } else { - let file_metadata = FileMetadata::read(&self.input_file).await?; - Ok(self.file_metadata.insert(file_metadata)) - } + pub(crate) async fn file_metadata(&self) -> Result<&FileMetadata> { + self.file_metadata + .get_or_try_init(|| FileMetadata::read(&self.input_file)) + .await } /// Returns blob @@ -77,7 +76,7 @@ mod tests { #[tokio::test] async fn test_puffin_reader_uncompressed_metric_data() { let input_file = java_uncompressed_metric_input_file(); - let mut puffin_reader = PuffinReader::new(input_file); + let puffin_reader = PuffinReader::new(input_file); let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); assert_eq!(file_metadata, uncompressed_metric_file_metadata()); @@ -102,7 +101,7 @@ mod tests { #[tokio::test] async fn test_puffin_reader_zstd_compressed_metric_data() { let input_file = java_zstd_compressed_metric_input_file(); - let mut puffin_reader = PuffinReader::new(input_file); + let puffin_reader = PuffinReader::new(input_file); let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()); From 9cdc2bc889ae1eb74df04b29862997356167106c Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sun, 26 Jan 2025 09:01:51 -0500 Subject: [PATCH 3/3] Reader optional --- crates/iceberg/Cargo.toml | 5 +++-- crates/iceberg/src/puffin/mod.rs | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index c9de665eb..08d5efabe 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,7 +29,7 @@ license = { workspace = true } keywords = ["iceberg"] [features] -default = ["storage-memory", "storage-fs", "storage-s3"] +default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] storage-memory = ["opendal/services-memory"] @@ -38,6 +38,7 @@ storage-s3 = ["opendal/services-s3"] storage-gcs = ["opendal/services-gcs"] async-std = ["dep:async-std"] +tokio = ["dep:tokio"] [dependencies] anyhow = { workspace = true } @@ -77,7 +78,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } -tokio = { workspace = true, features = ["sync"] } +tokio = { workspace = true, optional = true, features = ["sync"] } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index ee8c8077a..6f37b43b5 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -24,6 +24,7 @@ mod blob; mod compression; mod metadata; +#[cfg(feature = "tokio")] mod reader; #[cfg(test)]