Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,7 @@ pub fn compacted_row_to_owned(
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
fcore::metadata::DataType::Map(mt) => {
Datum::Map(row.get_map(i, mt.key_type(), mt.value_type())?)
}
fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?),
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
11 changes: 9 additions & 2 deletions crates/fluss/src/row/binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,15 @@ impl InternalRow for FlussArray {
self.get_array(pos)
}

fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result<FlussMap> {
self.get_map(pos, key_type, value_type)
fn get_map(&self, pos: usize) -> Result<FlussMap> {
// FlussArray carries no schema; nested map reads must go through the
// inherent FlussArray::get_map(pos, key_type, value_type).
Err(IllegalArgument {
message: format!(
"InternalRow::get_map is not supported on FlussArray (pos {pos}); \
use FlussArray::get_map(pos, key_type, value_type) directly"
),
})
}
}

Expand Down
153 changes: 147 additions & 6 deletions crates/fluss/src/row/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::DataType;
use crate::row::binary_array::{FlussArray, FlussArrayWriter};
use crate::row::datum::Datum;
use crate::row::datum::{Datum, read_datum_from_fluss_array};
use bytes::Bytes;
use serde::Serialize;
use std::fmt;
Expand All @@ -41,6 +41,8 @@ pub struct FlussMap {
data: Bytes,
key_array: FlussArray,
value_array: FlussArray,
key_type: DataType,
value_type: DataType,
}

impl fmt::Debug for FlussMap {
Expand Down Expand Up @@ -190,6 +192,8 @@ impl FlussMap {
data: Bytes::copy_from_slice(data),
key_array,
value_array,
key_type: key_type.clone(),
value_type: value_type.clone(),
})
}

Expand All @@ -204,13 +208,20 @@ impl FlussMap {
data,
key_array,
value_array,
key_type: key_type.clone(),
value_type: value_type.clone(),
})
}

/// Creates a FlussMap by combining a key array and a value array.
///
/// Copies both arrays into a new contiguous buffer.
pub fn from_arrays(key_array: &FlussArray, value_array: &FlussArray) -> Result<Self> {
pub fn from_arrays(
key_array: &FlussArray,
value_array: &FlussArray,
key_type: &DataType,
value_type: &DataType,
) -> Result<Self> {
if key_array.size() != value_array.size() {
return Err(IllegalArgument {
message: format!(
Expand Down Expand Up @@ -239,6 +250,8 @@ impl FlussMap {
data,
key_array: key_array.clone(),
value_array: value_array.clone(),
key_type: key_type.clone(),
value_type: value_type.clone(),
})
}

Expand All @@ -261,8 +274,61 @@ impl FlussMap {
pub fn value_array(&self) -> &FlussArray {
&self.value_array
}

pub fn key_type(&self) -> &DataType {
&self.key_type
}

pub fn value_type(&self) -> &DataType {
&self.value_type
}

pub fn entries(&self) -> Entries<'_> {
Entries {
map: self,
index: 0,
}
}

/// O(n) linear scan; the binary format carries no key index.
pub fn get<'a>(&'a self, key: &Datum<'_>) -> Result<Option<Datum<'a>>> {
for entry in self.entries() {
let (k, v) = entry?;
if &k == key {
return Ok(Some(v));
}
}
Ok(None)
}
}

pub struct Entries<'a> {
map: &'a FlussMap,
index: usize,
}

impl<'a> Iterator for Entries<'a> {
type Item = Result<(Datum<'a>, Datum<'a>)>;

fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.map.size() {
return None;
}
let i = self.index;
self.index += 1;
let key = read_datum_from_fluss_array(&self.map.key_array, i, &self.map.key_type);
let value = read_datum_from_fluss_array(&self.map.value_array, i, &self.map.value_type);
Some(key.and_then(|k| value.map(|v| (k, v))))
}

fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.map.size() - self.index;
(remaining, Some(remaining))
}
}

impl ExactSizeIterator for Entries<'_> {}

/// Writer for building a `FlussMap` entry by entry.
pub struct FlussMapWriter {
key_writer: FlussArrayWriter,
Expand All @@ -284,6 +350,18 @@ impl FlussMapWriter {
}
}

