diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 84aec25b..9ef8fe72 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -529,6 +529,7 @@ pub fn resolve_row_types( Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts), Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts), Datum::Array(a) => Datum::Array(a.clone()), + Datum::Map(m) => Datum::Map(m.clone()), Datum::Row(_) => return Err(anyhow!("Row datum is not yet supported in C++ bindings")), }; out.set_field(idx, resolved); @@ -588,6 +589,9 @@ pub fn compacted_row_to_owned( Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec())) } fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?), + fcore::metadata::DataType::Map(mt) => { + Datum::Map(row.get_map(i, mt.key_type(), mt.value_type())?) + } other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 4133bed4..b30baeb5 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1369,6 +1369,7 @@ fn python_value_to_datum( } } Datum::Array(v) => writer.write_array(i, &v), + Datum::Map(v) => writer.write_map(i, &v), Datum::Row(_) => { return Err(FlussError::new_err( "Row datum is not supported as an array element", diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 53552008..562e8ea7 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -126,7 +126,12 @@ impl AppendWriter { /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery). 
pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result { let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 { - let first_row = ColumnarRow::new(Arc::new(batch.clone()), 0, None); + let first_row = ColumnarRow::new( + Arc::new(batch.clone()), + Arc::new(self.table_info.row_type.clone()), + 0, + None, + ); Arc::new(get_physical_path( &self.table_path, self.partition_getter.as_ref(), diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 0dff4e87..9d45abad 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -842,7 +842,11 @@ mod tests { fn test_read_context() -> Result { let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]); - Ok(ReadContext::new(to_arrow_schema(&row_type)?, false)) + Ok(ReadContext::new( + to_arrow_schema(&row_type)?, + Arc::new(row_type), + false, + )) } struct ErrorPendingFetch { @@ -921,7 +925,7 @@ mod tests { let data = builder.build()?; let log_records = LogRecordsBatches::new(data.clone()); - let read_context = ReadContext::new(to_arrow_schema(&row_type)?, false); + let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false); let mut fetch = DefaultCompletedFetch::new( TableBucket::new(1, 0), log_records, diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 86870991..a4164b99 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -801,12 +801,20 @@ impl LogFetcher { .collect(), )), }; - let read_context = - Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)? - .with_fluss_row_type(projected_row_type.clone()); - let remote_read_context = - Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)? 
- .with_fluss_row_type(projected_row_type); + let read_context = Self::create_read_context( + full_arrow_schema.clone(), + projected_row_type.clone(), + projected_fields.clone(), + false, + )? + .with_fluss_row_type(projected_row_type.clone()); + let remote_read_context = Self::create_read_context( + full_arrow_schema, + projected_row_type.clone(), + projected_fields.clone(), + true, + )? + .with_fluss_row_type(projected_row_type); let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone())); @@ -851,14 +859,22 @@ impl LogFetcher { fn create_read_context( full_arrow_schema: SchemaRef, + row_type: Arc, projected_fields: Option>, is_from_remote: bool, ) -> Result { match projected_fields { - None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)), - Some(fields) => { - ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote) - } + None => Ok(ReadContext::new( + full_arrow_schema, + row_type, + is_from_remote, + )), + Some(fields) => ReadContext::with_projection_pushdown( + full_arrow_schema, + row_type, + fields, + is_from_remote, + ), } } @@ -1901,7 +1917,8 @@ mod tests { let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); - let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false); + let row_type = Arc::new(table_info.get_row_type().clone()); + let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false); let completed = DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0); fetcher.log_fetch_buffer.add(Box::new(completed)); @@ -1931,7 +1948,8 @@ mod tests { let bucket = TableBucket::new(1, 0); let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); - let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false); + let row_type 
= Arc::new(table_info.get_row_type().clone()); + let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false); let mut completed: Box = Box::new(DefaultCompletedFetch::new( bucket, log_records, diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index ffc48a8b..60a44ba7 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -920,13 +920,36 @@ impl Display for ArrayType { } } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Hash)] pub struct MapType { nullable: bool, key_type: Box, value_type: Box, } +// Route Deserialize through `with_nullable` so a Serde-built MapType +// collapses to the same canonical form as the constructor (otherwise +// equivalent maps disagree under `PartialEq`). +impl<'de> Deserialize<'de> for MapType { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + struct Raw { + nullable: bool, + key_type: Box, + value_type: Box, + } + let raw = Raw::deserialize(deserializer)?; + Ok(MapType::with_nullable( + raw.nullable, + *raw.key_type, + *raw.value_type, + )) + } +} + impl MapType { pub fn new(key_type: DataType, value_type: DataType) -> Self { Self::with_nullable(true, key_type, value_type) @@ -935,7 +958,7 @@ impl MapType { pub fn with_nullable(nullable: bool, key_type: DataType, value_type: DataType) -> Self { Self { nullable, - key_type: Box::new(key_type), + key_type: Box::new(key_type.as_non_nullable()), value_type: Box::new(value_type), } } @@ -1452,16 +1475,60 @@ fn test_array_display() { #[test] fn test_map_display() { let map_type = MapType::new(DataTypes::string(), DataTypes::int()); - assert_eq!(map_type.to_string(), "MAP"); + assert_eq!(map_type.to_string(), "MAP"); let map_type_non_null = MapType::with_nullable(false, DataTypes::int(), DataTypes::string()); - 
assert_eq!(map_type_non_null.to_string(), "MAP NOT NULL"); + assert_eq!( + map_type_non_null.to_string(), + "MAP NOT NULL" + ); let nested_map = MapType::new( DataTypes::string(), DataTypes::map(DataTypes::int(), DataTypes::boolean()), ); - assert_eq!(nested_map.to_string(), "MAP>"); + assert_eq!( + nested_map.to_string(), + "MAP>" + ); +} + +#[test] +fn test_map_deserialize_normalises_key_nullability() { + let json = r#"{ + "nullable": true, + "key_type": {"Int": {"nullable": true}}, + "value_type": {"String": {"nullable": true}} + }"#; + let from_json: MapType = serde_json::from_str(json).expect("deserialize"); + let from_ctor = MapType::new(DataTypes::int(), DataTypes::string()); + assert_eq!(from_json, from_ctor); + assert!(!from_json.key_type().is_nullable()); +} + +#[test] +fn test_map_deserialize_normalises_nested_map_keys() { + let json = r#"{ + "nullable": true, + "key_type": {"String": {"nullable": true}}, + "value_type": {"Map": { + "nullable": true, + "key_type": {"Int": {"nullable": true}}, + "value_type": {"Boolean": {"nullable": true}} + }} + }"#; + let from_json: MapType = serde_json::from_str(json).expect("deserialize"); + let from_ctor = MapType::new( + DataTypes::string(), + DataTypes::map(DataTypes::int(), DataTypes::boolean()), + ); + assert_eq!(from_json, from_ctor); + assert!(!from_json.key_type().is_nullable()); + let inner = match from_json.value_type() { + DataType::Map(m) => m, + other => panic!("expected nested Map, got {other:?}"), + }; + assert!(!inner.key_type().is_nullable()); } #[test] @@ -1497,7 +1564,7 @@ fn test_datatype_display() { assert_eq!(DataTypes::array(DataTypes::int()).to_string(), "ARRAY"); assert_eq!( DataTypes::map(DataTypes::string(), DataTypes::int()).to_string(), - "MAP" + "MAP" ); } @@ -1525,7 +1592,7 @@ fn test_complex_nested_display() { ]); assert_eq!( row_type.to_string(), - "ROW, metadata MAP>" + "ROW, metadata MAP>" ); } @@ -1547,7 +1614,10 @@ fn test_deeply_nested_types() { DataTypes::field("y", 
DataTypes::int()), ]), )); - assert_eq!(nested.to_string(), "ARRAY>>"); + assert_eq!( + nested.to_string(), + "ARRAY>>" + ); } // ============================================================================ diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 9f1a9784..b97fc120 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -994,6 +994,7 @@ impl LogRecordBatch { let record_batch = read_context.record_batch(data)?; let arrow_reader = ArrowReader::new_with_fluss_row_type( Arc::new(record_batch), + read_context.row_type.clone(), read_context.fluss_row_type().cloned(), ); let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator { @@ -1020,6 +1021,7 @@ impl LogRecordBatch { Some(record_batch) => { let arrow_reader = ArrowReader::new_with_fluss_row_type( Arc::new(record_batch), + read_context.row_type.clone(), read_context.fluss_row_type().cloned(), ); LogRecordIterator::Arrow(ArrowLogRecordIterator { @@ -1237,7 +1239,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { Arc::new(Field::new( "entries", ArrowDataType::Struct(arrow_schema::Fields::from(entry_fields)), - fluss_type.is_nullable(), + false, )), false, ) @@ -1259,6 +1261,16 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { }) } +/// Like `from_arrow_type`, but also reads the Field's nullability — +/// Arrow stores it on the Field wrapper, not the leaf data type. +pub(crate) fn from_arrow_field(field: &arrow_schema::Field) -> Result { + let mut dt = from_arrow_type(field.data_type())?; + if !field.is_nullable() { + dt = dt.as_non_nullable(); + } + Ok(dt) +} + /// Converts an Arrow data type back to a Fluss `DataType`. /// Used for reading array elements from Arrow ListArray back into Fluss types. 
pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result { @@ -1315,17 +1327,30 @@ pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result { DataTypes::timestamp_with_precision(precision) } } - ArrowDataType::List(field) => DataTypes::array(from_arrow_type(field.data_type())?), + ArrowDataType::List(field) => DataTypes::array(from_arrow_field(field)?), + ArrowDataType::Map(entries_field, _sorted) => { + let fields = match entries_field.data_type() { + ArrowDataType::Struct(f) => f, + other => { + return Err(Error::IllegalArgument { + message: format!("Map entries must be Struct, got {other:?}"), + }); + } + }; + if fields.len() != 2 { + return Err(Error::IllegalArgument { + message: format!( + "Map entries Struct must have 2 fields (key, value), got {}", + fields.len() + ), + }); + } + DataTypes::map(from_arrow_field(&fields[0])?, from_arrow_field(&fields[1])?) + } ArrowDataType::Struct(fields) => { let row_fields: Result> = fields .iter() - .map(|f| { - let mut dt = from_arrow_type(f.data_type())?; - if !f.is_nullable() { - dt = dt.as_non_nullable(); - } - Ok(DataField::new(f.name(), dt, None)) - }) + .map(|f| Ok(DataField::new(f.name(), from_arrow_field(f)?, None))) .collect(); DataTypes::row(row_fields?) 
} @@ -1341,6 +1366,7 @@ pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result { pub struct ReadContext { target_schema: SchemaRef, full_schema: SchemaRef, + row_type: Arc, projection: Option, is_from_remote: bool, fluss_row_type: Option>, @@ -1357,10 +1383,15 @@ struct Projection { } impl ReadContext { - pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext { + pub fn new( + arrow_schema: SchemaRef, + row_type: Arc, + is_from_remote: bool, + ) -> ReadContext { ReadContext { target_schema: arrow_schema.clone(), full_schema: arrow_schema, + row_type, projection: None, is_from_remote, fluss_row_type: None, @@ -1378,6 +1409,7 @@ impl ReadContext { pub fn with_projection_pushdown( arrow_schema: SchemaRef, + row_type: Arc, projected_fields: Vec, is_from_remote: bool, ) -> Result { @@ -1442,6 +1474,7 @@ impl ReadContext { Ok(ReadContext { target_schema, full_schema: arrow_schema, + row_type, projection: Some(project), is_from_remote, fluss_row_type: None, @@ -1635,15 +1668,17 @@ impl Iterator for ArrowLogRecordIterator { pub struct ArrowReader { record_batch: Arc, + row_type: Arc, fluss_row_type: Option>, row_column_indices: Arc<[usize]>, } impl ArrowReader { - pub fn new(record_batch: Arc) -> Self { + pub fn new(record_batch: Arc, row_type: Arc) -> Self { let row_column_indices = arrow_row_column_indices(&record_batch); ArrowReader { record_batch, + row_type, fluss_row_type: None, row_column_indices, } @@ -1651,6 +1686,7 @@ impl ArrowReader { pub fn new_with_fluss_row_type( record_batch: Arc, + row_type: Arc, fluss_row_type: Option>, ) -> Self { let row_column_indices = match &fluss_row_type { @@ -1659,6 +1695,7 @@ impl ArrowReader { }; ArrowReader { record_batch, + row_type, fluss_row_type, row_column_indices, } @@ -1671,6 +1708,7 @@ impl ArrowReader { pub fn read(&self, row_id: usize) -> ColumnarRow { ColumnarRow::with_indices( self.record_batch.clone(), + self.row_type.clone(), row_id, self.fluss_row_type.clone(), 
self.row_column_indices.clone(), @@ -1799,10 +1837,10 @@ mod tests { Arc::new(Field::new( "entries", ArrowDataType::Struct(arrow_schema::Fields::from(vec![ - Field::new("key", ArrowDataType::Utf8, true), + Field::new("key", ArrowDataType::Utf8, false), Field::new("value", ArrowDataType::Int32, true), ])), - true, + false, )), false, ) @@ -1821,6 +1859,48 @@ mod tests { ); } + #[test] + fn test_arrow_map_schema_strictness() { + let map_type = DataTypes::map(DataTypes::string(), DataTypes::int()); + let arrow_type = to_arrow_type(&map_type).unwrap(); + + if let ArrowDataType::Map(entries_field, _) = arrow_type { + assert!( + !entries_field.is_nullable(), + "Arrow Map 'entries' field must be strictly non-nullable" + ); + } else { + panic!("Expected ArrowDataType::Map, got {:?}", arrow_type); + } + } + + #[test] + fn test_from_arrow_type_preserves_container_field_nullability() { + let arrow_list = ArrowDataType::List(Arc::new(arrow_schema::Field::new( + "item", + ArrowDataType::Int32, + false, + ))); + match from_arrow_type(&arrow_list).unwrap() { + DataType::Array(at) => assert!(!at.get_element_type().is_nullable()), + other => panic!("expected Array, got {other:?}"), + } + + let entries_struct = ArrowDataType::Struct(arrow_schema::Fields::from(vec![ + arrow_schema::Field::new("key", ArrowDataType::Utf8, false), + arrow_schema::Field::new("value", ArrowDataType::Int32, false), + ])); + let entries_field = arrow_schema::Field::new("entries", entries_struct, false); + let arrow_map = ArrowDataType::Map(Arc::new(entries_field), false); + match from_arrow_type(&arrow_map).unwrap() { + DataType::Map(m) => { + assert!(!m.key_type().is_nullable()); + assert!(!m.value_type().is_nullable()); + } + other => panic!("expected Map, got {other:?}"), + } + } + #[test] fn test_parse_ipc_message() { let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]); @@ -1878,7 +1958,8 @@ mod tests { DataField::new("name", DataTypes::string(), None), ]); let schema = 
to_arrow_schema(&row_type).unwrap(); - let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false); + let result = + ReadContext::with_projection_pushdown(schema, Arc::new(row_type), vec![0, 2], false); assert!(matches!(result, Err(IllegalArgument { .. }))); } diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 522fb03b..462bdebb 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -246,10 +246,14 @@ mod tests { use std::sync::Arc; fn make_row(values: Vec, row_id: usize) -> ColumnarRow { + use crate::metadata::RowType; let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])); let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))]) .expect("record batch"); - ColumnarRow::new(Arc::new(batch), row_id, None) + let row_type = Arc::new(RowType::with_data_types(vec![ + crate::metadata::DataType::Int(crate::metadata::IntType::new()), + ])); + ColumnarRow::new(Arc::new(batch), row_type, row_id, None) } #[test] diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index 7192ee57..33806295 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -18,10 +18,10 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{DataType, RowType}; -use crate::row::Datum; use crate::row::Decimal; use crate::row::binary::BinaryRowFormat; use crate::row::datum::{TimestampLtz, TimestampNtz}; +use crate::row::{Datum, FlussArray, FlussMap}; /// Writer to write a composite data format, like row, array, #[allow(dead_code)] @@ -69,7 +69,9 @@ pub trait BinaryWriter { fn write_timestamp_ltz(&mut self, value: &TimestampLtz, precision: u32); - fn write_array(&mut self, value: &[u8]); + fn write_array(&mut self, value: &FlussArray); + + fn write_map(&mut self, value: &FlussMap); // TODO Row serializer // fn write_row(&mut self, pos: 
i32, value: &InternalRow); @@ -138,6 +140,7 @@ pub enum InnerValueWriter { TimestampNtz(u32), // precision TimestampLtz(u32), // precision Array, + Map, Row(NestedRowWriter), } @@ -207,13 +210,10 @@ impl InnerValueWriter { Ok(InnerValueWriter::TimestampLtz(t.precision())) } DataType::Array(_) => Ok(InnerValueWriter::Array), + DataType::Map(_) => Ok(InnerValueWriter::Map), DataType::Row(row_type) => Ok(InnerValueWriter::Row(NestedRowWriter::from_row_type( row_type, )?)), - _ => unimplemented!( - "ValueWriter for DataType {:?} is currently not implemented", - data_type - ), } } pub fn write_value( @@ -273,7 +273,10 @@ impl InnerValueWriter { writer.write_timestamp_ltz(ts, *p); } (InnerValueWriter::Array, Datum::Array(arr)) => { - writer.write_array(arr.as_bytes()); + writer.write_array(arr); + } + (InnerValueWriter::Map, Datum::Map(map)) => { + writer.write_map(map); } (InnerValueWriter::Row(nested_writer), Datum::Row(inner_row)) => { use crate::row::compacted::CompactedRowWriter; diff --git a/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs index b0e8434d..4320a622 100644 --- a/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs +++ b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs @@ -21,6 +21,8 @@ use crate::error::{Error, Result}; use crate::metadata::DataType; use crate::row::Decimal; use crate::row::binary::{BinaryWriter, ValueWriter}; +use crate::row::binary_array::FlussArray; +use crate::row::binary_map::FlussMap; const MICROS_PER_MILLI: i64 = 1_000; @@ -225,8 +227,12 @@ impl BinaryWriter for IcebergBinaryRowWriter { self.write_raw(&micros.to_le_bytes()); } - fn write_array(&mut self, _value: &[u8]) { - panic!("Iceberg key columns do not support array values"); + fn write_array(&mut self, _value: &FlussArray) { + unreachable!("Array/Map types are rejected during value writer creation"); + } + + fn write_map(&mut self, _value: &FlussMap) { + unreachable!("Array/Map types are rejected 
during value writer creation"); } fn complete(&mut self) { diff --git a/crates/fluss/src/row/binary_array.rs b/crates/fluss/src/row/binary_array.rs index d0e8c9a5..d4fab762 100644 --- a/crates/fluss/src/row/binary_array.rs +++ b/crates/fluss/src/row/binary_array.rs @@ -30,6 +30,7 @@ use crate::metadata::{DataType, RowType}; use crate::row::Decimal; use crate::row::InternalRow; use crate::row::binary::{BinaryRowFormat, ValueWriter}; +use crate::row::binary_map::FlussMap; use crate::row::compacted::{CompactedRow, CompactedRowWriter, calculate_bit_set_width_in_bytes}; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use crate::row::field_getter::FieldGetter; @@ -76,6 +77,22 @@ fn round_to_nearest_word(num_bytes: usize) -> usize { (num_bytes + 7) & !7 } +fn is_variable_length_type(dt: &DataType) -> bool { + match dt { + DataType::Char(_) + | DataType::String(_) + | DataType::Binary(_) + | DataType::Bytes(_) + | DataType::Array(_) + | DataType::Map(_) + | DataType::Row(_) => true, + DataType::Decimal(d) => !Decimal::is_compact_precision(d.precision()), + DataType::Timestamp(t) => !TimestampNtz::is_compact(t.precision()), + DataType::TimestampLTz(t) => !TimestampLtz::is_compact(t.precision()), + _ => false, + } +} + /// A Fluss binary array, wire-compatible with Java's `BinaryArray`. /// /// Stores elements in a flat byte buffer with a header (element count + null bitmap) @@ -219,6 +236,33 @@ impl FlussArray { (self.data[4 + byte_index] & (1u8 << bit)) != 0 } + /// Returns the logically occupied bytes of this array, including the variable-length part. + /// This is used to detect trailing garbage in binary containers. 
+ pub fn extent(&self, element_type: &DataType) -> Result { + let header_size = calculate_header_in_bytes(self.size); + let element_size = calculate_fix_length_part_size(element_type); + let fixed_part_size = round_to_nearest_word(header_size + self.size * element_size); + + if !is_variable_length_type(element_type) { + return Ok(fixed_part_size); + } + + let mut max_extent = fixed_part_size; + for i in 0..self.size { + if !self.is_null_at(i) { + let packed = self.read_i64(i, "extent calculation")? as u64; + let mark = packed & HIGHEST_FIRST_BIT; + if mark == 0 { + let offset = (packed >> 32) as usize; + let len = (packed & 0xFFFF_FFFF) as usize; + max_extent = max_extent.max(offset + len); + } + } + } + + Ok(round_to_nearest_word(max_extent)) + } + fn checked_slice(&self, start: usize, len: usize, context: &str) -> Result<&[u8]> { let end = start.checked_add(len).ok_or_else(|| IllegalArgument { message: format!("Overflow while reading {context}: start={start}, len={len}"), @@ -423,6 +467,16 @@ impl FlussArray { FlussArray::from_owned_bytes(self.data.slice(start..start + len)) } + pub fn get_map( + &self, + pos: usize, + key_type: &DataType, + value_type: &DataType, + ) -> Result { + let (start, len) = self.read_var_len_span(pos)?; + FlussMap::from_owned_bytes(self.data.slice(start..start + len), key_type, value_type) + } + pub fn get_row<'a>(&'a self, pos: usize, row_type: &'a RowType) -> Result> { let bytes = self.read_var_len_bytes(pos)?; let header_size = calculate_bit_set_width_in_bytes(row_type.fields().len()); @@ -672,6 +726,11 @@ impl FlussArrayWriter { self.write_bytes_to_var_len_part(pos, value.as_bytes()); } + /// Writes a nested FlussMap into this array at position `pos`. + pub fn write_map(&mut self, pos: usize, value: &FlussMap) { + self.write_bytes_to_var_len_part(pos, value.as_bytes()); + } + /// Writes a nested row at `pos`. Requires the writer to have been /// constructed via [`new`](Self::new) with a `DataType::Row(_)` element type. 
pub fn write_row(&mut self, pos: usize, row: &dyn InternalRow) -> Result<()> { @@ -772,6 +831,10 @@ impl InternalRow for FlussArray { fn get_array(&self, pos: usize) -> Result { self.get_array(pos) } + + fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { + self.get_map(pos, key_type, value_type) + } } #[cfg(test)] @@ -1133,7 +1196,7 @@ mod tests { outer.set_field(0, inner_arr.clone()); let mut writer = CompactedRowWriter::new(1); - writer.write_array(inner_arr.as_bytes()); + writer.write_array(&inner_arr); let bytes = writer.to_bytes(); let outer_compacted = CompactedRow::from_bytes(outer_row_type, &bytes); diff --git a/crates/fluss/src/row/binary_map.rs b/crates/fluss/src/row/binary_map.rs new file mode 100644 index 00000000..46e82899 --- /dev/null +++ b/crates/fluss/src/row/binary_map.rs @@ -0,0 +1,561 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary map format matching Java's `BinaryMap.java` layout. +//! +//! Binary layout: +//! ```text +//! [4 bytes: keyArraySizeInBytes] + [Key BinaryArray bytes] + [Value BinaryArray bytes] +//! 
``` + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::binary_array::{FlussArray, FlussArrayWriter}; +use crate::row::datum::Datum; +use bytes::Bytes; +use serde::Serialize; +use std::fmt; +use std::hash::{Hash, Hasher}; + +/// A Fluss binary map, wire-compatible with Java's `BinaryMap`. +/// +/// Stores entries as two parallel binary arrays (keys and values) within a single +/// byte buffer. +#[derive(Clone)] +pub struct FlussMap { + data: Bytes, + key_array: FlussArray, + value_array: FlussArray, +} + +impl fmt::Debug for FlussMap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlussMap") + .field("size", &self.size()) + .field("data_len", &self.data.len()) + .finish() + } +} + +impl fmt::Display for FlussMap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "FlussMap[size={}]", self.size()) + } +} + +impl PartialEq for FlussMap { + fn eq(&self, other: &Self) -> bool { + self.data == other.data + } +} + +impl Eq for FlussMap {} + +impl PartialOrd for FlussMap { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for FlussMap { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.data.cmp(&other.data) + } +} + +impl Hash for FlussMap { + fn hash(&self, state: &mut H) { + self.data.hash(state); + } +} + +impl Serialize for FlussMap { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.serialize_bytes(&self.data) + } +} + +fn check_no_null_keys(key_array: &FlussArray) -> Result<()> { + for i in 0..key_array.size() { + if key_array.is_null_at(i) { + return Err(IllegalArgument { + message: "FlussMap keys cannot be null".to_string(), + }); + } + } + Ok(()) +} + +impl FlussMap { + /// Validates the raw bytes and extracts the sub-arrays. 
+ fn validate( + data: &[u8], + key_type: &DataType, + value_type: &DataType, + ) -> Result<(FlussArray, FlussArray)> { + if data.len() < 4 { + return Err(IllegalArgument { + message: format!( + "FlussMap data too short: need at least 4 bytes, got {}", + data.len() + ), + }); + } + let raw_key_size = i32::from_le_bytes(data[0..4].try_into().unwrap()); + if raw_key_size < 0 { + return Err(IllegalArgument { + message: format!( + "FlussMap key array size must be non-negative, got {}", + raw_key_size + ), + }); + } + let key_size = raw_key_size as usize; + if 4 + key_size > data.len() { + return Err(IllegalArgument { + message: format!( + "FlussMap key array size {} exceeds remaining payload {}", + key_size, + data.len() - 4 + ), + }); + } + + let key_bytes = &data[4..4 + key_size]; + let value_bytes = &data[4 + key_size..]; + + let key_array = FlussArray::from_bytes(key_bytes).map_err(|e| IllegalArgument { + message: format!("Invalid key array in FlussMap: {}", e), + })?; + + let value_array = FlussArray::from_bytes(value_bytes).map_err(|e| IllegalArgument { + message: format!("Invalid value array in FlussMap: {}", e), + })?; + + if key_array.size() != value_array.size() { + return Err(IllegalArgument { + message: format!( + "FlussMap key array size ({}) does not match value array size ({})", + key_array.size(), + value_array.size() + ), + }); + } + + // Strict trailing byte check: ensure the total reach of key and value arrays + // plus the 4-byte header matches the provided data length exactly. + let key_extent = key_array.extent(key_type)?; + let value_extent = value_array.extent(value_type)?; + let expected_len = 4 + key_extent + value_extent; + if expected_len != data.len() { + return Err(IllegalArgument { + message: format!( + "FlussMap binary validation failed: expected {expected_len} bytes, got {}", + data.len() + ), + }); + } + + check_no_null_keys(&key_array)?; + + Ok((key_array, value_array)) + } + + /// Creates a FlussMap from a byte slice (copies data). 
+ pub(crate) fn from_bytes( + data: &[u8], + key_type: &DataType, + value_type: &DataType, + ) -> Result { + let (key_array, value_array) = Self::validate(data, key_type, value_type)?; + Ok(FlussMap { + data: Bytes::copy_from_slice(data), + key_array, + value_array, + }) + } + + /// Creates a FlussMap from owned bytes without copying. + pub(crate) fn from_owned_bytes( + data: Bytes, + key_type: &DataType, + value_type: &DataType, + ) -> Result { + let (key_array, value_array) = Self::validate(&data, key_type, value_type)?; + Ok(FlussMap { + data, + key_array, + value_array, + }) + } + + /// Creates a FlussMap by combining a key array and a value array. + /// + /// Copies both arrays into a new contiguous buffer. + pub fn from_arrays(key_array: &FlussArray, value_array: &FlussArray) -> Result { + if key_array.size() != value_array.size() { + return Err(IllegalArgument { + message: format!( + "FlussMap key array size ({}) does not match value array size ({})", + key_array.size(), + value_array.size() + ), + }); + } + check_no_null_keys(key_array)?; + + let key_bytes = key_array.as_bytes(); + let value_bytes = value_array.as_bytes(); + + let mut data = Vec::with_capacity(4 + key_bytes.len() + value_bytes.len()); + // Write the key array size (4 bytes) + // Java's BinaryMap uses memory segment methods which write in LE + data.extend_from_slice(&(key_bytes.len() as i32).to_le_bytes()); + // Write key array bytes + data.extend_from_slice(key_bytes); + // Write value array bytes + data.extend_from_slice(value_bytes); + + let data = Bytes::from(data); + Ok(FlussMap { + data, + key_array: key_array.clone(), + value_array: value_array.clone(), + }) + } + + /// Returns the number of entries in the map. + pub fn size(&self) -> usize { + self.key_array.size() + } + + /// Returns the raw bytes of this map (the complete binary representation). + pub fn as_bytes(&self) -> &[u8] { + &self.data + } + + /// Returns the key array. 
+ pub fn key_array(&self) -> &FlussArray { + &self.key_array + } + + /// Returns the value array. + pub fn value_array(&self) -> &FlussArray { + &self.value_array + } +} + +/// Writer for building a `FlussMap` entry by entry. +pub struct FlussMapWriter { + key_writer: FlussArrayWriter, + value_writer: FlussArrayWriter, + key_type: DataType, + value_type: DataType, + current_index: usize, +} + +impl FlussMapWriter { + /// Creates a new writer for a map with the given capacity and key/value types. + pub fn new(capacity: usize, key_type: &DataType, value_type: &DataType) -> Self { + Self { + key_writer: FlussArrayWriter::new(capacity, key_type), + value_writer: FlussArrayWriter::new(capacity, value_type), + key_type: key_type.clone(), + value_type: value_type.clone(), + current_index: 0, + } + } + + /// Writes a key-value entry into the map. + /// + /// # Errors + /// Returns an error if the key is null or if there's a type mismatch. + pub fn write_entry(&mut self, key: Datum, value: Datum) -> Result<()> { + if key.is_null() { + return Err(IllegalArgument { + message: "FlussMap keys cannot be null".to_string(), + }); + } + + Self::write_datum( + &mut self.key_writer, + self.current_index, + key, + &self.key_type, + )?; + Self::write_datum( + &mut self.value_writer, + self.current_index, + value, + &self.value_type, + )?; + self.current_index += 1; + Ok(()) + } + + /// Finalizes the writer and returns the completed `FlussMap`. 
+ pub fn complete(self) -> Result { + let key_array = self.key_writer.complete()?; + let value_array = self.value_writer.complete()?; + FlussMap::from_arrays(&key_array, &value_array) + } + + fn write_datum( + writer: &mut FlussArrayWriter, + pos: usize, + datum: Datum, + dt: &DataType, + ) -> Result<()> { + if datum.is_null() { + writer.set_null_at(pos); + return Ok(()); + } + + match (dt, &datum) { + (DataType::Boolean(_), Datum::Bool(v)) => writer.write_boolean(pos, *v), + (DataType::TinyInt(_), Datum::Int8(v)) => writer.write_byte(pos, *v), + (DataType::SmallInt(_), Datum::Int16(v)) => writer.write_short(pos, *v), + (DataType::Int(_), Datum::Int32(v)) => writer.write_int(pos, *v), + (DataType::BigInt(_), Datum::Int64(v)) => writer.write_long(pos, *v), + (DataType::Float(_), Datum::Float32(v)) => writer.write_float(pos, v.into_inner()), + (DataType::Double(_), Datum::Float64(v)) => writer.write_double(pos, v.into_inner()), + (DataType::Char(_), Datum::String(v)) => writer.write_string(pos, v), + (DataType::String(_), Datum::String(v)) => writer.write_string(pos, v), + (DataType::Binary(_), Datum::Blob(v)) => writer.write_binary_bytes(pos, v), + (DataType::Bytes(_), Datum::Blob(v)) => writer.write_binary_bytes(pos, v), + (DataType::Decimal(d), Datum::Decimal(v)) => { + writer.write_decimal(pos, v, d.precision()) + } + (DataType::Date(_), Datum::Date(v)) => writer.write_date(pos, *v), + (DataType::Time(_), Datum::Time(v)) => writer.write_time(pos, *v), + (DataType::Timestamp(t), Datum::TimestampNtz(v)) => { + writer.write_timestamp_ntz(pos, v, t.precision()) + } + (DataType::TimestampLTz(t), Datum::TimestampLtz(v)) => { + writer.write_timestamp_ltz(pos, v, t.precision()) + } + (DataType::Array(_), Datum::Array(v)) => writer.write_array(pos, v), + (DataType::Map(_), Datum::Map(v)) => writer.write_map(pos, v), + _ => { + return Err(IllegalArgument { + message: format!("Type mismatch: expected {:?}, got {:?}", dt, datum), + }); + } + } + Ok(()) + } +} + +#[cfg(test)] 
+mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::binary_array::FlussArrayWriter; + + #[test] + fn test_round_trip_int_to_string_map() { + let mut writer = FlussMapWriter::new(2, &DataTypes::int(), &DataTypes::string()); + writer.write_entry(1.into(), "a".into()).unwrap(); + writer.write_entry(2.into(), "b".into()).unwrap(); + let map = writer.complete().unwrap(); + assert_eq!(map.size(), 2); + + assert_eq!( + map.as_bytes(), + &[ + 16, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, + 0, 97, 0, 0, 0, 0, 0, 0, 129, 98, 0, 0, 0, 0, 0, 0, 129 + ] + ); + + let bytes = map.as_bytes(); + let decoded = FlussMap::from_bytes(bytes, &DataTypes::int(), &DataTypes::string()).unwrap(); + + assert_eq!(decoded.size(), 2); + let decoded_keys = decoded.key_array(); + let decoded_values = decoded.value_array(); + + assert_eq!(decoded_keys.get_int(0).unwrap(), 1); + assert_eq!(decoded_keys.get_int(1).unwrap(), 2); + assert_eq!(decoded_values.get_string(0).unwrap(), "a"); + assert_eq!(decoded_values.get_string(1).unwrap(), "b"); + } + + #[test] + fn test_empty_map() { + let writer = FlussMapWriter::new(0, &DataTypes::int(), &DataTypes::string()); + let map = writer.complete().unwrap(); + assert_eq!(map.size(), 0); + + let decoded = + FlussMap::from_bytes(map.as_bytes(), &DataTypes::int(), &DataTypes::string()).unwrap(); + assert_eq!(decoded.size(), 0); + } + + #[test] + fn test_map_with_null_values() { + let key_type = DataTypes::string(); + let value_type = DataTypes::int(); + let mut writer = FlussMapWriter::new(3, &key_type, &value_type); + writer.write_entry("k1".into(), 10.into()).unwrap(); + writer.write_entry("k2".into(), Datum::Null).unwrap(); + writer.write_entry("k3".into(), 30.into()).unwrap(); + let map = writer.complete().unwrap(); + + assert_eq!( + map.as_bytes(), + &[ + 32, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 107, 49, 0, 0, 0, 0, 0, 130, 107, 50, 0, 0, 0, + 0, 0, 130, 107, 51, 0, 0, 0, 0, 0, 130, 3, 0, 0, 0, 2, 
0, 0, 0, 10, 0, 0, 0, 0, 0, + 0, 0, 30, 0, 0, 0, 0, 0, 0, 0 + ] + ); + + let decoded = FlussMap::from_bytes(map.as_bytes(), &key_type, &value_type).unwrap(); + + let values = decoded.value_array(); + assert_eq!(values.size(), 3); + assert!(!values.is_null_at(0)); + assert!(values.is_null_at(1)); + assert!(!values.is_null_at(2)); + assert_eq!(values.get_int(0).unwrap(), 10); + assert_eq!(values.get_int(2).unwrap(), 30); + } + + #[test] + fn test_invalid_data() { + // Too short + let err = + FlussMap::from_bytes(&[1, 2, 3], &DataTypes::int(), &DataTypes::int()).unwrap_err(); + assert!(err.to_string().contains("FlussMap data too short")); + + // Negative size + let neg_size = (-1i32).to_le_bytes(); + let mut bad_data = vec![]; + bad_data.extend_from_slice(&neg_size); + bad_data.extend_from_slice(&[0, 0, 0, 0]); + let err2 = + FlussMap::from_bytes(&bad_data, &DataTypes::int(), &DataTypes::int()).unwrap_err(); + assert!( + err2.to_string() + .contains("FlussMap key array size must be non-negative") + ); + + // Key array length exceeds payload + let large_size = 100i32.to_le_bytes(); + let mut bad_data2 = vec![]; + bad_data2.extend_from_slice(&large_size); + bad_data2.extend_from_slice(&[0, 0, 0, 0]); + let err3 = + FlussMap::from_bytes(&bad_data2, &DataTypes::int(), &DataTypes::int()).unwrap_err(); + assert!( + err3.to_string() + .contains("FlussMap key array size 100 exceeds remaining payload 4") + ); + } + + #[test] + fn test_mismatched_array_sizes() { + let key_writer = FlussArrayWriter::new(1, &DataTypes::int()); + let key_array = key_writer.complete().unwrap(); + + let value_writer = FlussArrayWriter::new(2, &DataTypes::string()); + let value_array = value_writer.complete().unwrap(); + + let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err(); + assert!(err.to_string().contains("does not match value array size")); + } + + #[test] + fn test_nested_map() { + let map_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + let mut inner_writer = 
FlussMapWriter::new(1, &DataTypes::int(), &DataTypes::string()); + inner_writer.write_entry(1.into(), "b".into()).unwrap(); + let inner_map = inner_writer.complete().unwrap(); + + let mut writer = FlussMapWriter::new(1, &DataTypes::string(), &map_type); + writer + .write_entry("a".into(), Datum::Map(inner_map)) + .unwrap(); + let map = writer.complete().unwrap(); + + let decoded = + FlussMap::from_bytes(map.as_bytes(), &DataTypes::string(), &map_type).unwrap(); + let decoded_keys = decoded.key_array(); + let decoded_values = decoded.value_array(); + + assert_eq!(decoded_keys.get_string(0).unwrap(), "a"); + let decoded_inner_map = decoded_values + .get_map(0, &DataTypes::int(), &DataTypes::string()) + .unwrap(); + assert_eq!(decoded_inner_map.key_array().get_int(0).unwrap(), 1); + assert_eq!(decoded_inner_map.value_array().get_string(0).unwrap(), "b"); + } + + #[test] + fn test_trailing_garbage() { + let mut key_writer = FlussArrayWriter::new(1, &DataTypes::int()); + key_writer.write_int(0, 1); + let key_array = key_writer.complete().unwrap(); + + let mut value_writer = FlussArrayWriter::new(1, &DataTypes::int()); + value_writer.write_int(0, 100); + let value_array = value_writer.complete().unwrap(); + + let map = FlussMap::from_arrays(&key_array, &value_array).unwrap(); + let bytes = map.as_bytes(); + + // Valid bytes should pass + assert!(FlussMap::from_bytes(bytes, &DataTypes::int(), &DataTypes::int()).is_ok()); + + // Append trailing garbage + let mut bad_bytes = bytes.to_vec(); + bad_bytes.push(0); + let err = + FlussMap::from_bytes(&bad_bytes, &DataTypes::int(), &DataTypes::int()).unwrap_err(); + assert!(err.to_string().contains("binary validation failed")); + assert!(err.to_string().contains("expected")); + } + + #[test] + fn test_null_keys_fail_validation() { + let mut key_writer = FlussArrayWriter::new(1, &DataTypes::int()); + key_writer.set_null_at(0); + let key_array = key_writer.complete().unwrap(); + + let mut value_writer = FlussArrayWriter::new(1, 
&DataTypes::int()); + value_writer.write_int(0, 100); + let value_array = value_writer.complete().unwrap(); + + let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err(); + assert!(err.to_string().contains("keys cannot be null")); + + let key_bytes = key_array.as_bytes(); + let value_bytes = value_array.as_bytes(); + let mut data = vec![]; + data.extend_from_slice(&(key_bytes.len() as i32).to_le_bytes()); + data.extend_from_slice(key_bytes); + data.extend_from_slice(value_bytes); + + let err = FlussMap::from_bytes(&data, &DataTypes::int(), &DataTypes::int()).unwrap_err(); + assert!(err.to_string().contains("keys cannot be null")); + } +} diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 5db1cd50..8b23423b 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -18,13 +18,14 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{DataType, RowType}; -use crate::record::from_arrow_type; +use crate::record::from_arrow_field; use crate::row::binary_array::FlussArrayWriter; +use crate::row::binary_map::FlussMap; use crate::row::datum::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use crate::row::{Decimal, FlussArray, GenericRow, InternalRow}; use arrow::array::{ Array, AsArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, - Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, ListArray, + Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, StringArray, StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, @@ -40,6 +41,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct ColumnarRow { record_batch: Arc, + row_type: Arc, row_id: usize, fluss_row_type: Option>, row_column_indices: Arc<[usize]>, @@ 
-71,6 +73,7 @@ fn make_row_caches(indices: &[usize]) -> Box<[std::sync::OnceLock, + row_type: Arc, row_id: usize, fluss_row_type: Option>, ) -> Self { @@ -78,11 +81,12 @@ impl ColumnarRow { Some(rt) => fluss_row_column_indices(rt), None => arrow_row_column_indices(&batch), }; - Self::with_indices(batch, row_id, fluss_row_type, row_column_indices) + Self::with_indices(batch, row_type, row_id, fluss_row_type, row_column_indices) } pub(crate) fn with_indices( batch: Arc, + row_type: Arc, row_id: usize, fluss_row_type: Option>, row_column_indices: Arc<[usize]>, @@ -90,6 +94,7 @@ impl ColumnarRow { let row_caches = make_row_caches(&row_column_indices); ColumnarRow { record_batch: batch, + row_type, row_id, fluss_row_type, row_column_indices, @@ -408,14 +413,48 @@ fn arrow_value_to_datum( ArrowDataType::List(field) => { let list_arr = downcast!(ListArray); let values = list_arr.value(row_id); + // Infer via from_arrow_field so the inferred element type + // matches what `arrow_map_entry_to_fluss_map` / strict `==` + // expect when there's no upstream Fluss schema. 
let element_fluss_type = match fluss_type { Some(DataType::Array(at)) => at.get_element_type().clone(), - _ => from_arrow_type(field.data_type())?, + _ => from_arrow_field(field)?, }; let mut writer = FlussArrayWriter::new(values.len(), &element_fluss_type); write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut writer)?; Ok(Datum::Array(writer.complete()?)) } + ArrowDataType::Map(entries_field, _) => { + let map_arr = downcast!(MapArray); + let entries = map_arr.value(row_id); + let (key_type, value_type) = match fluss_type { + Some(DataType::Map(m)) => (m.key_type().clone(), m.value_type().clone()), + _ => { + let fields = match entries_field.data_type() { + ArrowDataType::Struct(f) => f, + other => { + return Err(IllegalArgument { + message: format!("expected Struct for Map entries, got {other:?}"), + }); + } + }; + if fields.len() != 2 { + return Err(IllegalArgument { + message: format!( + "Map entries Struct must have 2 fields, got {}", + fields.len() + ), + }); + } + (from_arrow_field(&fields[0])?, from_arrow_field(&fields[1])?) 
+ } + }; + Ok(Datum::Map(arrow_map_entry_to_fluss_map( + &entries, + &key_type, + &value_type, + )?)) + } other => Err(IllegalArgument { message: format!("unsupported Arrow data type for nested row extraction: {other:?}"), }), @@ -609,25 +648,64 @@ impl InternalRow for ColumnarRow { } fn get_array(&self, pos: usize) -> Result { + let expected_type = self.row_type.fields()[pos].data_type(); + let element_fluss_type = match expected_type { + DataType::Array(a) => a.get_element_type(), + _ => { + return Err(IllegalArgument { + message: format!( + "expected Array type at position {pos}, got {expected_type:?}" + ), + }); + } + }; + let column = self.column(pos)?; - let values = if let Some(list_arr) = column.as_any().downcast_ref::() { - list_arr.value(self.row_id) - } else { + let element_field = match column.data_type() { + ArrowDataType::List(field) => field, + other => { + return Err(IllegalArgument { + message: format!("expected List array at position {pos}, got {other:?}"), + }); + } + }; + + let actual_element_type = from_arrow_field(element_field)?; + if actual_element_type != *element_fluss_type { return Err(IllegalArgument { message: format!( - "expected List array at position {pos}, got {:?}", - column.data_type() + "Arrow list element type {:?} does not match expected Fluss type {:?}", + actual_element_type, element_fluss_type ), }); - }; - - let element_fluss_type = from_arrow_type(values.data_type())?; - let mut writer = FlussArrayWriter::new(values.len(), &element_fluss_type); + } - write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut writer)?; + let list_arr = column + .as_any() + .downcast_ref::() + .expect("data_type matched List but downcast failed; arrow-rs invariant violated"); + let values = list_arr.value(self.row_id); + let mut writer = FlussArrayWriter::new(values.len(), element_fluss_type); + write_arrow_values_to_fluss_array(&*values, element_fluss_type, &mut writer)?; writer.complete() } + fn get_map(&self, pos: usize, 
key_type: &DataType, value_type: &DataType) -> Result { + let column = self.column(pos)?; + let map_arr = + column + .as_any() + .downcast_ref::() + .ok_or_else(|| IllegalArgument { + message: format!( + "expected Map array at position {pos}, got {:?}", + column.data_type() + ), + })?; + + arrow_map_entry_to_fluss_map(&map_arr.value(self.row_id), key_type, value_type) + } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { let cache_idx = self .row_column_indices @@ -663,6 +741,67 @@ impl InternalRow for ColumnarRow { } } +#[inline] +fn arrow_map_entry_to_fluss_map( + struct_arr: &arrow::array::StructArray, + key_type: &DataType, + value_type: &DataType, +) -> Result { + let fields = match struct_arr.data_type() { + ArrowDataType::Struct(f) => f, + other => { + return Err(IllegalArgument { + message: format!("expected Struct for Map entries, got {other:?}"), + }); + } + }; + if fields.len() != 2 { + return Err(IllegalArgument { + message: format!( + "Expected 2 columns in Map entries struct, got {}", + fields.len() + ), + }); + } + + let actual_key_type = from_arrow_field(&fields[0])?; + if actual_key_type != *key_type { + return Err(IllegalArgument { + message: format!( + "Arrow map key type {:?} does not match expected Fluss type {:?}", + actual_key_type, key_type + ), + }); + } + + let actual_value_type = from_arrow_field(&fields[1])?; + if actual_value_type != *value_type { + return Err(IllegalArgument { + message: format!( + "Arrow map value type {:?} does not match expected Fluss type {:?}", + actual_value_type, value_type + ), + }); + } + + let keys_arrow = struct_arr.column(0); + let values_arrow = struct_arr.column(1); + + let len = keys_arrow.len(); + + // Convert Arrow keys → FlussArray + let mut key_writer = FlussArrayWriter::new(len, key_type); + write_arrow_values_to_fluss_array(&**keys_arrow, key_type, &mut key_writer)?; + let key_array = key_writer.complete()?; + + // Convert Arrow values → FlussArray + let mut value_writer = 
FlussArrayWriter::new(len, value_type); + write_arrow_values_to_fluss_array(&**values_arrow, value_type, &mut value_writer)?; + let value_array = value_writer.complete()?; + + FlussMap::from_arrays(&key_array, &value_array) +} + /// Downcast to a primitive Arrow array type, then loop with null checks calling a writer method. macro_rules! write_primitive_elements { ($values:expr, $arrow_type:ty, $element_type:expr, $writer:expr, $write_method:ident) => {{ @@ -721,7 +860,10 @@ macro_rules! write_list_elements { $element_type ), })?; - let nested_element_type = from_arrow_type(&arr.value_type())?; + let nested_element_type = match $element_type { + DataType::Array(a) => a.get_element_type(), + _ => unreachable!("Expected Array type for write_list_elements"), + }; for i in 0..$len { if arr.is_null(i) { $writer.set_null_at(i); @@ -877,6 +1019,34 @@ fn write_arrow_values_to_fluss_array( }); } } + DataType::Map(_) => { + let map_arr = + values + .as_any() + .downcast_ref::() + .ok_or_else(|| IllegalArgument { + message: format!( + "Expected MapArray for {element_type:?} element, got {:?}", + values.data_type() + ), + })?; + for i in 0..len { + if map_arr.is_null(i) { + writer.set_null_at(i); + } else { + let expected_map_type = match element_type { + DataType::Map(m) => m, + _ => unreachable!("Expected Map type for Map variant"), + }; + let fluss_map = arrow_map_entry_to_fluss_map( + &map_arr.value(i), + expected_map_type.key_type(), + expected_map_type.value_type(), + )?; + writer.write_map(i, &fluss_map); + } + } + } DataType::Row(row_type) => { let struct_arr = values .as_any() @@ -896,13 +1066,6 @@ fn write_arrow_values_to_fluss_array( } } } - _ => { - return Err(IllegalArgument { - message: format!( - "unsupported element type for Arrow → FlussArray conversion: {element_type:?}" - ), - }); - } } Ok(()) } @@ -1034,6 +1197,7 @@ fn write_timestamp_elements( #[cfg(test)] mod tests { use super::*; + use crate::metadata::{DataField, RowType}; use arrow::array::{ ArrayRef, 
BinaryArray, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int32Builder, Int64Array, ListBuilder, StringArray, @@ -1041,29 +1205,43 @@ mod tests { }; use arrow::datatypes::{DataType, Field, Fields, Schema}; + fn infer_fluss_type(arrow_dt: &arrow_schema::DataType) -> crate::metadata::DataType { + match arrow_dt { + arrow_schema::DataType::Int32 => { + crate::metadata::DataType::Int(crate::metadata::IntType::new()) + } + arrow_schema::DataType::List(f) => crate::metadata::DataType::Array( + crate::metadata::ArrayType::new(infer_fluss_type(f.data_type())), + ), + _ => crate::metadata::DataType::Int(crate::metadata::IntType::new()), + } + } + fn single_column_row(array: ArrayRef) -> ColumnarRow { + let dt = infer_fluss_type(array.data_type()); let batch = RecordBatch::try_from_iter(vec![("arr", array)]).expect("record batch with one column"); - ColumnarRow::new(Arc::new(batch), 0, None) + let row_type = Arc::new(RowType::with_data_types(vec![dt])); + ColumnarRow::new(Arc::new(batch), row_type, 0, None) } #[test] fn columnar_row_reads_values() { let schema = Arc::new(Schema::new(vec![ - Field::new("b", DataType::Boolean, false), - Field::new("i8", DataType::Int8, false), - Field::new("i16", DataType::Int16, false), - Field::new("i32", DataType::Int32, false), - Field::new("i64", DataType::Int64, false), - Field::new("f32", DataType::Float32, false), - Field::new("f64", DataType::Float64, false), - Field::new("s", DataType::Utf8, false), - Field::new("bin", DataType::Binary, false), - Field::new("char", DataType::Utf8, false), + Field::new("b", ArrowDataType::Boolean, false), + Field::new("i8", ArrowDataType::Int8, false), + Field::new("i16", ArrowDataType::Int16, false), + Field::new("i32", ArrowDataType::Int32, false), + Field::new("i64", ArrowDataType::Int64, false), + Field::new("f32", ArrowDataType::Float32, false), + Field::new("f64", ArrowDataType::Float64, false), + Field::new("s", ArrowDataType::Utf8, false), + 
Field::new("bin", ArrowDataType::Binary, false), + Field::new("char", ArrowDataType::Utf8, false), ])); let batch = RecordBatch::try_new( - schema, + schema.clone(), vec![ Arc::new(BooleanArray::from(vec![true])), Arc::new(Int8Array::from(vec![1])), @@ -1079,7 +1257,7 @@ mod tests { ) .expect("record batch"); - let mut row = ColumnarRow::new(Arc::new(batch), 0, None); + let mut row = ColumnarRow::new(Arc::new(batch), Arc::new(RowType::new(vec![])), 0, None); assert_eq!(row.get_field_count(), 10); assert!(row.get_boolean(0).unwrap()); assert_eq!(row.get_byte(1).unwrap(), 1); @@ -1097,14 +1275,13 @@ mod tests { #[test] fn columnar_row_reads_decimal() { - use arrow::datatypes::DataType; use bigdecimal::{BigDecimal, num_bigint::BigInt}; // Test with Decimal128 let schema = Arc::new(Schema::new(vec![ - Field::new("dec1", DataType::Decimal128(10, 2), false), - Field::new("dec2", DataType::Decimal128(20, 5), false), - Field::new("dec3", DataType::Decimal128(38, 10), false), + Field::new("dec1", ArrowDataType::Decimal128(10, 2), false), + Field::new("dec2", ArrowDataType::Decimal128(20, 5), false), + Field::new("dec3", ArrowDataType::Decimal128(38, 10), false), ])); // Create decimal values: 123.45, 12345.67890, large decimal @@ -1134,7 +1311,7 @@ mod tests { ) .expect("record batch"); - let row = ColumnarRow::new(Arc::new(batch), 0, None); + let row = ColumnarRow::new(Arc::new(batch), Arc::new(RowType::new(vec![])), 0, None); assert_eq!(row.get_field_count(), 3); // Verify decimal values @@ -1228,7 +1405,7 @@ mod tests { let row = single_column_row(array); let err = row.get_array(0).unwrap_err(); assert!( - err.to_string().contains("expected List array"), + err.to_string().contains("expected Array type"), "unexpected error: {err}" ); } @@ -1240,7 +1417,16 @@ mod tests { builder.append(true); let array = Arc::new(builder.finish()) as ArrayRef; - let row = single_column_row(array); + let batch = RecordBatch::try_from_iter(vec![("arr", array)]).expect("record batch"); + // 
We manually create a row type that claims to be Array(Int) even though it's List(UInt32) + // to test the validation in get_array. + let row_type = Arc::new(RowType::new(vec![DataField::new( + "arr", + crate::metadata::DataTypes::array(crate::metadata::DataTypes::int()), + None, + )])); + let row = ColumnarRow::new(Arc::new(batch), row_type, 0, None); + let err = row.get_array(0).unwrap_err(); assert!( err.to_string() @@ -1276,7 +1462,7 @@ mod tests { ]; let batch = make_struct_batch("nested", child_fields, child_arrays, 2); - let mut row = ColumnarRow::new(batch, 0, None); + let mut row = ColumnarRow::new(batch, Arc::new(RowType::new(vec![])), 0, None); // row_id = 0 let nested = row.get_row(0).unwrap(); @@ -1322,7 +1508,7 @@ mod tests { let batch = Arc::new(RecordBatch::try_new(schema, vec![outer_array]).expect("record batch")); - let mut row = ColumnarRow::new(batch, 0, None); + let mut row = ColumnarRow::new(batch, Arc::new(RowType::new(vec![])), 0, None); // row_id = 0 let outer = row.get_row(0).unwrap(); @@ -1344,7 +1530,7 @@ mod tests { let child_arrays: Vec> = vec![Arc::new(Int32Array::from(vec![10, 20]))]; let batch = make_struct_batch("s", child_fields, child_arrays, 2); - let mut row = ColumnarRow::new(batch, 0, None); + let mut row = ColumnarRow::new(batch, Arc::new(RowType::new(vec![])), 0, None); // row_id = 0: nested x = 10 let nested_0 = row.get_row(0).unwrap(); @@ -1355,4 +1541,198 @@ mod tests { let nested_1 = row.get_row(0).unwrap(); assert_eq!(nested_1.get_int(0).unwrap(), 20); } + + #[test] + fn columnar_row_get_map_accepts_non_nullable_key_from_map_type() { + use crate::metadata::DataTypes; + use arrow::array::{MapBuilder, StringBuilder}; + + // Arrow map column with INT keys, STRING values. 
+ let mut builder = MapBuilder::new(None, Int32Builder::new(), StringBuilder::new()); + builder.keys().append_value(1); + builder.values().append_value("a"); + builder.append(true).unwrap(); + let map_arr = builder.finish(); + + let map_arrow_type = map_arr.data_type().clone(); + let schema = Arc::new(Schema::new(vec![Field::new("m", map_arrow_type, true)])); + let batch = + Arc::new(RecordBatch::try_new(schema, vec![Arc::new(map_arr)]).expect("record batch")); + + let map_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + let row_type = Arc::new(RowType::with_data_types(vec![map_type.clone()])); + let row = ColumnarRow::new(batch, row_type, 0, None); + + let (k, v) = match &map_type { + crate::metadata::DataType::Map(m) => (m.key_type(), m.value_type()), + _ => unreachable!(), + }; + let fluss_map = row + .get_map(0, k, v) + .expect("get_map should accept non-nullable key from MapType"); + assert_eq!(fluss_map.size(), 1); + assert_eq!(fluss_map.key_array().get_int(0).unwrap(), 1); + assert_eq!(fluss_map.value_array().get_string(0).unwrap(), "a"); + } + + #[test] + fn columnar_row_reads_row_containing_map() { + use crate::metadata::DataTypes; + use arrow::array::{MapBuilder, StringBuilder}; + + // Inner Map Arrow column with one entry per row, 2 rows. 
+ let mut mb = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new()); + mb.keys().append_value("k1"); + mb.values().append_value(42); + mb.append(true).unwrap(); + mb.keys().append_value("k2"); + mb.values().append_value(7); + mb.append(true).unwrap(); + let map_arr = mb.finish(); + + // Struct { id: Int32, m: Map } + let id_arr = Int32Array::from(vec![10, 20]); + let struct_fields = Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new("m", map_arr.data_type().clone(), false), + ]); + let struct_arr = Arc::new(StructArray::new( + struct_fields.clone(), + vec![Arc::new(id_arr), Arc::new(map_arr)], + None, + )); + let schema = Arc::new(Schema::new(vec![Field::new( + "outer", + DataType::Struct(struct_fields), + false, + )])); + let batch = Arc::new(RecordBatch::try_new(schema, vec![struct_arr]).expect("record batch")); + + // Fluss outer ROW> + let inner_row_type = RowType::with_data_types(vec![ + DataTypes::int(), + DataTypes::map(DataTypes::string(), DataTypes::int()), + ]); + let outer_row_type = Arc::new(RowType::with_data_types(vec![ + crate::metadata::DataType::Row(inner_row_type), + ])); + + let mut row = ColumnarRow::new( + batch, + outer_row_type.clone(), + 0, + Some(outer_row_type.clone()), + ); + + let nested = row + .get_row(0) + .expect("reading row with Map field must succeed"); + assert_eq!(nested.get_int(0).unwrap(), 10); + let inner_map = nested + .get_map(1, &DataTypes::string(), &DataTypes::int()) + .expect("nested map should be accessible"); + assert_eq!(inner_map.size(), 1); + assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k1"); + assert_eq!(inner_map.value_array().get_int(0).unwrap(), 42); + + // Verify cache invalidation across rows works for Row-with-Map too. 
+ row.set_row_id(1); + let nested = row.get_row(0).expect("row 1 must read"); + assert_eq!(nested.get_int(0).unwrap(), 20); + let inner_map = nested + .get_map(1, &DataTypes::string(), &DataTypes::int()) + .unwrap(); + assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k2"); + assert_eq!(inner_map.value_array().get_int(0).unwrap(), 7); + } + + #[test] + fn columnar_row_reads_array_of_maps() { + use crate::metadata::DataTypes; + use arrow::array::{ListBuilder, MapBuilder, StringBuilder}; + + // One row whose ARRAY> contains two maps: + // [{"k1" -> 1}, {"k2" -> 2, "k3" -> 3}]. + let mut outer = ListBuilder::new(MapBuilder::new( + None, + StringBuilder::new(), + Int32Builder::new(), + )); + { + let mb = outer.values(); + // Map 0: {"k1" -> 1} + mb.keys().append_value("k1"); + mb.values().append_value(1); + mb.append(true).unwrap(); + // Map 1: {"k2" -> 2, "k3" -> 3} + mb.keys().append_value("k2"); + mb.values().append_value(2); + mb.keys().append_value("k3"); + mb.values().append_value(3); + mb.append(true).unwrap(); + } + outer.append(true); + let list_arr = outer.finish(); + let arrow_dt = list_arr.data_type().clone(); + + let schema = Arc::new(Schema::new(vec![Field::new("a", arrow_dt, false)])); + let batch = + Arc::new(RecordBatch::try_new(schema, vec![Arc::new(list_arr)]).expect("record batch")); + + let array_type = DataTypes::array(DataTypes::map(DataTypes::string(), DataTypes::int())); + let row_type = Arc::new(RowType::with_data_types(vec![array_type])); + let row = ColumnarRow::new(batch, row_type, 0, None); + + let arr = row.get_array(0).expect("get_array on ARRAY must work"); + assert_eq!(arr.size(), 2); + + let m0 = arr + .get_map(0, &DataTypes::string(), &DataTypes::int()) + .unwrap(); + assert_eq!(m0.size(), 1); + assert_eq!(m0.key_array().get_string(0).unwrap(), "k1"); + assert_eq!(m0.value_array().get_int(0).unwrap(), 1); + + let m1 = arr + .get_map(1, &DataTypes::string(), &DataTypes::int()) + .unwrap(); + assert_eq!(m1.size(), 2); + 
assert_eq!(m1.key_array().get_string(0).unwrap(), "k2"); + assert_eq!(m1.value_array().get_int(0).unwrap(), 2); + assert_eq!(m1.key_array().get_string(1).unwrap(), "k3"); + assert_eq!(m1.value_array().get_int(1).unwrap(), 3); + } + + #[test] + fn columnar_row_get_map_rejects_real_type_mismatch() { + use crate::metadata::DataTypes; + use arrow::array::{MapBuilder, StringBuilder}; + + let mut mb = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new()); + mb.keys().append_value("k"); + mb.values().append_value(1); + mb.append(true).unwrap(); + let map_arr = mb.finish(); + let map_arrow_type = map_arr.data_type().clone(); + + let schema = Arc::new(Schema::new(vec![Field::new("m", map_arrow_type, true)])); + let batch = + Arc::new(RecordBatch::try_new(schema, vec![Arc::new(map_arr)]).expect("record batch")); + + // Caller mis-declares the value type as STRING. + let row_type = Arc::new(RowType::with_data_types(vec![DataTypes::map( + DataTypes::string(), + DataTypes::string(), + )])); + let row = ColumnarRow::new(batch, row_type, 0, None); + + let err = row + .get_map(0, &DataTypes::string(), &DataTypes::string()) + .expect_err("type mismatch must error"); + let msg = err.to_string(); + assert!( + msg.contains("does not match expected Fluss type"), + "unexpected error: {msg}" + ); + } } diff --git a/crates/fluss/src/row/column_writer.rs b/crates/fluss/src/row/column_writer.rs index d595df91..bbd28767 100644 --- a/crates/fluss/src/row/column_writer.rs +++ b/crates/fluss/src/row/column_writer.rs @@ -123,6 +123,14 @@ enum TypedWriter { offsets: Vec, validity: Vec, }, + Map { + key_writer: Box, + value_writer: Box, + key_type: DataType, + value_type: DataType, + offsets: Vec, + validity: Vec, + }, Struct { field_writers: Vec, validity: Vec, @@ -162,6 +170,7 @@ macro_rules! with_builder { TypedWriter::TimestampLtzMicrosecond { builder: $b, .. } => $body, TypedWriter::TimestampLtzNanosecond { builder: $b, .. } => $body, TypedWriter::List { .. 
} => panic!("List variant not supported in with_builder!"), + TypedWriter::Map { .. } => panic!("Map variant not supported in with_builder!"), TypedWriter::Struct { .. } => panic!("Struct variant not supported in with_builder!"), } }; @@ -361,6 +370,50 @@ impl ColumnWriter { validity: Vec::with_capacity(capacity), } } + DataType::Map(m) => { + let (key_arrow_type, value_arrow_type) = match arrow_type { + ArrowDataType::Map(field, _) => match field.data_type() { + ArrowDataType::Struct(fields) => { + if fields.len() != 2 { + return Err(Error::IllegalArgument { + message: format!( + "Expected Struct with 2 fields for Map, got {}", + fields.len() + ), + }); + } + (fields[0].data_type().clone(), fields[1].data_type().clone()) + } + struct_type => { + return Err(Error::IllegalArgument { + message: format!( + "Expected Struct within Map Arrow type, got {:?}", + struct_type + ), + }); + } + }, + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Expected Map Arrow type for Map, got: {arrow_type:?}" + ), + }); + } + }; + + let key_writer = ColumnWriter::create(m.key_type(), &key_arrow_type, 0, capacity)?; + let value_writer = + ColumnWriter::create(m.value_type(), &value_arrow_type, 1, capacity)?; + TypedWriter::Map { + key_writer: Box::new(key_writer), + value_writer: Box::new(value_writer), + key_type: m.key_type().clone(), + value_type: m.value_type().clone(), + offsets: vec![0], + validity: Vec::with_capacity(capacity), + } + } DataType::Row(row_type) => { let arrow_fields = match arrow_type { ArrowDataType::Struct(fields) => fields.clone(), @@ -394,11 +447,6 @@ impl ColumnWriter { row_type: row_type.clone(), } } - _ => { - return Err(Error::IllegalArgument { - message: format!("Unsupported Fluss DataType: {fluss_type:?}"), - }); - } }; Ok(Self { @@ -440,6 +488,26 @@ impl ColumnWriter { let taken_validity = std::mem::take(validity); finish_list_array(values, item_nullable, &taken_offsets, &taken_validity) } + TypedWriter::Map { + key_writer, + 
value_writer, + offsets, + validity, + .. + } => { + let value_nullable = value_writer.nullable; + let keys = key_writer.finish(); + let values = value_writer.finish(); + let taken_offsets = std::mem::replace(offsets, vec![0]); + let taken_validity = std::mem::take(validity); + finish_map_array( + keys, + values, + value_nullable, + &taken_offsets, + &taken_validity, + ) + } TypedWriter::Struct { field_writers, validity, @@ -527,6 +595,20 @@ impl ColumnWriter { let offsets_bytes = round_up_to_8(offsets.len() * std::mem::size_of::()); validity_bytes + offsets_bytes + element_writer.buffer_size() } + TypedWriter::Map { + key_writer, + value_writer, + offsets, + validity, + .. + } => { + let validity_bytes = round_up_to_8(validity.len().div_ceil(8)); + let offsets_bytes = round_up_to_8(offsets.len() * std::mem::size_of::()); + validity_bytes + + offsets_bytes + + key_writer.buffer_size() + + value_writer.buffer_size() + } TypedWriter::Struct { field_writers, validity, @@ -543,6 +625,9 @@ impl ColumnWriter { match &mut self.inner { TypedWriter::List { offsets, validity, .. + } + | TypedWriter::Map { + offsets, validity, .. 
} => { let last = *offsets.last().unwrap_or(&0); offsets.push(last); @@ -783,6 +868,30 @@ impl ColumnWriter { validity.push(true); Ok(()) } + TypedWriter::Map { + key_writer, + value_writer, + key_type, + value_type, + offsets, + validity, + } => { + let map = row.get_map(pos, key_type, value_type)?; + let key_array = map.key_array(); + let value_array = map.value_array(); + for i in 0..map.size() { + key_writer.write_field_at(key_array, i)?; + value_writer.write_field_at(value_array, i)?; + } + let last = *offsets.last().unwrap(); + offsets.push( + last + i32::try_from(map.size()).map_err(|_| RowConvertError { + message: format!("Map size {} exceeds i32 range", map.size()), + })?, + ); + validity.push(true); + Ok(()) + } TypedWriter::Struct { field_writers, validity, @@ -844,12 +953,52 @@ fn finish_list_array( )) } +fn finish_map_array( + keys: ArrayRef, + values: ArrayRef, + value_nullable: bool, + offsets: &[i32], + validity: &[bool], +) -> ArrayRef { + use arrow::array::{Array, MapArray, StructArray}; + use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; + use arrow::datatypes::Field; + use std::sync::Arc; + + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets.to_vec())); + let null_buffer = NullBuffer::from(validity.to_vec()); + + let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); + let value_field = Arc::new(Field::new( + "value", + values.data_type().clone(), + value_nullable, + )); + + let struct_array = StructArray::from(vec![(key_field, keys), (value_field, values)]); + + let entries_field = Arc::new(Field::new( + "entries", + struct_array.data_type().clone(), + false, + )); + + Arc::new(MapArray::new( + entries_field, + offsets_buffer, + struct_array, + Some(null_buffer), + false, + )) +} + #[cfg(test)] mod tests { use super::*; use crate::metadata::DataTypes; use crate::record::to_arrow_type; use crate::row::binary_array::FlussArrayWriter; + use crate::row::binary_map::FlussMapWriter; use 
crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz}; use arrow::array::*; use bigdecimal::BigDecimal; @@ -1116,4 +1265,56 @@ mod tests { "Arrow field inside the list should be non-nullable" ); } + + #[test] + fn test_write_map_type() { + use crate::metadata::DataTypes; + let key_type = DataTypes::int(); + let value_type = DataTypes::string(); + let fluss_type = DataTypes::map(key_type.clone(), value_type.clone()); + + let mut map_writer = FlussMapWriter::new(2, &key_type, &value_type); + map_writer.write_entry(1.into(), "a".into()).unwrap(); + map_writer.write_entry(2.into(), "b".into()).unwrap(); + let map = map_writer.complete().unwrap(); + + let arr = write_one(&fluss_type, Datum::Map(map)); + let map_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(map_arr.len(), 1); + + let entries = map_arr.value(0); + let struct_arr = entries.as_any().downcast_ref::().unwrap(); + assert_eq!(struct_arr.num_columns(), 2); + + let keys = struct_arr + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let values = struct_arr + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(keys.len(), 2); + assert_eq!(keys.value(0), 1); + assert_eq!(keys.value(1), 2); + + assert_eq!(values.len(), 2); + assert_eq!(values.value(0), "a"); + assert_eq!(values.value(1), "b"); + } + + #[test] + fn test_write_null_map_type() { + use crate::metadata::DataTypes; + + let fluss_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + let arr = write_one(&fluss_type, Datum::Null); + let map_arr = arr.as_any().downcast_ref::().unwrap(); + + assert_eq!(map_arr.len(), 1); + assert!(map_arr.is_null(0)); + } } diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs index 9e0ffa53..d5f7c2f1 100644 --- a/crates/fluss/src/row/compacted/compacted_key_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs @@ -23,6 +23,8 @@ use crate::error::Result; use 
crate::metadata::DataType; use crate::row::Decimal; use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; +use crate::row::binary_array::FlussArray; +use crate::row::binary_map::FlussMap; use crate::row::datum::{TimestampLtz, TimestampNtz}; use delegate::delegate; @@ -109,7 +111,9 @@ impl BinaryWriter for CompactedKeyWriter { fn write_timestamp_ltz(&mut self, value: &TimestampLtz, precision: u32); - fn write_array(&mut self, value: &[u8]); + fn write_array(&mut self, value: &FlussArray); + + fn write_map(&mut self, value: &FlussMap); } } diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index fbf47c58..2463e479 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -18,10 +18,12 @@ use crate::client::WriteFormat; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::RowType; +use crate::metadata::{DataType, RowType}; +use crate::row::binary_array::FlussArray; +use crate::row::binary_map::FlussMap; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; -use crate::row::{Decimal, FlussArray, GenericRow, InternalRow}; +use crate::row::{Decimal, GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize { @@ -172,6 +174,10 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row()?.get_array(pos) } + fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { + self.decoded_row()?.get_map(pos, key_type, value_type) + } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { self.decoded_row()?.get_row(pos) } @@ -188,14 +194,17 @@ impl<'a> InternalRow for CompactedRow<'a> { #[cfg(test)] mod tests { use super::*; - use crate::metadata::{ - BigIntType, BooleanType, BytesType, DataType, DataTypes, 
DecimalType, DoubleType, - FloatType, IntType, SmallIntType, StringType, TimestampLTzType, TimestampType, TinyIntType, - }; + use crate::metadata::DataTypes; use crate::row::binary::BinaryWriter; use crate::row::binary_array::FlussArrayWriter; + use crate::row::binary_map::FlussMapWriter; + + use crate::metadata::{ + BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, IntType, SmallIntType, + StringType, TinyIntType, + }; + use crate::row::Datum; use crate::row::compacted::compacted_row_writer::CompactedRowWriter; - use crate::row::datum::{TimestampLtz, TimestampNtz}; #[test] fn test_compacted_row() { @@ -265,6 +274,9 @@ mod tests { #[test] fn test_compacted_row_temporal_and_decimal_types() { // Comprehensive test covering DATE, TIME, TIMESTAMP (compact/non-compact), and DECIMAL (compact/non-compact) + use crate::metadata::{DecimalType, TimestampLTzType, TimestampType}; + use crate::row::Decimal; + use crate::row::datum::{TimestampLtz, TimestampNtz}; use bigdecimal::{BigDecimal, num_bigint::BigInt}; let row_type = RowType::with_data_types(vec![ @@ -356,7 +368,7 @@ mod tests { arr_writer.write_int(1, 2); arr_writer.write_int(2, 3); let arr = arr_writer.complete().unwrap(); - writer.write_array(arr.as_bytes()); + writer.write_array(&arr); let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); @@ -381,7 +393,7 @@ mod tests { arr_writer.write_string(1, "fluss"); arr_writer.write_string(2, "rust"); let arr = arr_writer.complete().unwrap(); - writer.write_array(arr.as_bytes()); + writer.write_array(&arr); let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); @@ -405,7 +417,7 @@ mod tests { arr_writer.set_null_at(1); arr_writer.write_int(2, 30); let arr = arr_writer.complete().unwrap(); - writer.write_array(arr.as_bytes()); + writer.write_array(&arr); let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); @@ -428,7 +440,7 @@ mod tests { 
let elem_type = DataTypes::int(); let arr_writer = FlussArrayWriter::new(0, &elem_type); let arr = arr_writer.complete().unwrap(); - writer.write_array(arr.as_bytes()); + writer.write_array(&arr); let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); @@ -462,7 +474,7 @@ mod tests { outer_writer.write_array(1, &inner2_arr); let outer_arr = outer_writer.complete().unwrap(); - writer.write_array(outer_arr.as_bytes()); + writer.write_array(&outer_arr); let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); @@ -479,4 +491,140 @@ mod tests { assert_eq!(nested2.size(), 1); assert_eq!(nested2.get_int(0).unwrap(), 99); } + + #[test] + fn test_compacted_row_map() { + let row_type = + RowType::with_data_types(vec![DataTypes::map(DataTypes::int(), DataTypes::string())]); + + let mut writer = CompactedRowWriter::new(row_type.fields().len()); + + let mut map_writer = FlussMapWriter::new(2, &DataTypes::int(), &DataTypes::string()); + map_writer.write_entry(1.into(), "a".into()).unwrap(); + map_writer.write_entry(2.into(), "b".into()).unwrap(); + let map = map_writer.complete().unwrap(); + writer.write_map(&map); + + let bytes = writer.to_bytes(); + let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); + + let read_map = row + .get_map(0, &DataTypes::int(), &DataTypes::string()) + .unwrap(); + assert_eq!(read_map.size(), 2); + assert_eq!(read_map.key_array().get_int(0).unwrap(), 1); + assert_eq!(read_map.value_array().get_string(0).unwrap(), "a"); + } + + #[test] + fn test_compacted_row_map_with_nulls() { + // Row with two columns: an INT and a nullable MAP + let row_type = RowType::with_data_types(vec![ + DataTypes::int(), + DataTypes::map(DataTypes::int(), DataTypes::string()), + ]); + + // Write row with null map + let mut writer = CompactedRowWriter::new(row_type.fields().len()); + writer.write_int(42); + writer.set_null_at(1); + writer.complete(); + + let bytes = writer.to_bytes(); + let 
row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); + + assert_eq!(row.get_int(0).unwrap(), 42); + assert!(row.is_null_at(1).unwrap()); + + // Write row with non-null map + writer.reset(); + writer.write_int(99); + let mut map_writer = FlussMapWriter::new(1, &DataTypes::int(), &DataTypes::string()); + map_writer.write_entry(7.into(), "hello".into()).unwrap(); + let map = map_writer.complete().unwrap(); + writer.write_map(&map); + writer.complete(); + + let bytes2 = writer.to_bytes(); + let row2 = CompactedRow::from_bytes(&row_type, bytes2.as_ref()); + assert_eq!(row2.get_int(0).unwrap(), 99); + assert!(!row2.is_null_at(1).unwrap()); + let read_map = row2 + .get_map(1, &DataTypes::int(), &DataTypes::string()) + .unwrap(); + assert_eq!(read_map.size(), 1); + assert_eq!(read_map.key_array().get_int(0).unwrap(), 7); + assert_eq!(read_map.value_array().get_string(0).unwrap(), "hello"); + } + + #[test] + fn test_compacted_row_nested_map() { + // Map> + let row_type = RowType::with_data_types(vec![DataTypes::map( + DataTypes::string(), + DataTypes::array(DataTypes::int()), + )]); + + let mut writer = CompactedRowWriter::new(row_type.fields().len()); + + // Values: [[1, 2], [3]] + let inner_type = DataTypes::int(); + let mut inner1 = FlussArrayWriter::new(2, &inner_type); + inner1.write_int(0, 1); + inner1.write_int(1, 2); + let inner1_arr = inner1.complete().unwrap(); + + let mut inner2 = FlussArrayWriter::new(1, &inner_type); + inner2.write_int(0, 3); + let inner2_arr = inner2.complete().unwrap(); + + let array_type = DataTypes::array(DataTypes::int()); + + let mut map_writer = FlussMapWriter::new(2, &DataTypes::string(), &array_type); + map_writer + .write_entry("a".into(), Datum::Array(inner1_arr)) + .unwrap(); + map_writer + .write_entry("b".into(), Datum::Array(inner2_arr)) + .unwrap(); + let map = map_writer.complete().unwrap(); + writer.write_map(&map); + + let bytes = writer.to_bytes(); + let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); + + 
let read_map = row.get_map(0, &DataTypes::string(), &array_type).unwrap(); + assert_eq!(read_map.size(), 2); + assert_eq!(read_map.key_array().get_string(0).unwrap(), "a"); + assert_eq!(read_map.key_array().get_string(1).unwrap(), "b"); + + let nested1 = read_map.value_array().get_array(0).unwrap(); + assert_eq!(nested1.size(), 2); + assert_eq!(nested1.get_int(0).unwrap(), 1); + assert_eq!(nested1.get_int(1).unwrap(), 2); + + let nested2 = read_map.value_array().get_array(1).unwrap(); + assert_eq!(nested2.size(), 1); + assert_eq!(nested2.get_int(0).unwrap(), 3); + } + + #[test] + fn test_compacted_row_empty_map() { + let row_type = + RowType::with_data_types(vec![DataTypes::map(DataTypes::int(), DataTypes::string())]); + + let mut writer = CompactedRowWriter::new(row_type.fields().len()); + + let map_writer = FlussMapWriter::new(0, &DataTypes::int(), &DataTypes::string()); + let map = map_writer.complete().unwrap(); + writer.write_map(&map); + + let bytes = writer.to_bytes(); + let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); + + let read_map = row + .get_map(0, &DataTypes::int(), &DataTypes::string()) + .unwrap(); + assert_eq!(read_map.size(), 0); + } } diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index efcb39f2..3f2eb653 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -213,12 +213,14 @@ impl<'a> CompactedRowDeserializer<'a> { let nested_row = nested_deser.deserialize(&nested_reader)?; (Datum::Row(Box::new(nested_row)), next) } - _ => { - return Err(IllegalArgument { - message: format!( - "Unsupported DataType in CompactedRowDeserializer: {dtype:?}" - ), - }); + DataType::Map(map_type) => { + let (bytes, next) = reader.read_bytes(cursor)?; + let map = crate::row::binary_map::FlussMap::from_bytes( + bytes, + map_type.key_type(), + map_type.value_type(), + )?; + (Datum::Map(map), next) } }; 
cursor = next_cursor; diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs index 36271743..2af8767f 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -17,6 +17,8 @@ use crate::row::Decimal; use crate::row::binary::BinaryWriter; +use crate::row::binary_array::FlussArray; +use crate::row::binary_map::FlussMap; use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes; use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice}; use bytes::{Bytes, BytesMut}; @@ -165,8 +167,12 @@ impl BinaryWriter for CompactedRowWriter { self.write_bytes(&bytes[..length.min(bytes.len())]) } - fn write_array(&mut self, value: &[u8]) { - self.write_bytes(value) + fn write_array(&mut self, value: &FlussArray) { + self.write_bytes(value.as_bytes()) + } + + fn write_map(&mut self, value: &FlussMap) { + self.write_bytes(value.as_bytes()) } fn complete(&mut self) { diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index d3f19a1e..b1595b31 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -22,11 +22,12 @@ use crate::row::Decimal; use crate::row::GenericRow; use crate::row::InternalRow; use crate::row::binary_array::FlussArray; +use crate::row::binary_map::FlussMap; use crate::row::field_getter::FieldGetter; use arrow::array::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, - Int32Builder, Int64Builder, ListBuilder, StringBuilder, StructBuilder, + Int32Builder, Int64Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder, Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder, @@ -76,6 +77,8 @@ pub enum Datum<'a> { TimestampLtz(TimestampLtz), #[display("{0}")] Array(FlussArray), + #[display("{0}")] + Map(FlussMap), #[display("{0:?}")] Row(Box>), } @@ -141,6 +144,17 @@ impl Datum<'_> { } } + pub fn is_map(&self) -> bool { + matches!(self, Datum::Map(_)) + } + + pub fn as_map(&self) -> &FlussMap { + match self { + Self::Map(m) => m, + _ => panic!("not a map: {self:?}"), + } + } + pub fn as_row(&self) -> &GenericRow<'_> { match self { Self::Row(r) => r.as_ref(), @@ -168,6 +182,7 @@ impl<'a> Datum<'a> { Datum::TimestampNtz(t) => Datum::TimestampNtz(t), Datum::TimestampLtz(t) => Datum::TimestampLtz(t), Datum::Array(a) => Datum::Array(a), + Datum::Map(m) => Datum::Map(m), Datum::Row(boxed) => Datum::Row(Box::new(boxed.into_owned())), } } @@ -443,11 +458,19 @@ impl<'a> From for Datum<'a> { } } +impl<'a> From for Datum<'a> { + #[inline] + fn from(map: FlussMap) -> Datum<'a> { + Datum::Map(map) + } +} + pub trait ToArrow { fn append_to( &self, builder: &mut dyn ArrayBuilder, - data_type: &arrow_schema::DataType, + fluss_type: &crate::metadata::DataType, + arrow_type: &arrow_schema::DataType, ) -> Result<()>; } @@ -552,10 +575,9 @@ impl AppendResult for std::result::Result<(), ArrowError> { fn append_fluss_array_to_list_builder( arr: &FlussArray, builder: &mut dyn ArrayBuilder, - data_type: &arrow_schema::DataType, + fluss_type: &crate::metadata::DataType, + arrow_type: &arrow_schema::DataType, ) -> Result<()> { - use crate::record::from_arrow_type; - let list_builder = builder .as_any_mut() .downcast_mut::>>() @@ -563,34 +585,106 @@ fn append_fluss_array_to_list_builder( message: "Builder type mismatch for Array: expected ListBuilder".to_string(), })?; - let element_arrow_type = match data_type { + let element_fluss_type = match fluss_type { + crate::metadata::DataType::Array(a) => a.get_element_type(), + _ => { + return Err(RowConvertError { + message: format!("Expected Array Fluss type for Array datum, got: 
{fluss_type:?}"), + }); + } + }; + + let element_arrow_type = match arrow_type { arrow_schema::DataType::List(field) => field.data_type().clone(), _ => { return Err(RowConvertError { - message: format!("Expected List Arrow type for Array datum, got: {data_type:?}"), + message: format!("Expected List Arrow type for Array datum, got: {arrow_type:?}"), }); } }; - let element_fluss_type = from_arrow_type(&element_arrow_type)?; let values_builder = list_builder.values(); for i in 0..arr.size() { if arr.is_null_at(i) { append_null_for_type(values_builder, &element_arrow_type)?; } else { - let datum = read_datum_from_fluss_array(arr, i, &element_fluss_type)?; - datum.append_to(values_builder, &element_arrow_type)?; + let datum = read_datum_from_fluss_array(arr, i, element_fluss_type)?; + datum.append_to(values_builder, element_fluss_type, &element_arrow_type)?; } } list_builder.append(true); Ok(()) } +fn append_fluss_map_to_map_builder( + map: &crate::row::FlussMap, + builder: &mut dyn ArrayBuilder, + fluss_type: &crate::metadata::DataType, + arrow_type: &arrow_schema::DataType, +) -> Result<()> { + let map_builder = builder + .as_any_mut() + .downcast_mut::, Box>>() + .ok_or_else(|| RowConvertError { + message: "Builder type mismatch for Map: expected MapBuilder".to_string(), + })?; + + let expected_map_type = match fluss_type { + crate::metadata::DataType::Map(m) => m, + _ => { + return Err(RowConvertError { + message: format!("Expected Map Fluss type for Map datum, got: {fluss_type:?}"), + }); + } + }; + + let (key_arrow_type, value_arrow_type) = match arrow_type { + arrow_schema::DataType::Map(entries_field, _) => match entries_field.data_type() { + arrow_schema::DataType::Struct(fields) if fields.len() == 2 => { + (fields[0].data_type().clone(), fields[1].data_type().clone()) + } + other => { + return Err(RowConvertError { + message: format!( + "Expected Struct with 2 fields for Map entries, got: {other:?}" + ), + }); + } + }, + _ => { + return Err(RowConvertError { 
+ message: format!("Expected Map Arrow type for Map datum, got: {arrow_type:?}"), + }); + } + }; + + let key_fluss_type = expected_map_type.key_type(); + let value_fluss_type = expected_map_type.value_type(); + let key_array = map.key_array(); + let value_array = map.value_array(); + + for i in 0..map.size() { + let key_datum = read_datum_from_fluss_array(key_array, i, key_fluss_type)?; + key_datum.append_to(map_builder.keys(), key_fluss_type, &key_arrow_type)?; + + if value_array.is_null_at(i) { + append_null_for_type(map_builder.values(), &value_arrow_type)?; + } else { + let val_datum = read_datum_from_fluss_array(value_array, i, value_fluss_type)?; + val_datum.append_to(map_builder.values(), value_fluss_type, &value_arrow_type)?; + } + } + map_builder.append(true).map_err(|e| RowConvertError { + message: format!("Failed to append Map entries: {e}"), + })?; + Ok(()) +} + fn read_datum_from_fluss_array<'a>( arr: &FlussArray, pos: usize, - element_type: &DataType, + element_type: &crate::metadata::DataType, ) -> Result> { if let DataType::Row(row_type) = element_type { let compacted = arr.get_row(pos, row_type)?; @@ -675,6 +769,20 @@ fn append_null_for_type( arrow_schema::DataType::List(_) => { downcast_null!(ListBuilder>) } + arrow_schema::DataType::Map(_, _) => { + let b = builder + .as_any_mut() + .downcast_mut::, Box>>() + .ok_or_else(|| RowConvertError { + message: format!( + "Builder type mismatch: expected MapBuilder for {data_type:?}", + ), + })?; + b.append(false).map_err(|e| RowConvertError { + message: format!("Failed to append null Map entries: {e}"), + })?; + Ok(()) + } arrow_schema::DataType::Struct(fields) => { // StructBuilder::append_null only flips parent validity; children must each get a null too. 
let struct_builder = builder @@ -704,7 +812,8 @@ fn append_null_for_type( fn append_generic_row_to_struct_builder( row: &GenericRow<'_>, builder: &mut dyn ArrayBuilder, - data_type: &arrow_schema::DataType, + fluss_type: &crate::metadata::DataType, + arrow_type: &arrow_schema::DataType, ) -> Result<()> { let struct_builder = builder .as_any_mut() @@ -713,11 +822,20 @@ fn append_generic_row_to_struct_builder( message: "Builder type mismatch for Row: expected StructBuilder".to_string(), })?; - let fields = match data_type { + let row_type = match fluss_type { + crate::metadata::DataType::Row(rt) => rt, + _ => { + return Err(RowConvertError { + message: format!("Expected Row Fluss type for Row datum, got: {fluss_type:?}"), + }); + } + }; + + let fields = match arrow_type { arrow_schema::DataType::Struct(fields) => fields.clone(), _ => { return Err(RowConvertError { - message: format!("Expected Struct Arrow type for Row datum, got: {data_type:?}"), + message: format!("Expected Struct Arrow type for Row datum, got: {arrow_type:?}"), }); } }; @@ -736,7 +854,8 @@ fn append_generic_row_to_struct_builder( let field_builders = struct_builder.field_builders_mut(); for (i, datum) in row.values.iter().enumerate() { let child = field_builders[i].as_mut(); - datum.append_to(child, fields[i].data_type())?; + let child_fluss_type = row_type.fields()[i].data_type(); + datum.append_to(child, child_fluss_type, fields[i].data_type())?; } } struct_builder.append(true); @@ -747,7 +866,8 @@ impl Datum<'_> { pub fn append_to( &self, builder: &mut dyn ArrayBuilder, - data_type: &arrow_schema::DataType, + fluss_type: &crate::metadata::DataType, + arrow_type: &arrow_schema::DataType, ) -> Result<()> { macro_rules! 
append_value_to_arrow { ($builder_type:ty, $value:expr) => { @@ -759,7 +879,7 @@ impl Datum<'_> { } match self { - Datum::Null => return append_null_for_type(builder, data_type), + Datum::Null => return append_null_for_type(builder, arrow_type), Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v), Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v), Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v), @@ -768,7 +888,7 @@ impl Datum<'_> { Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()), Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()), Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()), - Datum::Blob(v) => match data_type { + Datum::Blob(v) => match arrow_type { arrow_schema::DataType::Binary => { append_value_to_arrow!(BinaryBuilder, v.as_ref()); } @@ -778,18 +898,18 @@ impl Datum<'_> { _ => { return Err(RowConvertError { message: format!( - "Expected Binary or FixedSizeBinary Arrow type, got: {data_type:?}" + "Expected Binary or FixedSizeBinary Arrow type, got: {arrow_type:?}" ), }); } }, Datum::Decimal(decimal) => { // Extract target precision and scale from Arrow schema - let (p, s) = match data_type { + let (p, s) = match arrow_type { arrow_schema::DataType::Decimal128(p, s) => (*p, *s), _ => { return Err(RowConvertError { - message: format!("Expected Decimal128 Arrow type, got: {data_type:?}"), + message: format!("Expected Decimal128 Arrow type, got: {arrow_type:?}"), }); } }; @@ -817,7 +937,7 @@ impl Datum<'_> { // Convert to Arrow's time unit based on schema let millis = time.get_inner(); - match data_type { + match arrow_type { arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => { if let Some(b) = builder.as_any_mut().downcast_mut::() { @@ -877,7 +997,7 @@ impl Datum<'_> { _ => { return Err(RowConvertError { message: format!( - "Expected Time32/Time64 Arrow type, got: {data_type:?}" + "Expected Time32/Time64 Arrow type, got: {arrow_type:?}" ), }); 
} @@ -962,10 +1082,13 @@ impl Datum<'_> { }); } Datum::Array(arr) => { - return append_fluss_array_to_list_builder(arr, builder, data_type); + return append_fluss_array_to_list_builder(arr, builder, fluss_type, arrow_type); + } + Datum::Map(map) => { + return append_fluss_map_to_map_builder(map, builder, fluss_type, arrow_type); } Datum::Row(row) => { - return append_generic_row_to_struct_builder(row, builder, data_type); + return append_generic_row_to_struct_builder(row, builder, fluss_type, arrow_type); } } @@ -985,7 +1108,8 @@ macro_rules! impl_to_arrow { fn append_to( &self, builder: &mut dyn ArrayBuilder, - _data_type: &arrow_schema::DataType, + _fluss_type: &crate::metadata::DataType, + _arrow_type: &arrow_schema::DataType, ) -> Result<()> { if let Some(b) = builder.as_any_mut().downcast_mut::<$variant>() { b.append_value(*self); @@ -1208,20 +1332,23 @@ mod tests { #[test] fn datum_append_to_builder() { + use crate::metadata::DataTypes; let mut builder = Int32Builder::new(); + let int_type = DataTypes::int(); Datum::Null - .append_to(&mut builder, &arrow_schema::DataType::Int32) + .append_to(&mut builder, &int_type, &arrow_schema::DataType::Int32) .unwrap(); Datum::Int32(5) - .append_to(&mut builder, &arrow_schema::DataType::Int32) + .append_to(&mut builder, &int_type, &arrow_schema::DataType::Int32) .unwrap(); let array = builder.finish(); assert!(array.is_null(0)); assert_eq!(array.value(1), 5); let mut builder = StringBuilder::new(); + let string_type = DataTypes::string(); let err = Datum::Int32(1) - .append_to(&mut builder, &arrow_schema::DataType::Utf8) + .append_to(&mut builder, &string_type, &arrow_schema::DataType::Utf8) .unwrap_err(); assert!(matches!(err, RowConvertError { .. 
})); } @@ -1246,6 +1373,94 @@ mod tests { assert_eq!(date.month(), 1); assert_eq!(date.day(), 1); } + #[test] + fn test_datum_map_appends_to_arrow() { + use crate::metadata::DataTypes; + use crate::row::binary_map::FlussMapWriter; + use arrow::array::MapBuilder; + use std::sync::Arc; + + let mut writer = FlussMapWriter::new(1, &DataTypes::int(), &DataTypes::string()); + writer.write_entry(99.into(), "arrow_test".into()).unwrap(); + let map = writer.complete().unwrap(); + + let arrow_type = arrow_schema::DataType::Map( + Arc::new(arrow_schema::Field::new( + "entries", + arrow_schema::DataType::Struct(arrow_schema::Fields::from(vec![ + arrow_schema::Field::new("key", arrow_schema::DataType::Int32, false), + arrow_schema::Field::new("value", arrow_schema::DataType::Utf8, true), + ])), + false, + )), + false, + ); + + let mut map_builder: MapBuilder< + Box, + Box, + > = MapBuilder::new( + None, + Box::new(Int32Builder::new()), + Box::new(StringBuilder::new()), + ); + + let map_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + Datum::Map(map) + .append_to(&mut map_builder, &map_type, &arrow_type) + .unwrap(); + + let array = map_builder.finish(); + assert_eq!(array.len(), 1); + assert!(!array.is_null(0)); + } + + #[test] + fn test_datum_map_append_type_mismatch() { + use crate::metadata::DataTypes; + use crate::row::binary_map::FlussMapWriter; + use arrow::array::{Float64Builder, MapBuilder, StringBuilder}; + use std::sync::Arc; + + // 1. Construct a Map with Keys: String, Values: Float64 + let mut writer = FlussMapWriter::new(1, &DataTypes::string(), &DataTypes::double()); + writer.write_entry("key1".into(), 1.23.into()).unwrap(); + let map = writer.complete().unwrap(); + + // 2. Define an Arrow Map builder for (String, Float64) using Boxed builders + let mut map_builder: MapBuilder< + Box, + Box, + > = MapBuilder::new( + None, + Box::new(StringBuilder::new()), + Box::new(Float64Builder::new()), + ); + + // 3. 
Define an INCOMPATIBLE expected Fluss type (Int32 instead of Map) + let mismatched_type = DataTypes::int(); + + // 4. Define the Arrow type (must match the builder structure) + let arrow_type = arrow_schema::DataType::Map( + Arc::new(arrow_schema::Field::new( + "entries", + arrow_schema::DataType::Struct(arrow_schema::Fields::from(vec![ + arrow_schema::Field::new("key", arrow_schema::DataType::Utf8, false), + arrow_schema::Field::new("value", arrow_schema::DataType::Float64, true), + ])), + false, + )), + false, + ); + + // 5. Assert that append_to returns an error + let result = Datum::Map(map).append_to(&mut map_builder, &mismatched_type, &arrow_type); + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("row convert error Expected Map Fluss type for Map datum")); + assert!(err.contains("Int(IntType { nullable: true })")); + } } #[cfg(test)] @@ -1305,10 +1520,11 @@ mod timestamp_tests { #[test] fn test_row_arrow_struct_round_trip() { - let row_type_owned = DataTypes::row(vec![ + let row_type = crate::metadata::RowType::new(vec![ DataField::new("x", DataTypes::int(), None), DataField::new("label", DataTypes::string(), None), ]); + let row_type_owned = DataType::Row(row_type.clone()); let arrow_struct_dt = to_arrow_type(&row_type_owned).unwrap(); let struct_fields: Fields = match &arrow_struct_dt { arrow_schema::DataType::Struct(f) => f.clone(), @@ -1321,18 +1537,18 @@ mod timestamp_tests { r0.set_field(0, 42_i32); r0.set_field(1, "hello"); Datum::Row(Box::new(r0)) - .append_to(&mut struct_builder, &arrow_struct_dt) + .append_to(&mut struct_builder, &row_type_owned, &arrow_struct_dt) .expect("append row 0"); Datum::Null - .append_to(&mut struct_builder, &arrow_struct_dt) + .append_to(&mut struct_builder, &row_type_owned, &arrow_struct_dt) .expect("append null row"); let mut r2 = GenericRow::new(2); r2.set_field(0, -7_i32); r2.set_field(1, Datum::Null); Datum::Row(Box::new(r2)) - .append_to(&mut struct_builder, 
&arrow_struct_dt) + .append_to(&mut struct_builder, &row_type_owned, &arrow_struct_dt) .expect("append row 2"); let struct_array: StructArray = struct_builder.finish(); @@ -1346,7 +1562,7 @@ mod timestamp_tests { RecordBatch::try_new(schema, vec![Arc::new(struct_array)]).expect("record batch"), ); - let mut columnar = ColumnarRow::new(batch, 0, None); + let mut columnar = ColumnarRow::new(batch, Arc::new(row_type), 0, None); let nested = columnar.get_row(0).expect("get_row 0"); assert_eq!(nested.get_int(0).unwrap(), 42); diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs index c7f16d6c..81cd96fa 100644 --- a/crates/fluss/src/row/encode/compacted_key_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs @@ -165,6 +165,22 @@ mod tests { .expect("CompactedKeyEncoder initialization failed") } + #[test] + fn test_encode_map_rejected() { + let row_type = + RowType::with_data_types(vec![DataTypes::map(DataTypes::string(), DataTypes::int())]); + + let res = CompactedKeyEncoder::new(&row_type, vec![0]); + assert!(res.is_err()); + if let Err(e) = res { + assert!( + e.to_string().contains("Cannot use Map"), + "Expected error to contain 'Cannot use Map', got '{}'", + e + ); + } + } + #[test] fn test_encode_key() { let row_type = RowType::with_data_types(vec![ @@ -364,7 +380,7 @@ mod tests { DataTypes::array(DataTypes::int()), // ARRAY DataTypes::array(DataTypes::float().as_non_nullable()), // ARRAY DataTypes::array(DataTypes::array(DataTypes::string())), // ARRAY> - // TODO: Add support for MAP type + // Note: MAP is rejected as a key type (see test_encode_map_rejected) // TODO: Add support for ROW type ]); diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index a1ea378f..41322f54 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -82,10 +82,13 @@ impl FieldGetter { pos, precision: t.precision(), }, - // TODO: 
add Map variant when get_map is available in InternalRow. DataType::Array(_) => InnerFieldGetter::Array { pos }, + DataType::Map(m) => InnerFieldGetter::Map { + pos, + key_type: m.key_type().clone(), + value_type: m.value_type().clone(), + }, DataType::Row(_) => InnerFieldGetter::Row { pos }, - _ => unimplemented!("DataType {:?} is currently unimplemented", data_type), }; if data_type.is_nullable() { @@ -155,6 +158,11 @@ pub enum InnerFieldGetter { Array { pos: usize, }, + Map { + pos: usize, + key_type: DataType, + value_type: DataType, + }, Row { pos: usize, }, @@ -187,8 +195,12 @@ impl InnerFieldGetter { InnerFieldGetter::TimestampLtz { pos, precision } => { Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?) } - // TODO: add Map field getter support once its binary form is implemented. InnerFieldGetter::Array { pos } => Datum::Array(row.get_array(*pos)?), + InnerFieldGetter::Map { + pos, + key_type, + value_type, + } => Datum::Map(row.get_map(*pos, key_type, value_type)?), InnerFieldGetter::Row { pos } => Datum::Row(Box::new(row.get_row(*pos)?.clone())), }) } @@ -212,6 +224,7 @@ impl InnerFieldGetter { | Self::Timestamp { pos, .. } | Self::TimestampLtz { pos, .. } | Self::Array { pos } + | Self::Map { pos, .. 
} | Self::Row { pos } => *pos, } } @@ -223,6 +236,7 @@ mod tests { use crate::metadata::DataTypes; use crate::row::GenericRow; use crate::row::binary_array::FlussArrayWriter; + use crate::row::binary_map::FlussMapWriter; #[test] fn test_field_getter_array() { @@ -258,4 +272,38 @@ mod tests { let datum = getter.get_field(&row).unwrap(); assert!(datum.is_null()); } + + #[test] + fn test_field_getter_map() { + let mut map_writer = FlussMapWriter::new(1, &DataTypes::int(), &DataTypes::string()); + map_writer.write_entry(42.into(), "value".into()).unwrap(); + let map = map_writer.complete().unwrap(); + + let mut row = GenericRow::new(2); + row.set_field(0, Datum::Int32(1)); + row.set_field(1, Datum::Map(map)); + + let data_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + let getter = FieldGetter::create(&data_type, 1); + let datum = getter.get_field(&row).unwrap(); + + match datum { + Datum::Map(m) => { + assert_eq!(m.size(), 1); + assert_eq!(m.key_array().get_int(0).unwrap(), 42); + assert_eq!(m.value_array().get_string(0).unwrap(), "value"); + } + _ => panic!("Expected Map datum"), + } + } + + #[test] + fn test_field_getter_nullable_map() { + let row = GenericRow::from_data(vec![Datum::Null]); + + let data_type = DataTypes::map(DataTypes::int(), DataTypes::string()); + let getter = FieldGetter::create(&data_type, 0); + let datum = getter.get_field(&row).unwrap(); + assert!(datum.is_null()); + } } diff --git a/crates/fluss/src/row/lookup_row.rs b/crates/fluss/src/row/lookup_row.rs index 76505abf..fd3db4fc 100644 --- a/crates/fluss/src/row/lookup_row.rs +++ b/crates/fluss/src/row/lookup_row.rs @@ -21,10 +21,11 @@ use crate::client::WriteFormat; use crate::error::Result; +use crate::metadata::DataType; use crate::row::compacted::CompactedRow; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use crate::row::projected_row::ProjectedRow; -use crate::row::{Decimal, FlussArray, GenericRow, InternalRow}; +use crate::row::{Decimal, FlussArray, 
FlussMap, GenericRow, InternalRow}; pub struct LookupRow<'a> { inner: Inner<'a>, @@ -116,6 +117,9 @@ impl<'a> InternalRow for LookupRow<'a> { fn get_array(&self, pos: usize) -> Result { delegate!(self, get_array, pos) } + fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { + delegate!(self, get_map, pos, key_type, value_type) + } fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { delegate!(self, get_row, pos) } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 36f9a1c2..2456ee4d 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod binary_array; +pub mod binary_map; mod column; pub(crate) mod datum; @@ -32,7 +33,9 @@ mod projected_row; mod row_decoder; use crate::client::WriteFormat; -pub use binary_array::FlussArray; +use crate::metadata::DataType; +pub use binary_array::{FlussArray, FlussArrayWriter}; +pub use binary_map::{FlussMap, FlussMapWriter}; use bytes::Bytes; pub use column::*; pub use compacted::CompactedRow; @@ -131,7 +134,10 @@ pub trait InternalRow: Send + Sync { /// Returns the array value at the given position fn get_array(&self, pos: usize) -> Result; - /// Returns the nested row value at the given position + /// Returns the map value at the given position + fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result; + + /// Returns the nested row value at the given position fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { Err(IllegalArgument { message: format!("get_row not supported at position {pos}"), @@ -303,6 +309,20 @@ impl<'a> InternalRow for GenericRow<'a> { } } + fn get_map( + &self, + pos: usize, + _key_type: &DataType, + _value_type: &DataType, + ) -> Result { + match self.get_value(pos)? 
{ + Datum::Map(m) => Ok(m.clone()), + other => Err(IllegalArgument { + message: format!("type mismatch at position {pos}: expected Map, got {other:?}"), + }), + } + } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { match self.get_value(pos)? { Datum::Row(r) => Ok(r.as_ref()), diff --git a/crates/fluss/src/row/projected_row.rs b/crates/fluss/src/row/projected_row.rs index fc4a521e..0075f0b8 100644 --- a/crates/fluss/src/row/projected_row.rs +++ b/crates/fluss/src/row/projected_row.rs @@ -21,9 +21,10 @@ use crate::client::WriteFormat; use crate::error::Error::IllegalArgument; use crate::error::Result; +use crate::metadata::DataType; use crate::metadata::UNEXIST_MAPPING; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; -use crate::row::{Decimal, FlussArray, GenericRow, InternalRow}; +use crate::row::{Decimal, FlussArray, FlussMap, GenericRow, InternalRow}; use std::sync::Arc; pub(crate) struct ProjectedRow { @@ -142,6 +143,10 @@ impl InternalRow for ProjectedRow { project!(self, get_array, pos) } + fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { + project!(self, get_map, pos, key_type, value_type) + } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { project!(self, get_row, pos) } diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index fadb4962..11c5b3e9 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1041,6 +1041,98 @@ mod table_test { .expect("Failed to drop table"); } + #[tokio::test] + async fn test_map_datatype_roundtrip() { + use fluss::row::binary_map::FlussMapWriter; + use fluss::row::{Datum, GenericRow}; + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_map_datatype_roundtrip"); + + let key_type = 
DataTypes::string(); + let value_type = DataTypes::int(); + let map_type = DataTypes::map(key_type.clone(), value_type.clone()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("map_col", map_type.clone()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // 1. Construct FlussMap + let mut map_writer = FlussMapWriter::new(3, &key_type, &value_type); + map_writer.write_entry("k1".into(), 10.into()).unwrap(); + map_writer.write_entry("k2".into(), 20.into()).unwrap(); + map_writer.write_entry("k3".into(), 30.into()).unwrap(); + let fluss_map = map_writer.complete().unwrap(); + + // 2. Insert Row + let mut row = GenericRow::new(2); + row.set_field(0, 1i32); + row.set_field(1, Datum::Map(fluss_map)); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + append_writer.append(&row).expect("Failed to append row"); + append_writer.flush().await.expect("Failed to flush"); + + // 3. Fetch Row + let records = scan_table(&table, |scan| scan).await; + assert_eq!(records.len(), 1, "Expected 1 record"); + + let found_row = records[0].row(); + assert_eq!(found_row.get_int(0).unwrap(), 1); + + // 4. Assert Map. Look the types up from `map_type` rather than reusing + // the locally-stashed `key_type`/`value_type`: `MapType::with_nullable` + // forces the stored key non-nullable, which exercises the same + // (non-nullable schema) vs (Arrow-derived nullable) comparison realistic + // callers hit. 
+ let (mt_key, mt_value) = match &map_type { + fluss::metadata::DataType::Map(m) => (m.key_type(), m.value_type()), + _ => unreachable!("map_type is a MAP"), + }; + let decoded_map = found_row + .get_map(1, mt_key, mt_value) + .expect("Failed to get map"); + assert_eq!(decoded_map.size(), 3); + + let decoded_keys = decoded_map.key_array(); + let decoded_values = decoded_map.value_array(); + + assert_eq!(decoded_keys.get_string(0).unwrap(), "k1"); + assert_eq!(decoded_keys.get_string(1).unwrap(), "k2"); + assert_eq!(decoded_keys.get_string(2).unwrap(), "k3"); + + assert_eq!(decoded_values.get_int(0).unwrap(), 10); + assert_eq!(decoded_values.get_int(1).unwrap(), 20); + assert_eq!(decoded_values.get_int(2).unwrap(), 30); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + #[tokio::test] async fn partitioned_table_append_scan() { let cluster = get_shared_cluster();