From 9cbdb7eeec7bb28866605c8b82f415c4f0e1831b Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Fri, 30 Jan 2026 23:39:33 +0800 Subject: [PATCH 1/2] support produce partitioned table --- bindings/cpp/src/lib.rs | 7 +- bindings/python/src/table.rs | 6 +- crates/examples/src/example_kv_table.rs | 2 +- .../src/example_partitioned_kv_table.rs | 2 +- crates/examples/src/example_table.rs | 8 +- crates/fluss/src/client/table/append.rs | 55 ++++++-- .../src/client/table/log_fetch_buffer.rs | 2 +- crates/fluss/src/client/table/mod.rs | 4 +- .../src/client/table/partition_getter.rs | 19 ++- crates/fluss/src/client/table/scanner.rs | 13 +- crates/fluss/src/client/table/upsert.rs | 67 +++++---- crates/fluss/src/client/table/writer.rs | 46 ------- crates/fluss/src/client/write/accumulator.rs | 12 +- crates/fluss/src/client/write/batch.rs | 2 +- crates/fluss/src/client/write/mod.rs | 8 +- crates/fluss/src/client/write/sender.rs | 15 +- crates/fluss/src/io/storage.rs | 2 + crates/fluss/src/record/arrow.rs | 46 ++++--- crates/fluss/src/row/column.rs | 36 ++--- .../fluss/src/row/compacted/compacted_row.rs | 17 ++- crates/fluss/src/row/mod.rs | 2 +- crates/fluss/tests/integration/kv_table.rs | 17 +-- .../integration/{table.rs => log_table.rs} | 130 ++++++++++++++++-- .../tests/integration/table_remote_scan.rs | 8 +- crates/fluss/tests/integration/utils.rs | 26 +++- crates/fluss/tests/test_fluss.rs | 2 +- 26 files changed, 344 insertions(+), 210 deletions(-) delete mode 100644 crates/fluss/src/client/table/writer.rs rename crates/fluss/tests/integration/{table.rs => log_table.rs} (89%) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 2d377636..bd38a039 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -508,7 +508,10 @@ impl Table { Err(e) => return Err(format!("Failed to create append: {e}")), }; - let writer = table_append.create_writer(); + let writer = match table_append.create_writer() { + Ok(w) => w, + Err(e) => return Err(format!("Failed to create writer: {e}")), + }; let writer = Box::into_raw(Box::new(AppendWriter { inner: writer })); Ok(writer) } @@ -580,7 +583,7 @@ impl AppendWriter { fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult { let generic_row = types::ffi_row_to_core(row); - let result = RUNTIME.block_on(async { self.inner.append(generic_row).await }); + let result = RUNTIME.block_on(async { self.inner.append(&generic_row).await }); match result { Ok(_) => ok_result(), diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 40433505..48f09e7a 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -68,7 +68,9 @@ impl FlussTable { .new_append() .map_err(|e| FlussError::new_err(e.to_string()))?; - let rust_writer = table_append.create_writer(); + let rust_writer = table_append + .create_writer() + .map_err(|e| FlussError::new_err(e.to_string()))?; let py_writer = AppendWriter::from_core(rust_writer, table_info); @@ -251,7 +253,7 @@ impl AppendWriter { future_into_py(py, async move { inner - .append(generic_row) + .append(&generic_row) .await .map_err(|e| FlussError::new_err(e.to_string())) }) diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs index 032691e0..437da06b 100644 --- a/crates/examples/src/example_kv_table.rs +++ b/crates/examples/src/example_kv_table.rs @@ -16,7 +16,7 @@ // under the License. use clap::Parser; -use fluss::client::{FlussConnection, UpsertWriter}; +use fluss::client::FlussConnection; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index a5e76fa3..1b0c3035 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -16,7 +16,7 @@ // under the License. use clap::Parser; -use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::client::{FlussAdmin, FlussConnection}; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 92055a70..6c74e635 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -63,13 +63,13 @@ pub async fn main() -> Result<()> { row.set_field(2, 123_456_789_123i64); let table = conn.get_table(&table_path).await?; - let append_writer = table.new_append()?.create_writer(); - let f1 = append_writer.append(row); - row = GenericRow::new(3); + let append_writer = table.new_append()?.create_writer()?; + let f1 = append_writer.append(&row); + let mut row = GenericRow::new(3); row.set_field(0, 233333); row.set_field(1, "tt44"); row.set_field(2, 987_654_321_987i64); - let f2 = append_writer.append(row); + let f2 = append_writer.append(&row); try_join!(f1, f2, append_writer.flush())?; // scan rows diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 7fe2023b..ace91a60 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. +use crate::client::table::partition_getter::{PartitionGetter, get_physical_path}; use crate::client::{WriteRecord, WriterClient}; use crate::error::Result; use crate::metadata::{PhysicalTablePath, TableInfo, TablePath}; -use crate::row::GenericRow; +use crate::row::{ColumnarRow, InternalRow}; use arrow::array::RecordBatch; use std::sync::Arc; -#[allow(dead_code)] pub struct TableAppend { - table_path: TablePath, + table_path: Arc, table_info: Arc, writer_client: Arc, } @@ -36,32 +36,48 @@ impl TableAppend { writer_client: Arc, ) -> Self { Self { - table_path, + table_path: Arc::new(table_path), table_info, writer_client, } } - pub fn create_writer(&self) -> AppendWriter { - AppendWriter { - physical_table_path: Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))), + pub fn create_writer(&self) -> Result { + let partition_getter = if self.table_info.is_partitioned() { + Some(PartitionGetter::new( + self.table_info.row_type(), + Arc::clone(self.table_info.get_partition_keys()), + )?) + } else { + None + }; + + Ok(AppendWriter { + table_path: Arc::clone(&self.table_path), + partition_getter, writer_client: self.writer_client.clone(), table_info: Arc::clone(&self.table_info), - } + }) } } pub struct AppendWriter { - physical_table_path: Arc, + table_path: Arc, + partition_getter: Option, writer_client: Arc, table_info: Arc, } impl AppendWriter { - pub async fn append(&self, row: GenericRow<'_>) -> Result<()> { + pub async fn append(&self, row: &R) -> Result<()> { + let physical_table_path = Arc::new(get_physical_path( + &self.table_path, + self.partition_getter.as_ref(), + row, + )?); let record = WriteRecord::for_append( Arc::clone(&self.table_info), - Arc::clone(&self.physical_table_path), + physical_table_path, self.table_info.schema_id, row, ); @@ -70,10 +86,25 @@ impl AppendWriter { result_handle.result(result) } + /// Appends an Arrow RecordBatch to the table. + /// + /// For partitioned tables, the partition is derived from the **first row** of the batch. + /// Callers must ensure all rows in the batch belong to the same partition. pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> { + let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 { + let first_row = ColumnarRow::new(Arc::new(batch.clone())); + Arc::new(get_physical_path( + &self.table_path, + self.partition_getter.as_ref(), + &first_row, + )?) + } else { + Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path))) + }; + let record = WriteRecord::for_append_record_batch( Arc::clone(&self.table_info), - Arc::clone(&self.physical_table_path), + physical_table_path, self.table_info.schema_id, batch, ); diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 78ee065a..b622f19a 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -917,7 +917,7 @@ mod tests { let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); - let record = WriteRecord::for_append(table_info, physical_table_path, 1, row); + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); builder.append(&record)?; let data = builder.build()?; diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 2fbbbc91..6d54933a 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -31,16 +31,14 @@ mod partition_getter; mod remote_log; mod scanner; mod upsert; -mod writer; -use crate::client::table::upsert::TableUpsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; pub use remote_log::{ DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM, }; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; -pub use writer::{TableWriter, UpsertWriter}; +pub use upsert::{TableUpsert, UpsertWriter}; #[allow(dead_code)] pub struct FlussTable<'a> { diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 1a761068..91368019 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -17,12 +17,29 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{DataType, ResolvedPartitionSpec, RowType}; +use crate::metadata::{DataType, PhysicalTablePath, ResolvedPartitionSpec, RowType, TablePath}; use crate::row::InternalRow; use crate::row::field_getter::FieldGetter; use crate::util::partition; use std::sync::Arc; +/// Get the physical table path for a row, handling partitioned vs non-partitioned tables. +pub fn get_physical_path( + table_path: &Arc, + partition_getter: Option<&PartitionGetter>, + row: &R, +) -> Result { + if let Some(getter) = partition_getter { + let partition = getter.get_partition(row)?; + Ok(PhysicalTablePath::of_partitioned( + Arc::clone(table_path), + Some(partition), + )) + } else { + Ok(PhysicalTablePath::of(Arc::clone(table_path))) + } +} + /// A getter to get partition name from a row. #[allow(dead_code)] pub struct PartitionGetter { diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 14d28411..10e7fff5 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -1516,14 +1516,11 @@ mod tests { }, )?; let physical_table_path = Arc::new(PhysicalTablePath::of(table_path)); - let record = WriteRecord::for_append( - Arc::new(table_info.clone()), - physical_table_path, - 1, - GenericRow { - values: vec![Datum::Int32(1)], - }, - ); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = + WriteRecord::for_append(Arc::new(table_info.clone()), physical_table_path, 1, &row); builder.append(&record)?; builder.build() } diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 269d5258..05953971 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -15,17 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, UpsertWriter}; use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient}; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath}; +use crate::metadata::{RowType, TableInfo, TablePath}; use crate::row::InternalRow; use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory}; use crate::row::field_getter::FieldGetter; use std::sync::Arc; -use crate::client::table::partition_getter::PartitionGetter; +use crate::client::table::partition_getter::{PartitionGetter, get_physical_path}; use bitvec::prelude::bitvec; use bytes::Bytes; @@ -98,7 +97,7 @@ impl TableUpsert { self.partial_update(Some(valid_col_indices)) } - pub fn create_writer(&self) -> Result { + pub fn create_writer(&self) -> Result { UpsertWriterFactory::create( Arc::new(self.table_path.clone()), Arc::new(self.table_info.clone()), @@ -108,10 +107,7 @@ impl TableUpsert { } } -struct UpsertWriterImpl -where - RE: RowEncoder, -{ +pub struct UpsertWriter { table_path: Arc, writer_client: Arc, partition_field_getter: Option, @@ -120,7 +116,7 @@ where // Use primary key encoder as bucket key encoder when None bucket_key_encoder: Option>, write_format: WriteFormat, - row_encoder: RE, + row_encoder: Box, field_getters: Box<[FieldGetter]>, table_info: Arc, } @@ -133,7 +129,7 @@ impl UpsertWriterFactory { table_info: Arc, partial_update_columns: Option>>, writer_client: Arc, - ) -> Result { + ) -> Result { let data_lake_format = &table_info.table_config.get_datalake_format()?; let row_type = table_info.row_type(); let physical_pks = table_info.get_physical_primary_keys(); @@ -173,7 +169,7 @@ impl UpsertWriterFactory { None }; - Ok(UpsertWriterImpl { + Ok(UpsertWriter { table_path, partition_field_getter, writer_client, @@ -181,7 +177,7 @@ impl UpsertWriterFactory { target_columns: partial_update_columns, bucket_key_encoder, write_format, - row_encoder: RowEncoderFactory::create(kv_format, row_type.clone())?, + row_encoder: Box::new(RowEncoderFactory::create(kv_format, row_type.clone())?), field_getters, table_info: table_info.clone(), }) @@ -283,8 +279,7 @@ impl UpsertWriterFactory { } } -#[allow(dead_code)] -impl UpsertWriterImpl { +impl UpsertWriter { fn check_field_count(&self, row: &R) -> Result<()> { let expected = self.table_info.get_row_type().fields().len(); if row.get_field_count() != expected { @@ -317,31 +312,15 @@ impl UpsertWriterImpl { self.row_encoder.finish_row() } - fn get_physical_path(&self, row: &R) -> Result { - if let Some(partition_getter) = &self.partition_field_getter { - let partition = partition_getter.get_partition(row); - Ok(PhysicalTablePath::of_partitioned( - Arc::clone(&self.table_path), - Some(partition?), - )) - } else { - Ok(PhysicalTablePath::of(Arc::clone(&self.table_path))) - } - } -} - -impl TableWriter for UpsertWriterImpl { /// Flush data written that have not yet been sent to the server, forcing the client to send the /// requests to server and blocks on the completion of the requests associated with these /// records. A request is considered completed when it is successfully acknowledged according to /// the CLIENT_WRITER_ACKS configuration option you have specified or else it /// results in an error. - async fn flush(&self) -> Result<()> { + pub async fn flush(&self) -> Result<()> { self.writer_client.flush().await } -} -impl UpsertWriter for UpsertWriterImpl { /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist. /// /// # Arguments @@ -349,7 +328,7 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok(UpsertResult) when completed normally - async fn upsert(&mut self, row: &R) -> Result { + pub async fn upsert(&mut self, row: &R) -> Result { self.check_field_count(row)?; let (key, bucket_key) = self.get_keys(row)?; @@ -361,7 +340,11 @@ impl UpsertWriter for UpsertWriterImpl { let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_info), - Arc::new(self.get_physical_path(row)?), + Arc::new(get_physical_path( + &self.table_path, + self.partition_field_getter.as_ref(), + row, + )?), self.table_info.schema_id, key, bucket_key, @@ -384,14 +367,18 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok(DeleteResult) when completed normally - async fn delete(&mut self, row: &R) -> Result { + pub async fn delete(&mut self, row: &R) -> Result { self.check_field_count(row)?; let (key, bucket_key) = self.get_keys(row)?; let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_info), - Arc::new(self.get_physical_path(row)?), + Arc::new(get_physical_path( + &self.table_path, + self.partition_field_getter.as_ref(), + row, + )?), self.table_info.schema_id, key, bucket_key, @@ -537,3 +524,13 @@ mod tests { )); } } + +/// The result of upserting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct UpsertResult; + +/// The result of deleting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct DeleteResult; diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs deleted file mode 100644 index ec26ec63..00000000 --- a/crates/fluss/src/client/table/writer.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::row::{GenericRow, InternalRow}; - -use crate::error::Result; - -#[allow(dead_code, async_fn_in_trait)] -pub trait TableWriter { - async fn flush(&self) -> Result<()>; -} - -#[allow(dead_code)] -pub trait AppendWriter: TableWriter { - async fn append(&self, row: GenericRow) -> Result<()>; -} - -#[allow(dead_code, async_fn_in_trait)] -pub trait UpsertWriter: TableWriter { - async fn upsert(&mut self, row: &R) -> Result; - async fn delete(&mut self, row: &R) -> Result; -} - -/// The result of upserting a record -/// Currently this is an empty struct to allow for compatible evolution in the future -#[derive(Default)] -pub struct UpsertResult; - -/// The result of deleting a record -/// Currently this is an empty struct to allow for compatible evolution in the future -#[derive(Default)] -pub struct DeleteResult; diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 2a455173..5eae868d 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -599,14 +599,10 @@ mod tests { let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); let cluster = Arc::new(build_cluster(&table_path, 1, 1)); - let record = WriteRecord::for_append( - table_info, - physical_table_path, - 1, - GenericRow { - values: vec![Datum::Int32(1)], - }, - ); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); accumulator.append(&record, 0, &cluster, false).await?; diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index da30c8ac..c7654730 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -426,7 +426,7 @@ mod tests { Arc::clone(&table_info), Arc::clone(&physical_table_path), 1, - row, + &row, ); batch.try_append(&record).unwrap(); } diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 868b582c..25a0db61 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -22,7 +22,7 @@ use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, Broad use crate::error::Error; use crate::metadata::{PhysicalTablePath, TableInfo}; -use crate::row::GenericRow; +use crate::row::InternalRow; pub use accumulator::*; use arrow::array::RecordBatch; use bytes::Bytes; @@ -64,7 +64,7 @@ pub enum Record<'a> { } pub enum LogWriteRecord<'a> { - Generic(GenericRow<'a>), + InternalRow(&'a dyn InternalRow), RecordBatch(Arc), } @@ -112,11 +112,11 @@ impl<'a> WriteRecord<'a> { table_info: Arc, physical_table_path: Arc, schema_id: i32, - row: GenericRow<'a>, + row: &'a dyn InternalRow, ) -> Self { Self { table_info, - record: Record::Log(LogWriteRecord::Generic(row)), + record: Record::Log(LogWriteRecord::InternalRow(row)), physical_table_path, bucket_key: None, schema_id, diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index 6a7dad00..f336d0cd 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -103,8 +103,7 @@ impl Sender { if api_error.code == FlussError::PartitionNotExists.code() => { warn!( - "Partition does not exist during metadata update, continuing: {}", - api_error + "Partition does not exist during metadata update, continuing: {api_error}" ); } _ => return Err(e), @@ -664,14 +663,10 @@ mod tests { ) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> { let table_info = Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1)); let physical_table_path = Arc::new(PhysicalTablePath::of(table_path)); - let record = WriteRecord::for_append( - table_info, - physical_table_path, - 1, - GenericRow { - values: vec![Datum::Int32(1)], - }, - ); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); let result = accumulator.append(&record, 0, &cluster, false).await?; let result_handle = result.result_handle.expect("result handle"); let server = cluster.get_tablet_server(1).expect("server"); diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs index a3708610..a5735178 100644 --- a/crates/fluss/src/io/storage.rs +++ b/crates/fluss/src/io/storage.rs @@ -19,6 +19,8 @@ use crate::error; use crate::error::Result; use crate::io::FileIOBuilder; use opendal::{Operator, Scheme}; +#[cfg(any(feature = "storage-s3", feature = "storage-oss"))] +use std::collections::HashMap; /// The storage carries all supported storage services in fluss #[derive(Debug)] diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 726106b7..f6e05957 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -20,7 +20,8 @@ use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; use crate::metadata::{DataType, RowType}; use crate::record::{ChangeType, ScanRecord}; -use crate::row::{ColumnarRow, GenericRow}; +use crate::row::field_getter::FieldGetter; +use crate::row::{ColumnarRow, InternalRow}; use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, @@ -166,7 +167,7 @@ pub struct MemoryLogRecordsArrowBuilder { pub trait ArrowRecordBatchInnerBuilder: Send + Sync { fn build_arrow_record_batch(&mut self) -> Result>; - fn append(&mut self, row: &GenericRow) -> Result; + fn append(&mut self, row: &dyn InternalRow) -> Result; fn append_batch(&mut self, record_batch: Arc) -> Result; @@ -191,7 +192,7 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder { Ok(self.arrow_record_batch.as_ref().unwrap().clone()) } - fn append(&mut self, _row: &GenericRow) -> Result { + fn append(&mut self, _row: &dyn InternalRow) -> Result { // append one single row is not supported, return false directly Ok(false) } @@ -229,6 +230,7 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder { pub struct RowAppendRecordBatchBuilder { table_schema: SchemaRef, arrow_column_builders: Vec>, + field_getters: Box<[FieldGetter]>, records_count: i32, } @@ -240,9 +242,11 @@ impl RowAppendRecordBatchBuilder { .iter() .map(|field| Self::create_builder(field.data_type())) .collect(); + let field_getters = FieldGetter::create_field_getters(row_type); Ok(Self { table_schema: schema_ref.clone(), arrow_column_builders: builders?, + field_getters, records_count: 0, }) } @@ -346,11 +350,12 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { )?)) } - fn append(&mut self, row: &GenericRow) -> Result { - for (idx, value) in row.values.iter().enumerate() { + fn append(&mut self, row: &dyn InternalRow) -> Result { + for (idx, getter) in self.field_getters.iter().enumerate() { + let datum = getter.get_field(row); let field_type = self.table_schema.field(idx).data_type(); let builder = self.arrow_column_builders.get_mut(idx).unwrap(); - value.append_to(builder.as_mut(), field_type)?; + datum.append_to(builder.as_mut(), field_type)?; } self.records_count += 1; Ok(true) @@ -412,7 +417,9 @@ impl MemoryLogRecordsArrowBuilder { pub fn append(&mut self, record: &WriteRecord) -> Result { match &record.record() { Record::Log(log_write_record) => match log_write_record { - LogWriteRecord::Generic(row) => Ok(self.arrow_record_batch_builder.append(row)?), + LogWriteRecord::InternalRow(row) => { + Ok(self.arrow_record_batch_builder.append(*row)?) + } LogWriteRecord::RecordBatch(record_batch) => Ok(self .arrow_record_batch_builder .append_batch(record_batch.clone())?), @@ -1715,9 +1722,10 @@ mod tests { )]); let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?; let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?; - builder.append(&GenericRow { + let row = GenericRow { values: vec![Datum::Decimal(decimal)], - })?; + }; + builder.append(&row)?; let batch = builder.build_arrow_record_batch()?; let array = batch .column(0) @@ -1735,9 +1743,10 @@ mod tests { )]); let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?; let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?; - let result = builder.append(&GenericRow { + let row = GenericRow { values: vec![Datum::Decimal(decimal)], - }); + }; + let result = builder.append(&row); assert!(result.is_err()); assert!( result @@ -1832,7 +1841,7 @@ mod tests { let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?; // Append rows with various data types - builder.append(&GenericRow { + let row = GenericRow { values: vec![ Datum::Int32(1), Datum::Decimal(Decimal::from_big_decimal( @@ -1851,7 +1860,8 @@ mod tests { // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654 additional nanoseconds Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?), ], - })?; + }; + builder.append(&row)?; let batch = builder.build_arrow_record_batch()?; @@ -1959,15 +1969,19 @@ mod tests { let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); - let record = - WriteRecord::for_append(Arc::clone(&table_info), physical_table_path.clone(), 1, row); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + physical_table_path.clone(), + 1, + &row, + ); builder.append(&record)?; let mut row2 = GenericRow::new(2); row2.set_field(0, 2_i32); row2.set_field(1, "bob"); let record2 = - WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, row2); + WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, &row2); builder.append(&record2)?; let data = builder.build()?; diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index f48075b3..50db32b1 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -248,6 +248,24 @@ impl InternalRow for ColumnarRow { .value(self.row_id) } + fn get_char(&self, pos: usize, _length: usize) -> &str { + self.record_batch + .column(pos) + .as_any() + .downcast_ref::() + .expect("Expected String array for char type") + .value(self.row_id) + } + + fn get_string(&self, pos: usize) -> &str { + self.record_batch + .column(pos) + .as_any() + .downcast_ref::() + .expect("Expected String array.") + .value(self.row_id) + } + fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal { use arrow::datatypes::DataType; @@ -327,24 +345,6 @@ impl InternalRow for ColumnarRow { ) } - fn get_char(&self, pos: usize, _length: usize) -> &str { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expected String array for char type") - .value(self.row_id) - } - - fn get_string(&self, pos: usize) -> &str { - self.record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expected String array.") - .value(self.row_id) - } - fn get_binary(&self, pos: usize, _length: usize) -> &[u8] { self.record_batch .column(pos) diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 35d684db..23222072 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -76,7 +76,6 @@ impl<'a> CompactedRow<'a> { } } -#[allow(dead_code)] impl<'a> InternalRow for CompactedRow<'a> { fn get_field_count(&self) -> usize { self.arity @@ -125,14 +124,6 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row().get_string(pos) } - fn get_binary(&self, pos: usize, length: usize) -> &[u8] { - self.decoded_row().get_binary(pos, length) - } - - fn get_bytes(&self, pos: usize) -> &[u8] { - self.decoded_row().get_bytes(pos) - } - fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal { self.decoded_row().get_decimal(pos, precision, scale) } @@ -153,6 +144,14 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row().get_timestamp_ltz(pos, precision) } + fn get_binary(&self, pos: usize, length: usize) -> &[u8] { + self.decoded_row().get_binary(pos, length) + } + + fn get_bytes(&self, pos: usize) -> &[u8] { + self.decoded_row().get_bytes(pos) + } + fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> { match write_format { WriteFormat::CompactedKv => Some(self.as_bytes()), diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index f7c8bec5..276dcca3 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -55,7 +55,7 @@ impl<'a> BinaryRow<'a> { } // TODO make functions return Result for better error handling -pub trait InternalRow { +pub trait InternalRow: Send + Sync { /// Returns the number of fields in this row fn get_field_count(&self) -> usize; diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index b2263c29..3691d655 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -34,9 +34,10 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod kv_table_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::FlussTestingCluster; - use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster}; - use fluss::client::UpsertWriter; - use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; + use crate::integration::utils::{ + create_partitions, create_table, get_cluster, start_cluster, stop_cluster, + }; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::sync::Arc; @@ -467,15 +468,7 @@ mod kv_table_test { create_table(&admin, &table_path, &table_descriptor).await; // Create partitions for each region before inserting data - for region in &["US", "EU", "APAC"] { - let mut partition_map = std::collections::HashMap::new(); - partition_map.insert("region".to_string(), region.to_string()); - let partition_spec = PartitionSpec::new(partition_map); - admin - .create_partition(&table_path, &partition_spec, false) - .await - .expect("Failed to create partition"); - } + create_partitions(&admin, &table_path, "region", &["US", "EU", "APAC"]).await; let connection = cluster.get_fluss_connection().await; diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/log_table.rs similarity index 89% rename from crates/fluss/tests/integration/table.rs rename to crates/fluss/tests/integration/log_table.rs index 6a156740..cb4b30c1 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -34,16 +34,20 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod table_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::FlussTestingCluster; - use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster}; + use crate::integration::utils::{ + create_partitions, create_table, get_cluster, start_cluster, stop_cluster, + }; use arrow::array::record_batch; use fluss::client::{FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; use fluss::record::ScanRecord; - use fluss::row::InternalRow; + use fluss::row::{InternalRow}; use fluss::rpc::message::OffsetSpec; use jiff::Timestamp; use std::collections::HashMap; use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; use std::time::Duration; fn before_all() { @@ -91,7 +95,8 @@ mod table_test { let append_writer = table .new_append() .expect("Failed to create append") - .create_writer(); + .create_writer() + .expect("Failed to create writer"); let batch1 = record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", "a3"])).unwrap(); @@ -217,7 +222,8 @@ mod table_test { .expect("Failed to get table") .new_append() .expect("Failed to create append") - .create_writer(); + .create_writer() + .expect("Failed to create writer"); let batch = record_batch!( ("id", Int32, [1, 2, 3]), @@ -314,7 +320,8 @@ mod table_test { let append_writer = table .new_append() .expect("Failed to create append") - .create_writer(); + .create_writer() + .expect("Failed to create writer"); let batch = record_batch!( ("col_a", Int32, [1, 2, 3]), @@ -472,7 +479,7 @@ mod table_test { .is_empty() ); - let writer = table.new_append().unwrap().create_writer(); + let writer = table.new_append().unwrap().create_writer().unwrap(); writer .append_arrow_batch( record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", "b"])).unwrap(), @@ -676,7 +683,8 @@ mod table_test { let append_writer = table .new_append() .expect("Failed to create append") - .create_writer(); + .create_writer() + .expect("Failed to create writer"); // Test data for all datatypes let col_tinyint = 127i8; @@ -749,7 +757,7 @@ mod table_test { row.set_field(27, col_timestamp_ltz_ns_neg.clone()); append_writer - .append(row) + .append(&row) .await .expect("Failed to append row with all datatypes"); @@ -760,7 +768,7 @@ mod table_test { } append_writer - .append(row_with_nulls) + .append(&row_with_nulls) .await .expect("Failed to append row with nulls"); @@ -967,4 +975,108 @@ mod table_test { .await .expect("Failed to drop table"); } + + #[tokio::test] + async fn partitioned_table_append() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new( + "fluss".to_string(), + "test_partitioned_log_append".to_string(), + ); + + // Create a partitioned log table + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("value", DataTypes::bigint()) + .build() + .expect("Failed to build schema"), + ) + .partitioned_by(vec!["region".to_string()]) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + // Create partitions + create_partitions(&admin, &table_path, "region", &["US", "EU"]).await; + + // Wait for partitions to be available + tokio::time::sleep(Duration::from_secs(2)).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Create append writer - this should now work for partitioned tables + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + // Append records with different partitions + let test_data = [ + (1, "US", 100i64), + (2, "US", 200i64), + (3, "EU", 300i64), + (4, "EU", 400i64), + ]; + + for (id, region, value) in &test_data { + let mut row = fluss::row::GenericRow::new(3); + row.set_field(0, *id); + row.set_field(1, *region); + row.set_field(2, *value); + append_writer + .append(&row) + .await + .expect("Failed to append row"); + } + + append_writer.flush().await.expect("Failed to flush"); + + // Test append_arrow_batch for partitioned tables + // Each batch must contain rows from the same partition + let us_batch = record_batch!( + ("id", Int32, [5, 6]), + ("region", Utf8, ["US", "US"]), + ("value", Int64, [500, 600]) + ) + .unwrap(); + append_writer + .append_arrow_batch(us_batch) + .await + .expect("Failed to append US batch"); + + let eu_batch = record_batch!( + ("id", Int32, [7, 8]), + ("region", Utf8, ["EU", "EU"]), + ("value", Int64, [700, 800]) + ) + .unwrap(); + append_writer + .append_arrow_batch(eu_batch) + .await + .expect("Failed to append EU batch"); + + append_writer + .flush() + .await + .expect("Failed to flush batches"); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + + // todo: add scan test in 203 + } } diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index c83da0f2..9fa81d5a 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -31,7 +31,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> #[before_all] #[after_all] mod table_remote_scan_test { - use super::SHARED_FLUSS_CLUSTER; + use std :: sync :: atomic :: AtomicUsize ;use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use crate::integration::utils::{ create_table, get_cluster, stop_cluster, wait_for_cluster_ready, @@ -43,7 +43,6 @@ mod table_remote_scan_test { use std::thread; use std::time::Duration; use uuid::Uuid; - fn before_all() { // Create a new tokio runtime in a separate thread let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); @@ -141,7 +140,8 @@ mod table_remote_scan_test { let append_writer = table .new_append() .expect("Failed to create append") - .create_writer(); + .create_writer() + .expect("Failed to create writer"); // append 20 rows, there must be some tiered to remote let record_count = 20; @@ -151,7 +151,7 @@ mod table_remote_scan_test { let v = format!("v{}", i); row.set_field(1, v.as_str()); append_writer - .append(row) + .append(&row) .await .expect("Failed to append row"); } diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index 4d0c349f..fd5145a7 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -17,8 +17,9 @@ */ use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use fluss::client::FlussAdmin; -use fluss::metadata::{TableDescriptor, TablePath}; +use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath}; use parking_lot::RwLock; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -102,3 +103,26 @@ pub fn get_cluster(cluster_lock: &RwLock>) -> Arc Date: Sat, 31 Jan 2026 07:27:08 +0800 Subject: [PATCH 2/2] address comments --- crates/fluss/src/record/arrow.rs | 10 ++++++++-- crates/fluss/tests/integration/log_table.rs | 2 +- crates/fluss/tests/integration/table_remote_scan.rs | 3 ++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index f6e05957..b798896a 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -354,8 +354,14 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { for (idx, getter) in self.field_getters.iter().enumerate() { let datum = getter.get_field(row); let field_type = self.table_schema.field(idx).data_type(); - let builder = self.arrow_column_builders.get_mut(idx).unwrap(); - datum.append_to(builder.as_mut(), field_type)?; + let builder = + self.arrow_column_builders + .get_mut(idx) + .ok_or_else(|| Error::UnexpectedError { + message: format!("Column builder at index {idx} not found."), + source: None, + })?; + datum.append_to(builder, field_type)?; } self.records_count += 1; Ok(true) diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index cb4b30c1..64e6289c 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -41,7 +41,7 @@ mod table_test { use fluss::client::{FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; use fluss::record::ScanRecord; - use fluss::row::{InternalRow}; + use fluss::row::InternalRow; use fluss::rpc::message::OffsetSpec; use jiff::Timestamp; use std::collections::HashMap; diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 9fa81d5a..ce0c1371 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -31,7 +31,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> #[before_all] #[after_all] mod table_remote_scan_test { - use std :: sync :: atomic :: AtomicUsize ;use super::SHARED_FLUSS_CLUSTER; + use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use crate::integration::utils::{ create_table, get_cluster, stop_cluster, wait_for_cluster_ready, @@ -40,6 +40,7 @@ mod table_remote_scan_test { use fluss::row::{GenericRow, InternalRow}; use std::collections::HashMap; use std::sync::Arc; + use std::sync::atomic::AtomicUsize; use std::thread; use std::time::Duration; use uuid::Uuid;