Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ pub fn resolve_row_types(
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
Datum::Array(a) => Datum::Array(a.clone()),
Datum::Map(m) => Datum::Map(m.clone()),
Datum::Row(_) => return Err(anyhow!("Row datum is not yet supported in C++ bindings")),
};
out.set_field(idx, resolved);
Expand Down Expand Up @@ -588,6 +589,9 @@ pub fn compacted_row_to_owned(
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
fcore::metadata::DataType::Map(mt) => {
Datum::Map(row.get_map(i, mt.key_type(), mt.value_type())?)
}
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,7 @@ fn python_value_to_datum(
}
}
Datum::Array(v) => writer.write_array(i, &v),
Datum::Map(v) => writer.write_map(i, &v),
Datum::Row(_) => {
return Err(FlussError::new_err(
"Row datum is not supported as an array element",
Expand Down
7 changes: 6 additions & 1 deletion crates/fluss/src/client/table/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ impl AppendWriter {
/// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
let first_row = ColumnarRow::new(Arc::new(batch.clone()), 0, None);
let first_row = ColumnarRow::new(
Arc::new(batch.clone()),
Arc::new(self.table_info.row_type.clone()),
0,
None,
);
Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
Expand Down
8 changes: 6 additions & 2 deletions crates/fluss/src/client/table/log_fetch_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,11 @@ mod tests {

fn test_read_context() -> Result<ReadContext> {
let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]);
Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
Ok(ReadContext::new(
to_arrow_schema(&row_type)?,
Arc::new(row_type),
false,
))
}

struct ErrorPendingFetch {
Expand Down Expand Up @@ -921,7 +925,7 @@ mod tests {

let data = builder.build()?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, false);
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false);
let mut fetch = DefaultCompletedFetch::new(
TableBucket::new(1, 0),
log_records,
Expand Down
42 changes: 30 additions & 12 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,12 +801,20 @@ impl LogFetcher {
.collect(),
)),
};
let read_context =
Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?
.with_fluss_row_type(projected_row_type.clone());
let remote_read_context =
Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?
.with_fluss_row_type(projected_row_type);
let read_context = Self::create_read_context(
full_arrow_schema.clone(),
projected_row_type.clone(),
projected_fields.clone(),
false,
)?
.with_fluss_row_type(projected_row_type.clone());
let remote_read_context = Self::create_read_context(
full_arrow_schema,
projected_row_type.clone(),
projected_fields.clone(),
true,
)?
.with_fluss_row_type(projected_row_type);

let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
Expand Down Expand Up @@ -851,14 +859,22 @@ impl LogFetcher {

fn create_read_context(
full_arrow_schema: SchemaRef,
row_type: Arc<RowType>,
projected_fields: Option<Vec<usize>>,
is_from_remote: bool,
) -> Result<ReadContext> {
match projected_fields {
None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
Some(fields) => {
ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
}
None => Ok(ReadContext::new(
full_arrow_schema,
row_type,
is_from_remote,
)),
Some(fields) => ReadContext::with_projection_pushdown(
full_arrow_schema,
row_type,
fields,
is_from_remote,
),
}
}

Expand Down Expand Up @@ -1901,7 +1917,8 @@ mod tests {

let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let row_type = Arc::new(table_info.get_row_type().clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false);
let completed =
DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0);
fetcher.log_fetch_buffer.add(Box::new(completed));
Expand Down Expand Up @@ -1931,7 +1948,8 @@ mod tests {
let bucket = TableBucket::new(1, 0);
let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let row_type = Arc::new(table_info.get_row_type().clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false);
let mut completed: Box<dyn CompletedFetch> = Box::new(DefaultCompletedFetch::new(
bucket,
log_records,
Expand Down
86 changes: 78 additions & 8 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,13 +920,36 @@ impl Display for ArrayType {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Hash)]
pub struct MapType {
nullable: bool,
key_type: Box<DataType>,
value_type: Box<DataType>,
}

// Route Deserialize through `with_nullable` so a Serde-built MapType
// collapses to the same canonical form as the constructor (otherwise
// equivalent maps disagree under `PartialEq`).
impl<'de> Deserialize<'de> for MapType {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct Raw {
nullable: bool,
key_type: Box<DataType>,
value_type: Box<DataType>,
}
let raw = Raw::deserialize(deserializer)?;
Ok(MapType::with_nullable(
raw.nullable,
*raw.key_type,
*raw.value_type,
))
}
}

impl MapType {
pub fn new(key_type: DataType, value_type: DataType) -> Self {
Self::with_nullable(true, key_type, value_type)
Expand All @@ -935,7 +958,7 @@ impl MapType {
pub fn with_nullable(nullable: bool, key_type: DataType, value_type: DataType) -> Self {
Self {
nullable,
key_type: Box::new(key_type),
key_type: Box::new(key_type.as_non_nullable()),
value_type: Box::new(value_type),
}
}
Expand Down Expand Up @@ -1452,16 +1475,60 @@ fn test_array_display() {
#[test]
fn test_map_display() {
let map_type = MapType::new(DataTypes::string(), DataTypes::int());
assert_eq!(map_type.to_string(), "MAP<STRING, INT>");
assert_eq!(map_type.to_string(), "MAP<STRING NOT NULL, INT>");

let map_type_non_null = MapType::with_nullable(false, DataTypes::int(), DataTypes::string());
assert_eq!(map_type_non_null.to_string(), "MAP<INT, STRING> NOT NULL");
assert_eq!(
map_type_non_null.to_string(),
"MAP<INT NOT NULL, STRING> NOT NULL"
);

let nested_map = MapType::new(
DataTypes::string(),
DataTypes::map(DataTypes::int(), DataTypes::boolean()),
);
assert_eq!(nested_map.to_string(), "MAP<STRING, MAP<INT, BOOLEAN>>");
assert_eq!(
nested_map.to_string(),
"MAP<STRING NOT NULL, MAP<INT NOT NULL, BOOLEAN>>"
);
}

#[test]
fn test_map_deserialize_normalises_key_nullability() {
let json = r#"{
"nullable": true,
"key_type": {"Int": {"nullable": true}},
"value_type": {"String": {"nullable": true}}
}"#;
let from_json: MapType = serde_json::from_str(json).expect("deserialize");
let from_ctor = MapType::new(DataTypes::int(), DataTypes::string());
assert_eq!(from_json, from_ctor);
assert!(!from_json.key_type().is_nullable());
}

#[test]
fn test_map_deserialize_normalises_nested_map_keys() {
let json = r#"{
"nullable": true,
"key_type": {"String": {"nullable": true}},
"value_type": {"Map": {
"nullable": true,
"key_type": {"Int": {"nullable": true}},
"value_type": {"Boolean": {"nullable": true}}
}}
}"#;
let from_json: MapType = serde_json::from_str(json).expect("deserialize");
let from_ctor = MapType::new(
DataTypes::string(),
DataTypes::map(DataTypes::int(), DataTypes::boolean()),
);
assert_eq!(from_json, from_ctor);
assert!(!from_json.key_type().is_nullable());
let inner = match from_json.value_type() {
DataType::Map(m) => m,
other => panic!("expected nested Map, got {other:?}"),
};
assert!(!inner.key_type().is_nullable());
}

#[test]
Expand Down Expand Up @@ -1497,7 +1564,7 @@ fn test_datatype_display() {
assert_eq!(DataTypes::array(DataTypes::int()).to_string(), "ARRAY<INT>");
assert_eq!(
DataTypes::map(DataTypes::string(), DataTypes::int()).to_string(),
"MAP<STRING, INT>"
"MAP<STRING NOT NULL, INT>"
);
}

Expand Down Expand Up @@ -1525,7 +1592,7 @@ fn test_complex_nested_display() {
]);
assert_eq!(
row_type.to_string(),
"ROW<id INT, tags ARRAY<STRING>, metadata MAP<STRING, STRING>>"
"ROW<id INT, tags ARRAY<STRING>, metadata MAP<STRING NOT NULL, STRING>>"
);
}

Expand All @@ -1547,7 +1614,10 @@ fn test_deeply_nested_types() {
DataTypes::field("y", DataTypes::int()),
]),
));
assert_eq!(nested.to_string(), "ARRAY<MAP<STRING, ROW<x INT, y INT>>>");
assert_eq!(
nested.to_string(),
"ARRAY<MAP<STRING NOT NULL, ROW<x INT, y INT>>>"
);
}

// ============================================================================
Expand Down
Loading
Loading