Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ uuid = { version = "1.18", features = ["v7"] }
volo = "0.10.6"
volo-thrift = "0.10.8"
zstd = "0.13.3"
lz4_flex = "0.12.0"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
lz4_flex = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub enum ErrorKind {

/// Catalog commit failed due to outdated metadata
CatalogCommitConflicts,

/// An I/O error occurred during filesystem or object store operations (e.g., file not found,
/// permission denied, network timeout, disk full, etc.).
/// Typically recoverable with retries, but may require manual intervention in persistent cases.
IOError,
}

impl ErrorKind {
Expand All @@ -84,6 +89,7 @@ impl From<ErrorKind> for &'static str {
ErrorKind::NamespaceNotFound => "NamespaceNotFound",
ErrorKind::PreconditionFailed => "PreconditionFailed",
ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
ErrorKind::IOError => "IOError",
}
}
}
Expand Down
76 changes: 52 additions & 24 deletions crates/iceberg/src/puffin/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::io::{Read, Write};

use lz4_flex::frame::{FrameDecoder, FrameEncoder};
use serde::{Deserialize, Serialize};

use crate::{Error, ErrorKind, Result};
Expand All @@ -36,10 +39,14 @@ impl CompressionCodec {
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
CompressionCodec::Lz4 => {
let mut decoder = FrameDecoder::new(&bytes[..]);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?;
Ok(decompressed)
}
CompressionCodec::Zstd => {
let decompressed = zstd::stream::decode_all(&bytes[..])?;
Ok(decompressed)
Expand All @@ -50,10 +57,16 @@ impl CompressionCodec {
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 compression is not supported currently",
)),
CompressionCodec::Lz4 => {
let mut encoder = FrameEncoder::new(Vec::new());
encoder
.write_all(&bytes)
.map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?;
let compressed = encoder
.finish()
.map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?;
Ok(compressed)
}
CompressionCodec::Zstd => {
let writer = Vec::<u8>::new();
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
Expand Down Expand Up @@ -90,23 +103,19 @@ mod tests {
#[tokio::test]
async fn test_compression_codec_lz4() {
let compression_codec = CompressionCodec::Lz4;
let bytes_vec = [0_u8; 100].to_vec();

assert_eq!(
compression_codec
.compress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 compression is not supported currently",
);

assert_eq!(
compression_codec
.decompress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 decompression is not supported currently",
)
// Highly compressible data: all zeros
let data = vec![0u8; 10_000];
let compressed = compression_codec.compress(data.clone()).unwrap();
assert!(compressed.len() < data.len() / 2); // Should compress to less than half the original size
let decompressed = compression_codec.decompress(compressed).unwrap();
assert_eq!(decompressed, data);

// Empty input
let empty = vec![];
let compressed_empty = compression_codec.compress(empty.clone()).unwrap();
let decompressed_empty = compression_codec.decompress(compressed_empty).unwrap();
assert_eq!(decompressed_empty, empty);
}

#[tokio::test]
Expand All @@ -120,4 +129,23 @@ mod tests {
let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
assert_eq!(decompressed, bytes_vec)
}

#[test]
fn test_lz4_roundtrip() {
let data: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();

let compressed = CompressionCodec::Lz4.compress(data.clone()).unwrap();
assert!(compressed.len() < data.len());

let decompressed = CompressionCodec::Lz4.decompress(compressed).unwrap();
assert_eq!(data, decompressed);
}

#[test]
fn test_lz4_empty() {
let empty = vec![];
let compressed = CompressionCodec::Lz4.compress(empty.clone()).unwrap();
let decompressed = CompressionCodec::Lz4.decompress(compressed).unwrap();
assert_eq!(empty, decompressed);
}
}
Loading