diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 04f9ac64..6ff2b9b7 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -61,7 +61,6 @@ int main() { auto descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) .SetBucketCount(3) - .SetProperty("table.log.arrow.compression.type", "NONE") .SetComment("cpp example table with 3 buckets") .Build(); diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index e4ca9578..215adbe6 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -94,6 +94,7 @@ impl RecordAccumulator { let table_path = &record.table_path; let table_info = cluster.get_table(table_path); + let arrow_compression_info = table_info.get_table_config().get_arrow_compression_info()?; let row_type = &cluster.get_table(table_path).row_type; let schema_id = table_info.schema_id; @@ -102,6 +103,7 @@ impl RecordAccumulator { self.batch_id.fetch_add(1, Ordering::Relaxed), table_path.as_ref().clone(), schema_id, + arrow_compression_info, row_type, bucket_id, current_time_ms(), diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 13b3d364..ba04db4a 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -18,6 +18,7 @@ use crate::BucketId; use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{ResultHandle, WriteRecord}; +use crate::compression::ArrowCompressionInfo; use crate::error::Result; use crate::metadata::{DataType, TablePath}; use crate::record::MemoryLogRecordsArrowBuilder; @@ -132,10 +133,12 @@ pub struct ArrowLogWriteBatch { } impl ArrowLogWriteBatch { + #[allow(clippy::too_many_arguments)] pub fn new( batch_id: i64, table_path: TablePath, schema_id: i32, + arrow_compression_info: ArrowCompressionInfo, row_type: &DataType, bucket_id: BucketId, create_ms: i64, @@ -148,6 +151,7 @@ impl ArrowLogWriteBatch { schema_id, row_type, to_append_record_batch, + arrow_compression_info, ), } } diff --git a/crates/fluss/src/compression/arrow_compression.rs b/crates/fluss/src/compression/arrow_compression.rs new file mode 100644 index 00000000..32dfadb4 --- /dev/null +++ b/crates/fluss/src/compression/arrow_compression.rs @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use arrow::ipc::CompressionType; +use std::collections::HashMap; + +pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str = "table.log.arrow.compression.zstd.level"; +pub const TABLE_LOG_ARROW_COMPRESSION_TYPE: &str = "table.log.arrow.compression.type"; +pub const DEFAULT_NON_ZSTD_COMPRESSION_LEVEL: i32 = -1; +pub const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3; + +#[derive(Clone, Debug, PartialEq)] +pub enum ArrowCompressionType { + None, + Lz4Frame, + Zstd, +} + +impl ArrowCompressionType { + fn from_conf(properties: &HashMap) -> Result { + match properties + .get(TABLE_LOG_ARROW_COMPRESSION_TYPE) + .map(|s| s.as_str()) + { + Some("NONE") => Ok(Self::None), + Some("LZ4_FRAME") => Ok(Self::Lz4Frame), + Some("ZSTD") => Ok(Self::Zstd), + Some(other) => Err(Error::IllegalArgument { + message: format!("Unsupported compression type: {other}"), + }), + None => Ok(Self::Zstd), + } + } +} + +#[derive(Clone, Debug)] +pub struct ArrowCompressionInfo { + pub compression_type: ArrowCompressionType, + pub compression_level: i32, +} + +impl ArrowCompressionInfo { + pub fn from_conf(properties: &HashMap) -> Result { + let compression_type = ArrowCompressionType::from_conf(properties)?; + + if compression_type != ArrowCompressionType::Zstd { + return Ok(Self { + compression_type, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }); + } + + match properties + .get(TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL) + .map(|s| s.as_str().parse::()) + { + Some(Ok(level)) if !(1..=22).contains(&level) => Err(Error::IllegalArgument { + message: format!( + "Invalid ZSTD compression level: {}. Expected a value between 1 and 22.", + level + ), + }), + Some(Err(e)) => Err(Error::IllegalArgument { + message: format!( + "Invalid ZSTD compression level. Expected a value between 1 and 22. {}", + e + ), + }), + + Some(Ok(level)) => Ok(Self { + compression_type, + compression_level: level, + }), + None => Ok(Self { + compression_type, + compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL, + }), + } + } + + #[cfg(test)] + fn new(compression_type: ArrowCompressionType, compression_level: i32) -> ArrowCompressionInfo { + Self { + compression_type, + compression_level, + } + } + + pub fn get_compression_type(&self) -> Option { + match self.compression_type { + ArrowCompressionType::Zstd => Some(CompressionType::ZSTD), + ArrowCompressionType::Lz4Frame => Some(CompressionType::LZ4_FRAME), + ArrowCompressionType::None => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + #[test] + fn test_from_conf() { + assert_eq!( + ArrowCompressionType::from_conf(&HashMap::new()).unwrap(), + ArrowCompressionType::Zstd + ); + + assert_eq!( + ArrowCompressionType::from_conf(&mk_map(&[( + "table.log.arrow.compression.type", + "NONE" + )])) + .unwrap(), + ArrowCompressionType::None + ); + + assert_eq!( + ArrowCompressionType::from_conf(&mk_map(&[( + "table.log.arrow.compression.type", + "LZ4_FRAME" + )])) + .unwrap(), + ArrowCompressionType::Lz4Frame + ); + + assert_eq!( + ArrowCompressionType::from_conf(&mk_map(&[( + "table.log.arrow.compression.type", + "ZSTD" + )])) + .unwrap(), + ArrowCompressionType::Zstd + ); + } + + #[test] + fn test_from_conf_invalid_compression_type() { + let props = mk_map(&[("table.log.arrow.compression.type", "FOO")]); + + assert!( + ArrowCompressionInfo::from_conf(&props) + .unwrap_err() + .to_string() + .contains( + "Fluss hitting illegal argument error Unsupported compression type: FOO." + ) + ); + } + + #[test] + fn test_from_conf_zstd_compression_level() { + let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[( + "table.log.arrow.compression.type", + "ZSTD", + )])); + assert_eq!(compression_info.unwrap().compression_level, 3); + let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[ + ("table.log.arrow.compression.type", "ZSTD"), + ("table.log.arrow.compression.zstd.level", "1"), + ])); + assert_eq!(compression_info.unwrap().compression_level, 1); + } + + #[test] + fn test_from_conf_compression_level_out_of_range() { + let props = mk_map(&[ + ("table.log.arrow.compression.type", "ZSTD"), + ("table.log.arrow.compression.zstd.level", "0"), + ]); + + assert!( + ArrowCompressionInfo::from_conf(&props) + .unwrap_err() + .to_string() + .contains("Expected a value between 1 and 22.") + ); + + let props = mk_map(&[ + ("table.log.arrow.compression.type", "ZSTD"), + ("table.log.arrow.compression.zstd.level", "23"), + ]); + + assert!( + ArrowCompressionInfo::from_conf(&props) + .unwrap_err() + .to_string() + .contains("Expected a value between 1 and 22.") + ); + } + + #[test] + fn test_from_conf_compression_level_parse_error() { + let props = mk_map(&[ + ("table.log.arrow.compression.type", "ZSTD"), + ("table.log.arrow.compression.zstd.level", "not-a-number"), + ]); + + assert!( + ArrowCompressionInfo::from_conf(&props) + .unwrap_err() + .to_string() + .contains("Expected a value between 1 and 22.") + ); + } + + #[test] + fn get_compression_type_maps_correctly() { + assert_eq!( + ArrowCompressionInfo::new(ArrowCompressionType::None, -1).get_compression_type(), + None + ); + assert_eq!( + ArrowCompressionInfo::new(ArrowCompressionType::Lz4Frame, -1).get_compression_type(), + Some(CompressionType::LZ4_FRAME) + ); + assert_eq!( + ArrowCompressionInfo::new(ArrowCompressionType::Zstd, -1).get_compression_type(), + Some(CompressionType::ZSTD) + ); + } + + fn mk_map(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } +} diff --git a/crates/fluss/src/compression/mod.rs b/crates/fluss/src/compression/mod.rs new file mode 100644 index 00000000..2b86dba7 --- /dev/null +++ b/crates/fluss/src/compression/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod arrow_compression; + +pub use arrow_compression::*; diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index 366edfc6..25978ce0 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -26,6 +26,7 @@ mod cluster; pub mod config; pub mod error; +mod compression; pub mod io; mod util; diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 770c4f2c..4f6c04bc 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::compression::ArrowCompressionInfo; use crate::error::Error::InvalidTableError; use crate::error::{Error, Result}; use crate::metadata::datatype::{DataField, DataType, RowType}; @@ -721,6 +722,10 @@ impl TableConfig { pub fn from_properties(properties: HashMap) -> Self { TableConfig { properties } } + + pub fn get_arrow_compression_info(&self) -> Result { + ArrowCompressionInfo::from_conf(&self.properties) + } } impl TableInfo { diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 0a803aec..5a5115ed 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -16,6 +16,7 @@ // under the License. use crate::client::{Record, WriteRecord}; +use crate::compression::ArrowCompressionInfo; use crate::error::Result; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; @@ -47,6 +48,7 @@ use std::{ sync::Arc, }; +use arrow::ipc::writer::IpcWriteOptions; /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; pub const LENGTH_LENGTH: usize = 4; @@ -104,6 +106,7 @@ pub struct MemoryLogRecordsArrowBuilder { batch_sequence: i32, arrow_record_batch_builder: Box, is_closed: bool, + arrow_compression_info: ArrowCompressionInfo, } pub trait ArrowRecordBatchInnerBuilder: Send + Sync { @@ -244,7 +247,12 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { } impl MemoryLogRecordsArrowBuilder { - pub fn new(schema_id: i32, row_type: &DataType, to_append_record_batch: bool) -> Self { + pub fn new( + schema_id: i32, + row_type: &DataType, + to_append_record_batch: bool, + arrow_compression_info: ArrowCompressionInfo, + ) -> Self { let arrow_batch_builder: Box = { if to_append_record_batch { Box::new(PrebuiltRecordBatchBuilder::default()) @@ -260,6 +268,7 @@ impl MemoryLogRecordsArrowBuilder { batch_sequence: NO_BATCH_SEQUENCE, is_closed: false, arrow_record_batch_builder: arrow_batch_builder, + arrow_compression_info, } } @@ -289,7 +298,15 @@ impl MemoryLogRecordsArrowBuilder { // serialize arrow batch let mut arrow_batch_bytes = vec![]; let table_schema = self.arrow_record_batch_builder.schema(); - let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes, &table_schema)?; + let compression_type = self.arrow_compression_info.get_compression_type(); + let write_option = + IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), compression_type); + let mut writer = StreamWriter::try_new_with_options( + &mut arrow_batch_bytes, + &table_schema, + write_option?, + )?; + // get header len let header = writer.get_ref().len(); let record_batch = self.arrow_record_batch_builder.build_arrow_record_batch()?; diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 9eec98ea..3f7946ee 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -103,7 +103,6 @@ mod table_test { .build() .expect("Failed to build schema"), ) - .property("table.log.arrow.compression.type", "NONE") .build() .expect("Failed to build table"); diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index bdbced95..43c89b54 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -142,7 +142,6 @@ mod table_remote_scan_test { .build() .expect("Failed to build schema"), ) - .property("table.log.arrow.compression.type", "NONE") .build() .expect("Failed to build table");