Skip to content

Commit bcdee8a

Browse files
committed
[rust][row] typed column-vector dispatch with lazy nested ARRAY/MAP/ROW views
1 parent 7d71d6b commit bcdee8a

27 files changed

Lines changed: 2322 additions & 1372 deletions

bindings/cpp/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2400,7 +2400,10 @@ mod row_reader {
24002400
validate(row, columns, field, "get_array", |dt| {
24012401
matches!(dt, fcore::metadata::DataType::Array(_))
24022402
})?;
2403-
row.get_array(field).map_err(|e| e.to_string())
2403+
row.get_array(field)
2404+
.map_err(|e| e.to_string())?
2405+
.try_into_binary()
2406+
.map_err(|e| e.to_string())
24042407
}
24052408

24062409
pub fn get_array_element_type(

bindings/cpp/src/types.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,8 +588,10 @@ pub fn compacted_row_to_owned(
588588
fcore::metadata::DataType::Binary(dt) => {
589589
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
590590
}
591-
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
592-
fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?),
591+
fcore::metadata::DataType::Array(_) => {
592+
Datum::Array(row.get_array(i)?.try_into_binary()?)
593+
}
594+
fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?.try_into_binary()?),
593595
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
594596
};
595597

bindings/python/src/table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,6 +1519,8 @@ pub fn datum_to_python_value(
15191519
DataType::Array(array_type) => {
15201520
let array_data = row
15211521
.get_array(pos)
1522+
.map_err(|e| FlussError::from_core_error(&e))?
1523+
.try_into_binary()
15221524
.map_err(|e| FlussError::from_core_error(&e))?;
15231525

15241526
let element_type = array_type.get_element_type();

crates/fluss/src/client/table/append.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl AppendWriter {
131131
Arc::new(self.table_info.row_type.clone()),
132132
0,
133133
None,
134-
);
134+
)?;
135135
Arc::new(get_physical_path(
136136
&self.table_path,
137137
self.partition_getter.as_ref(),

crates/fluss/src/client/table/lookup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ impl PrefixKeyLookuper {
626626
mod tests {
627627
use super::*;
628628
use crate::metadata::{Column, DataTypes, Schema};
629+
use crate::row::DataGetters;
629630
use crate::row::binary::BinaryWriter;
630631
use crate::row::compacted::CompactedRowWriter;
631632
use arrow::array::Int32Array;

crates/fluss/src/record/arrow.rs

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ use crate::compression::{
2222
use crate::error::{Error, Result};
2323
use crate::metadata::{DataField, DataType, RowType};
2424
use crate::record::{ChangeType, ScanRecord};
25+
use crate::row::column_vector::TypedBatch;
2526
use crate::row::column_writer::{ColumnWriter, round_up_to_8};
26-
use crate::row::{ColumnarRow, InternalRow, arrow_row_column_indices, fluss_row_column_indices};
27+
use crate::row::{ColumnarRow, InternalRow};
2728
use arrow::array::{ArrayBuilder, ArrayRef};
2829
use arrow::{
2930
array::RecordBatch,
@@ -1667,52 +1668,33 @@ impl Iterator for ArrowLogRecordIterator {
16671668
}
16681669

16691670
pub struct ArrowReader {
1670-
record_batch: Arc<RecordBatch>,
1671-
row_type: Arc<RowType>,
1672-
fluss_row_type: Option<Arc<RowType>>,
1673-
row_column_indices: Arc<[usize]>,
1671+
batch: Arc<TypedBatch>,
16741672
}
16751673

16761674
impl ArrowReader {
16771675
pub fn new(record_batch: Arc<RecordBatch>, row_type: Arc<RowType>) -> Self {
1678-
let row_column_indices = arrow_row_column_indices(&record_batch);
1679-
ArrowReader {
1680-
record_batch,
1681-
row_type,
1682-
fluss_row_type: None,
1683-
row_column_indices,
1684-
}
1676+
Self::new_with_fluss_row_type(record_batch, row_type, None)
16851677
}
16861678

16871679
pub fn new_with_fluss_row_type(
16881680
record_batch: Arc<RecordBatch>,
16891681
row_type: Arc<RowType>,
16901682
fluss_row_type: Option<Arc<RowType>>,
16911683
) -> Self {
1692-
let row_column_indices = match &fluss_row_type {
1693-
Some(rt) => fluss_row_column_indices(rt),
1694-
None => arrow_row_column_indices(&record_batch),
1695-
};
1684+
let schema = fluss_row_type.as_deref().unwrap_or(&row_type);
1685+
let typed = TypedBatch::build(&record_batch, schema)
1686+
.expect("ArrowReader: TypedBatch::build failed — schema mismatch in scan setup");
16961687
ArrowReader {
1697-
record_batch,
1698-
row_type,
1699-
fluss_row_type,
1700-
row_column_indices,
1688+
batch: Arc::new(typed),
17011689
}
17021690
}
17031691

17041692
pub fn row_count(&self) -> usize {
1705-
self.record_batch.num_rows()
1693+
self.batch.num_rows
17061694
}
17071695

17081696
pub fn read(&self, row_id: usize) -> ColumnarRow {
1709-
ColumnarRow::with_indices(
1710-
self.record_batch.clone(),
1711-
self.row_type.clone(),
1712-
row_id,
1713-
self.fluss_row_type.clone(),
1714-
self.row_column_indices.clone(),
1715-
)
1697+
ColumnarRow::from_typed_batch(Arc::clone(&self.batch), row_id)
17161698
}
17171699
}
17181700
pub struct MyVec<T>(pub StreamReader<T>);

crates/fluss/src/record/kv/kv_record_batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ mod tests {
373373
use crate::metadata::{DataTypes, KvFormat};
374374
use crate::record::kv::test_util::TestReadContext;
375375
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
376-
use crate::row::InternalRow;
376+
use crate::row::DataGetters;
377377
use crate::row::binary::BinaryWriter;
378378

379379
use bytes::{BufMut, BytesMut};

crates/fluss/src/record/kv/kv_record_batch_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ mod tests {
498498
#[test]
499499
fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
500500
use crate::record::kv::KvRecordBatch;
501-
use crate::row::InternalRow;
501+
use crate::row::DataGetters;
502502

503503
let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
504504
builder.set_writer_state(100, 5);

crates/fluss/src/record/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ mod tests {
253253
let row_type = Arc::new(RowType::with_data_types(vec![
254254
crate::metadata::DataType::Int(crate::metadata::IntType::new()),
255255
]));
256-
ColumnarRow::new(Arc::new(batch), row_type, row_id, None)
256+
ColumnarRow::new(Arc::new(batch), row_type, row_id, None).expect("ColumnarRow")
257257
}
258258

259259
#[test]

crates/fluss/src/row/binary_array.rs

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ use crate::error::Error::IllegalArgument;
2828
use crate::error::Result;
2929
use crate::metadata::{DataType, RowType};
3030
use crate::row::Decimal;
31-
use crate::row::InternalRow;
3231
use crate::row::binary::{BinaryRowFormat, ValueWriter};
3332
use crate::row::binary_map::FlussMap;
3433
use crate::row::compacted::{CompactedRow, CompactedRowWriter, calculate_bit_set_width_in_bytes};
3534
use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
3635
use crate::row::field_getter::FieldGetter;
36+
use crate::row::view::ArrayView;
37+
use crate::row::{DataGetters, InternalArray, InternalRow};
3738
use bytes::Bytes;
3839
use serde::Serialize;
3940
use std::fmt;
@@ -768,80 +769,79 @@ impl InternalRow for FlussArray {
768769
fn get_field_count(&self) -> usize {
769770
self.size()
770771
}
772+
}
771773

774+
impl InternalArray for FlussArray {
775+
fn size(&self) -> usize {
776+
FlussArray::size(self)
777+
}
778+
}
779+
780+
impl DataGetters for FlussArray {
772781
fn is_null_at(&self, pos: usize) -> Result<bool> {
773-
Ok(self.is_null_at(pos))
782+
Ok(FlussArray::is_null_at(self, pos))
774783
}
775784

776785
fn get_boolean(&self, pos: usize) -> Result<bool> {
777-
self.get_boolean(pos)
786+
FlussArray::get_boolean(self, pos)
778787
}
779788
fn get_byte(&self, pos: usize) -> Result<i8> {
780-
self.get_byte(pos)
789+
FlussArray::get_byte(self, pos)
781790
}
782791
fn get_short(&self, pos: usize) -> Result<i16> {
783-
self.get_short(pos)
792+
FlussArray::get_short(self, pos)
784793
}
785794
fn get_int(&self, pos: usize) -> Result<i32> {
786-
self.get_int(pos)
795+
FlussArray::get_int(self, pos)
787796
}
788797
fn get_long(&self, pos: usize) -> Result<i64> {
789-
self.get_long(pos)
798+
FlussArray::get_long(self, pos)
790799
}
791800
fn get_float(&self, pos: usize) -> Result<f32> {
792-
self.get_float(pos)
801+
FlussArray::get_float(self, pos)
793802
}
794803
fn get_double(&self, pos: usize) -> Result<f64> {
795-
self.get_double(pos)
804+
FlussArray::get_double(self, pos)
796805
}
797806

798807
fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
799-
self.get_string(pos)
808+
FlussArray::get_string(self, pos)
800809
}
801-
802810
fn get_string(&self, pos: usize) -> Result<&str> {
803-
self.get_string(pos)
811+
FlussArray::get_string(self, pos)
804812
}
805813

806814
fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result<Decimal> {
807-
self.get_decimal(pos, precision as u32, scale as u32)
815+
FlussArray::get_decimal(self, pos, precision as u32, scale as u32)
808816
}
809817

810818
fn get_date(&self, pos: usize) -> Result<Date> {
811-
self.get_date(pos)
819+
FlussArray::get_date(self, pos)
812820
}
813821
fn get_time(&self, pos: usize) -> Result<Time> {
814-
self.get_time(pos)
822+
FlussArray::get_time(self, pos)
815823
}
816824
fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> Result<TimestampNtz> {
817-
self.get_timestamp_ntz(pos, precision)
825+
FlussArray::get_timestamp_ntz(self, pos, precision)
818826
}
819827
fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> Result<TimestampLtz> {
820-
self.get_timestamp_ltz(pos, precision)
828+
FlussArray::get_timestamp_ltz(self, pos, precision)
821829
}
822830

823831
fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
824-
self.get_binary(pos)
832+
FlussArray::get_binary(self, pos)
825833
}
826-
827834
fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
828-
self.get_binary(pos)
835+
FlussArray::get_binary(self, pos)
829836
}
830837

831-
fn get_array(&self, pos: usize) -> Result<FlussArray> {
832-
self.get_array(pos)
833-
}
834-
835-
fn get_map(&self, pos: usize) -> Result<FlussMap> {
836-
// FlussArray carries no schema; nested map reads must go through the
837-
// inherent FlussArray::get_map(pos, key_type, value_type).
838-
Err(IllegalArgument {
839-
message: format!(
840-
"InternalRow::get_map is not supported on FlussArray (pos {pos}); \
841-
use FlussArray::get_map(pos, key_type, value_type) directly"
842-
),
843-
})
838+
fn get_array(&self, pos: usize) -> Result<ArrayView<'_>> {
839+
Ok(ArrayView::Binary(FlussArray::get_array(self, pos)?))
844840
}
841+
// get_map and get_row keep the trait default (error) — `FlussArray`
842+
// carries no schema, so trait-style schema-free reads can't construct
843+
// a binary `FlussMap`/`CompactedRow`. Use the inherent
844+
// `FlussArray::get_map` / `FlussArray::get_row` with explicit types.
845845
}
846846

847847
#[cfg(test)]
@@ -852,6 +852,23 @@ mod tests {
852852
use crate::row::compacted::CompactedRowWriter;
853853
use crate::row::{Datum, GenericRow};
854854

855+
#[test]
856+
fn fluss_array_dispatches_through_internal_array_trait() {
857+
let mut writer = FlussArrayWriter::new(3, &DataTypes::int());
858+
writer.write_int(0, 10);
859+
writer.set_null_at(1);
860+
writer.write_int(2, 30);
861+
let arr = writer.complete().unwrap();
862+
863+
let view: &dyn InternalArray = &arr;
864+
assert_eq!(view.size(), 3);
865+
assert!(!view.is_null_at(0).unwrap());
866+
assert!(view.is_null_at(1).unwrap());
867+
assert!(!view.is_null_at(2).unwrap());
868+
assert_eq!(view.get_int(0).unwrap(), 10);
869+
assert_eq!(view.get_int(2).unwrap(), 30);
870+
}
871+
855872
#[test]
856873
fn test_header_calculation() {
857874
assert_eq!(calculate_header_in_bytes(0), 4);
@@ -1169,7 +1186,7 @@ mod tests {
11691186
let r1_tags = r1.get_array(0).unwrap();
11701187
assert_eq!(r1_tags.size(), 3);
11711188
assert_eq!(r1_tags.get_string(0).unwrap(), "x");
1172-
assert!(r1_tags.is_null_at(1));
1189+
assert!(r1_tags.is_null_at(1).unwrap());
11731190
assert_eq!(r1_tags.get_string(2).unwrap(), "z");
11741191
}
11751192

@@ -1207,7 +1224,7 @@ mod tests {
12071224
let bytes = writer.to_bytes();
12081225

12091226
let outer_compacted = CompactedRow::from_bytes(outer_row_type, &bytes);
1210-
let recovered_arr = outer_compacted.get_array(0).unwrap();
1227+
let recovered_arr = outer_compacted.get_array(0).unwrap().expect_binary();
12111228
assert_eq!(recovered_arr.size(), 2);
12121229

12131230
let recovered_r0 = recovered_arr.get_row(0, inner_row_type).unwrap();

0 commit comments

Comments
 (0)