pub fn extend<'a, I, K, V>(&mut self, entries: I) -> Result<()>
where
I: IntoIterator<Item = (K, V)>,
K: Into<Datum<'a>>,
V: Into<Datum<'a>>,
{
for (k, v) in entries {
self.write_entry(k.into(), v.into())?;
}
Ok(())
}

/// Writes a key-value entry into the map.
///
/// # Errors
Expand Down Expand Up @@ -315,7 +393,7 @@ impl FlussMapWriter {
pub fn complete(self) -> Result<FlussMap> {
let key_array = self.key_writer.complete()?;
let value_array = self.value_writer.complete()?;
FlussMap::from_arrays(&key_array, &value_array)
FlussMap::from_arrays(&key_array, &value_array, &self.key_type, &self.value_type)
}

fn write_datum(
Expand Down Expand Up @@ -480,7 +558,13 @@ mod tests {
let value_writer = FlussArrayWriter::new(2, &DataTypes::string());
let value_array = value_writer.complete().unwrap();

let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err();
let err = FlussMap::from_arrays(
&key_array,
&value_array,
&DataTypes::int(),
&DataTypes::string(),
)
.unwrap_err();
assert!(err.to_string().contains("does not match value array size"));
}

Expand Down Expand Up @@ -520,7 +604,13 @@ mod tests {
value_writer.write_int(0, 100);
let value_array = value_writer.complete().unwrap();

let map = FlussMap::from_arrays(&key_array, &value_array).unwrap();
let map = FlussMap::from_arrays(
&key_array,
&value_array,
&DataTypes::int(),
&DataTypes::int(),
)
.unwrap();
let bytes = map.as_bytes();

// Valid bytes should pass
Expand All @@ -545,7 +635,13 @@ mod tests {
value_writer.write_int(0, 100);
let value_array = value_writer.complete().unwrap();

let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err();
let err = FlussMap::from_arrays(
&key_array,
&value_array,
&DataTypes::int(),
&DataTypes::int(),
)
.unwrap_err();
assert!(err.to_string().contains("keys cannot be null"));

let key_bytes = key_array.as_bytes();
Expand All @@ -558,4 +654,49 @@ mod tests {
let err = FlussMap::from_bytes(&data, &DataTypes::int(), &DataTypes::int()).unwrap_err();
assert!(err.to_string().contains("keys cannot be null"));
}

#[test]
fn entries_yields_typed_pairs_including_nulls() {
let mut writer = FlussMapWriter::new(3, &DataTypes::string(), &DataTypes::int());
writer.write_entry("a".into(), 1.into()).unwrap();
writer.write_entry("b".into(), Datum::Null).unwrap();
writer.write_entry("c".into(), 3.into()).unwrap();
let map = writer.complete().unwrap();

let collected: Vec<(Datum, Datum)> = map
.entries()
.collect::<Result<Vec<_>>>()
.expect("entries should decode cleanly");

assert_eq!(collected.len(), 3);
assert_eq!(collected[0], (Datum::from("a"), Datum::from(1i32)));
assert_eq!(collected[1].0, Datum::from("b"));
assert_eq!(collected[1].1, Datum::Null);
assert_eq!(collected[2], (Datum::from("c"), Datum::from(3i32)));
}

#[test]
fn get_finds_present_key_and_returns_none_for_absent() {
let mut writer = FlussMapWriter::new(2, &DataTypes::string(), &DataTypes::int());
writer.write_entry("a".into(), 10.into()).unwrap();
writer.write_entry("b".into(), 20.into()).unwrap();
let map = writer.complete().unwrap();

let v = map.get(&Datum::from("b")).unwrap();
assert_eq!(v, Some(Datum::from(20i32)));

let missing = map.get(&Datum::from("z")).unwrap();
assert!(missing.is_none());
}

#[test]
fn writer_extend_from_iterator_round_trips() {
let src: Vec<(&str, i32)> = vec![("a", 1), ("b", 2), ("c", 3)];
let mut writer = FlussMapWriter::new(src.len(), &DataTypes::string(), &DataTypes::int());
writer.extend(src).unwrap();
let map = writer.complete().unwrap();

assert_eq!(map.size(), 3);
assert_eq!(map.get(&Datum::from("b")).unwrap(), Some(Datum::from(2i32)));
}
}
42 changes: 23 additions & 19 deletions crates/fluss/src/row/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,17 @@ impl InternalRow for ColumnarRow {
writer.complete()
}

fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result<FlussMap> {
fn get_map(&self, pos: usize) -> Result<FlussMap> {
let expected_type = self.row_type.fields()[pos].data_type();
let map_type = match expected_type {
DataType::Map(m) => m,
_ => {
return Err(IllegalArgument {
message: format!("expected Map type at position {pos}, got {expected_type:?}"),
});
}
};

let column = self.column(pos)?;
let map_arr =
column
Expand All @@ -703,7 +713,11 @@ impl InternalRow for ColumnarRow {
),
})?;

arrow_map_entry_to_fluss_map(&map_arr.value(self.row_id), key_type, value_type)
arrow_map_entry_to_fluss_map(
&map_arr.value(self.row_id),
map_type.key_type(),
map_type.value_type(),
)
}

fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> {
Expand Down Expand Up @@ -799,7 +813,7 @@ fn arrow_map_entry_to_fluss_map(
write_arrow_values_to_fluss_array(&**values_arrow, value_type, &mut value_writer)?;
let value_array = value_writer.complete()?;

FlussMap::from_arrays(&key_array, &value_array)
FlussMap::from_arrays(&key_array, &value_array, key_type, value_type)
}

/// Downcast to a primitive Arrow array type, then loop with null checks calling a writer method.
Expand Down Expand Up @@ -1560,16 +1574,12 @@ mod tests {
Arc::new(RecordBatch::try_new(schema, vec![Arc::new(map_arr)]).expect("record batch"));

let map_type = DataTypes::map(DataTypes::int(), DataTypes::string());
let row_type = Arc::new(RowType::with_data_types(vec![map_type.clone()]));
let row_type = Arc::new(RowType::with_data_types(vec![map_type]));
let row = ColumnarRow::new(batch, row_type, 0, None);

let (k, v) = match &map_type {
crate::metadata::DataType::Map(m) => (m.key_type(), m.value_type()),
_ => unreachable!(),
};
let fluss_map = row
.get_map(0, k, v)
.expect("get_map should accept non-nullable key from MapType");
.get_map(0)
.expect("get_map should succeed on ColumnarRow");
assert_eq!(fluss_map.size(), 1);
assert_eq!(fluss_map.key_array().get_int(0).unwrap(), 1);
assert_eq!(fluss_map.value_array().get_string(0).unwrap(), "a");
Expand Down Expand Up @@ -1628,9 +1638,7 @@ mod tests {
.get_row(0)
.expect("reading row with Map field must succeed");
assert_eq!(nested.get_int(0).unwrap(), 10);
let inner_map = nested
.get_map(1, &DataTypes::string(), &DataTypes::int())
.expect("nested map should be accessible");
let inner_map = nested.get_map(1).expect("nested map should be accessible");
assert_eq!(inner_map.size(), 1);
assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k1");
assert_eq!(inner_map.value_array().get_int(0).unwrap(), 42);
Expand All @@ -1639,9 +1647,7 @@ mod tests {
row.set_row_id(1);
let nested = row.get_row(0).expect("row 1 must read");
assert_eq!(nested.get_int(0).unwrap(), 20);
let inner_map = nested
.get_map(1, &DataTypes::string(), &DataTypes::int())
.unwrap();
let inner_map = nested.get_map(1).unwrap();
assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k2");
assert_eq!(inner_map.value_array().get_int(0).unwrap(), 7);
}
Expand Down Expand Up @@ -1726,9 +1732,7 @@ mod tests {
)]));
let row = ColumnarRow::new(batch, row_type, 0, None);

let err = row
.get_map(0, &DataTypes::string(), &DataTypes::string())
.expect_err("type mismatch must error");
let err = row.get_map(0).expect_err("type mismatch must error");
let msg = err.to_string();
assert!(
msg.contains("does not match expected Fluss type"),
Expand Down
Loading
Loading