diff --git a/Cargo.toml b/Cargo.toml index a3b8180f..37ad7d6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ path = "src/lib.rs" [features] default = [] lz4 = ["dep:lz4_flex"] +zlib =["dep:flate2"] bytes_1 = ["dep:bytes"] metrics = [] use_unsafe = [] # TODO: 3.0.0 remove @@ -32,6 +33,7 @@ enum_dispatch = "0.3.13" interval-heap = "0.0.5" log = "0.4.27" lz4_flex = { version = "0.11.5", optional = true, default-features = false } +flate2 = { version = "1.0.17", optional = true, default-features = false, features = ["zlib-rs"] } quick_cache = { version = "0.6.16", default-features = false, features = [] } rustc-hash = "2.1.1" self_cell = "1.2.0" diff --git a/src/coding.rs b/src/coding.rs index 03a33bb6..e5cb6f6c 100644 --- a/src/coding.rs +++ b/src/coding.rs @@ -9,17 +9,20 @@ use std::io::{Read, Write}; pub enum EncodeError { /// I/O error Io(std::io::Error), + InvalidCompressionLevel((crate::CompressionType, i32)), } impl std::fmt::Display for EncodeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "EncodeError({})", - match self { - Self::Io(e) => e.to_string(), + match self { + Self::Io(e) => write!(f, "EncodeError(Io: {})", e), + Self::InvalidCompressionLevel((compression_type, level)) => { + write!( + f, + "EncodeError(InvalidCompressionLevel: {compression_type:?} level {level})" + ) } - ) + } } } @@ -33,6 +36,7 @@ impl std::error::Error for EncodeError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Self::Io(e) => Some(e), + Self::InvalidCompressionLevel(_) => None, } } } @@ -57,6 +61,8 @@ pub enum DecodeError { /// UTF-8 error Utf8(std::str::Utf8Error), + + InvalidCompressionLevel((crate::CompressionType, i32)), } impl std::fmt::Display for DecodeError { @@ -66,6 +72,8 @@ impl std::fmt::Display for DecodeError { "DecodeError({})", match self { Self::Io(e) => e.to_string(), + Self::InvalidCompressionLevel((compression_type, level)) => + format!("InvalidCompressionLevel: {compression_type:?} level {level}"), e => format!("{e:?}"), } ) diff --git a/src/compression.rs b/src/compression.rs index 0391229d..4e81e568 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -6,6 +6,9 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use byteorder::{ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; +#[cfg(feature = "zlib")] +const ZLIB_MAX_LEVEL: u8 = 9; + /// Compression algorithm to use #[derive(Copy, Clone, Debug, Eq, PartialEq)] #[allow(clippy::module_name_repetitions)] @@ -21,6 +24,10 @@ pub enum CompressionType { /// on speed over compression ratio. #[cfg(feature = "lz4")] Lz4, + + /// Zlib compression + #[cfg(feature = "zlib")] + Zlib(u8), } impl Encode for CompressionType { @@ -34,6 +41,18 @@ impl Encode for CompressionType { Self::Lz4 => { writer.write_u8(1)?; } + + #[cfg(feature = "zlib")] + Self::Zlib(level) => { + if *level > ZLIB_MAX_LEVEL { + return Err(EncodeError::InvalidCompressionLevel(( + Self::Zlib(*level), + *level as i32, + ))); + } + writer.write_u8(2)?; + writer.write_u8(*level)?; + } } Ok(()) @@ -50,6 +69,17 @@ impl Decode for CompressionType { #[cfg(feature = "lz4")] 1 => Ok(Self::Lz4), + #[cfg(feature = "zlib")] + 2 => { + let level = reader.read_u8()?; + if level > ZLIB_MAX_LEVEL { + return Err(DecodeError::InvalidCompressionLevel(( + Self::Zlib(level), + level as i32, + ))); + } + Ok(Self::Zlib(level)) + } tag => Err(DecodeError::InvalidTag(("CompressionType", tag))), } } @@ -65,6 +95,11 @@ impl std::fmt::Display for CompressionType { #[cfg(feature = "lz4")] Self::Lz4 => "lz4", + + #[cfg(feature = "zlib")] + Self::Zlib(level) => { + return write!(f, "zlib (level {})", level); + } } ) } @@ -92,4 +127,30 @@ mod tests { assert_eq!(1, serialized.len()); } } + + #[cfg(feature = "zlib")] + mod zlib { + use super::*; + use test_log::test; + + #[test] + fn compression_serialize_zlib() { + for level in 0..=ZLIB_MAX_LEVEL { + let serialized = CompressionType::Zlib(level).encode_into_vec(); + assert_eq!(2, serialized.len()); + } + } + + #[test] + fn compression_serialize_zlib_invalid_level() { + let err = CompressionType::Zlib(ZLIB_MAX_LEVEL + 1).encode_into_vec_err(); + assert!(matches!( + err, + Err(EncodeError::InvalidCompressionLevel(( + CompressionType::Zlib(10), + 10 + ))) + )); + } + } } diff --git a/src/segment/block/mod.rs b/src/segment/block/mod.rs index 10edc744..28a7b7ba 100644 --- a/src/segment/block/mod.rs +++ b/src/segment/block/mod.rs @@ -23,7 +23,12 @@ use crate::{ segment::BlockHandle, CompressionType, Slice, }; +use std::borrow::Cow; use std::fs::File; +use std::io::{Read, Write}; + +#[cfg(feature = "zlib")] +use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression as ZCompression}; /// A block on disk /// @@ -56,16 +61,24 @@ impl Block { previous_block_offset: BlockOffset(0), // <-- TODO: }; - let data = match compression { - CompressionType::None => data, + let data: Cow<[u8]> = match compression { + CompressionType::None => Cow::Borrowed(data), #[cfg(feature = "lz4")] - CompressionType::Lz4 => &lz4_flex::compress(data), + CompressionType::Lz4 => Cow::Owned((lz4_flex::compress(data))), + + #[cfg(feature = "zlib")] + CompressionType::Zlib(level) => { + let lvl = level as u32; + let mut e = ZlibEncoder::new(Vec::new(), ZCompression::new(lvl)); + e.write_all(data)?; + Cow::Owned(e.finish()?) + } }; header.data_length = data.len() as u32; header.encode_into(&mut writer)?; - writer.write_all(data)?; + writer.write_all(&data)?; log::trace!( "Writing block with size {}B (compressed: {}B) (excluding header of {}B)", @@ -99,6 +112,16 @@ impl Block { builder.freeze().into() } + + #[cfg(feature = "zlib")] + CompressionType::Zlib(_level) => { + let mut d = ZlibDecoder::new(&raw_data[..]); + let mut decompressed_data = + unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) }; + d.read_exact(&mut decompressed_data) + .map_err(|_| crate::Error::Decompress(compression))?; + decompressed_data.freeze().into() + } }; debug_assert_eq!(header.uncompressed_length, { @@ -131,6 +154,43 @@ impl Block { handle: BlockHandle, compression: CompressionType, ) -> crate::Result { + #[warn(unsafe_code)] + let mut builder = unsafe { Slice::builder_unzeroed(handle.size() as usize) }; + { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + + let bytes_read = file.read_at(&mut builder, *handle.offset())?; + + assert_eq!( + bytes_read, + handle.size() as usize, + "not enough bytes read: file has length {}", + file.metadata()?.len(), + ); + } + + #[cfg(windows)] + { + use std::os::windows::fs::FileExt; + + let bytes_read = file.seek_read(&mut builder, *handle.offset())?; + + assert_eq!( + bytes_read, + handle.size() as usize, + "not enough bytes read: file has length {}", + file.metadata()?.len(), + ); + } + + #[cfg(not(any(unix, windows)))] + { + compile_error!("unsupported OS"); + unimplemented!(); + } + } let buf = crate::file::read_exact(file, *handle.offset(), handle.size() as usize)?; let header = Header::decode_from(&mut &buf[..])?; @@ -154,6 +214,18 @@ impl Block { builder.freeze().into() } + + #[cfg(feature = "zlib")] + CompressionType::Zlib(_level) => { + #[allow(clippy::indexing_slicing)] + let raw_data = &buf[Header::serialized_len()..]; + let mut d = ZlibDecoder::new(raw_data); + let mut decompressed_data = + unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) }; + d.read_exact(&mut decompressed_data) + .map_err(|_| crate::Error::Decompress(compression))?; + decompressed_data.freeze().into() + } }; #[allow(clippy::expect_used, clippy::cast_possible_truncation)]