diff --git a/Cargo.lock b/Cargo.lock index 97ee25d658..f7d4c7df6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3372,6 +3372,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "lz4_flex", "minijinja", "mockall", "moka", diff --git a/Cargo.toml b/Cargo.toml index d099398dbd..9cfe71a452 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,3 +132,4 @@ uuid = { version = "1.18", features = ["v7"] } volo = "0.10.6" volo-thrift = "0.10.8" zstd = "0.13.3" +lz4_flex = "0.12.0" \ No newline at end of file diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6f1332a444..b543e3063d 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -89,6 +89,7 @@ typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } zstd = { workspace = true } +lz4_flex = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 6ab3a78c8b..e6a76ba960 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -63,6 +63,11 @@ pub enum ErrorKind { /// Catalog commit failed due to outdated metadata CatalogCommitConflicts, + + /// An I/O error occurred while reading or writing data, e.g. during filesystem or object store + /// operations (file not found, permission denied, network timeout, disk full) or while encoding or decoding a compressed stream. + /// Transient failures are typically recoverable with retries; persistent ones may require manual intervention. 
+ IOError, } impl ErrorKind { @@ -84,6 +89,7 @@ impl From<ErrorKind> for &'static str { ErrorKind::NamespaceNotFound => "NamespaceNotFound", ErrorKind::PreconditionFailed => "PreconditionFailed", ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts", + ErrorKind::IOError => "IOError", } } } diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs index a9a56ef12c..bd60d159d7 100644 --- a/crates/iceberg/src/puffin/compression.rs +++ b/crates/iceberg/src/puffin/compression.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::io::{Read, Write}; + +use lz4_flex::frame::{FrameDecoder, FrameEncoder}; use serde::{Deserialize, Serialize}; use crate::{Error, ErrorKind, Result}; @@ -36,10 +39,14 @@ impl CompressionCodec { pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { match self { CompressionCodec::None => Ok(bytes), - CompressionCodec::Lz4 => Err(Error::new( - ErrorKind::FeatureUnsupported, - "LZ4 decompression is not supported currently", - )), + CompressionCodec::Lz4 => { + let mut decoder = FrameDecoder::new(&bytes[..]); + let mut decompressed = Vec::new(); + decoder + .read_to_end(&mut decompressed) + .map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?; + Ok(decompressed) + } CompressionCodec::Zstd => { let decompressed = zstd::stream::decode_all(&bytes[..])?; Ok(decompressed) @@ -50,10 +57,16 @@ impl CompressionCodec { pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { match self { CompressionCodec::None => Ok(bytes), - CompressionCodec::Lz4 => Err(Error::new( - ErrorKind::FeatureUnsupported, - "LZ4 compression is not supported currently", - )), + CompressionCodec::Lz4 => { + let mut encoder = FrameEncoder::new(Vec::new()); + encoder + .write_all(&bytes) + .map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?; + let compressed = encoder + .finish() + .map_err(|e| Error::new(ErrorKind::IOError, e.to_string()))?; + Ok(compressed) + } 
CompressionCodec::Zstd => { let writer = Vec::<u8>::new(); let mut encoder = zstd::stream::Encoder::new(writer, 3)?; @@ -90,23 +103,19 @@ mod tests { #[tokio::test] async fn test_compression_codec_lz4() { let compression_codec = CompressionCodec::Lz4; - let bytes_vec = [0_u8; 100].to_vec(); - assert_eq!( - compression_codec - .compress(bytes_vec.clone()) - .unwrap_err() - .to_string(), - "FeatureUnsupported => LZ4 compression is not supported currently", - ); - - assert_eq!( - compression_codec - .decompress(bytes_vec.clone()) - .unwrap_err() - .to_string(), - "FeatureUnsupported => LZ4 decompression is not supported currently", - ) + // Highly compressible data: all zeros + let data = vec![0u8; 10_000]; + let compressed = compression_codec.compress(data.clone()).unwrap(); + assert!(compressed.len() < data.len() / 2); // Should compress to less than half the original size + let decompressed = compression_codec.decompress(compressed).unwrap(); + assert_eq!(decompressed, data); + + // Empty input + let empty = vec![]; + let compressed_empty = compression_codec.compress(empty.clone()).unwrap(); + let decompressed_empty = compression_codec.decompress(compressed_empty).unwrap(); + assert_eq!(decompressed_empty, empty); } #[tokio::test] @@ -120,4 +129,23 @@ mod tests { let decompressed = compression_codec.decompress(compressed.clone()).unwrap(); assert_eq!(decompressed, bytes_vec) } + + #[test] + fn test_lz4_roundtrip() { + let data: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect(); + + let compressed = CompressionCodec::Lz4.compress(data.clone()).unwrap(); + assert!(compressed.len() < data.len()); + + let decompressed = CompressionCodec::Lz4.decompress(compressed).unwrap(); + assert_eq!(data, decompressed); + } + + #[test] + fn test_lz4_empty() { + let empty = vec![]; + let compressed = CompressionCodec::Lz4.compress(empty.clone()).unwrap(); + let decompressed = CompressionCodec::Lz4.decompress(compressed).unwrap(); + assert_eq!(empty, decompressed); + } }