Skip to content

Commit 336df2b

Browse files
committed
Add Puffin crate and CompressionCodec
1 parent f3a571d commit 336df2b

File tree

4 files changed

+183
-0
lines changed

4 files changed

+183
-0
lines changed

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ members = [
2323
"crates/iceberg",
2424
"crates/integration_tests",
2525
"crates/integrations/*",
26+
"crates/puffin",
2627
"crates/test_utils",
2728
]
2829
exclude = ["bindings/python"]
@@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9899
volo-thrift = "0.10"
99100
hive_metastore = "0.1"
100101
tera = "1"
102+
zstd = "0.13.2"

crates/puffin/Cargo.toml

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
iceberg = { workspace = true }
33+
zstd = { workspace = true }
34+
35+
[dev-dependencies]
36+
tokio = { workspace = true }

crates/puffin/src/compression.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
20+
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
21+
/// Data compression formats
22+
pub enum CompressionCodec {
23+
#[default]
24+
/// No compression
25+
None,
26+
/// LZ4 single compression frame with content size present
27+
Lz4,
28+
/// Zstandard single compression frame with content size present
29+
Zstd,
30+
}
31+
32+
impl CompressionCodec {
33+
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
34+
match self {
35+
CompressionCodec::None => Ok(bytes),
36+
CompressionCodec::Lz4 => Err(Error::new(
37+
ErrorKind::FeatureUnsupported,
38+
"LZ4 decompression is not supported currently",
39+
)),
40+
CompressionCodec::Zstd => {
41+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
42+
Ok(decompressed)
43+
}
44+
}
45+
}
46+
47+
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
48+
match self {
49+
CompressionCodec::None => Ok(bytes),
50+
CompressionCodec::Lz4 => Err(Error::new(
51+
ErrorKind::FeatureUnsupported,
52+
"LZ4 compression is not supported currently",
53+
)),
54+
CompressionCodec::Zstd => {
55+
let writer = Vec::<u8>::new();
56+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
57+
encoder.include_checksum(true)?;
58+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
59+
std::io::copy(&mut &bytes[..], &mut encoder)?;
60+
let compressed = encoder.finish()?;
61+
Ok(compressed)
62+
}
63+
}
64+
}
65+
66+
pub(crate) fn is_none(&self) -> bool {
67+
matches!(self, CompressionCodec::None)
68+
}
69+
}
70+
71+
#[cfg(test)]
72+
mod tests {
73+
use crate::compression::CompressionCodec;
74+
75+
#[tokio::test]
76+
async fn test_compression_codec_none() {
77+
let compression_codec = CompressionCodec::None;
78+
let bytes_vec = [0_u8; 100].to_vec();
79+
80+
let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
81+
assert_eq!(bytes_vec, compressed);
82+
83+
let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
84+
assert_eq!(compressed, decompressed)
85+
}
86+
87+
#[tokio::test]
88+
async fn test_compression_codec_lz4() {
89+
let compression_codec = CompressionCodec::Lz4;
90+
let bytes_vec = [0_u8; 100].to_vec();
91+
92+
assert_eq!(
93+
compression_codec
94+
.compress(bytes_vec.clone())
95+
.unwrap_err()
96+
.to_string(),
97+
"FeatureUnsupported => LZ4 compression is not supported currently",
98+
);
99+
100+
assert_eq!(
101+
compression_codec
102+
.decompress(bytes_vec.clone())
103+
.unwrap_err()
104+
.to_string(),
105+
"FeatureUnsupported => LZ4 decompression is not supported currently",
106+
)
107+
}
108+
109+
#[tokio::test]
110+
async fn test_compression_codec_zstd() {
111+
let compression_codec = CompressionCodec::Zstd;
112+
let bytes_vec = [0_u8; 100].to_vec();
113+
114+
let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
115+
assert!(compressed.len() < bytes_vec.len());
116+
117+
let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
118+
assert_eq!(decompressed, bytes_vec)
119+
}
120+
}

crates/puffin/src/lib.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Puffin implementation.
19+
20+
#![deny(missing_docs)]
21+
// Temporarily allowing this while crate is under active development
22+
#![allow(dead_code)]
23+
24+
mod compression;
25+
pub use compression::CompressionCodec;

0 commit comments

Comments
 (0)