From bc89d21f197b52322810e078b9c6dce9d702d3d1 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 20:42:20 +0000 Subject: [PATCH 01/18] UpsertWriterImpl # Conflicts: # crates/fluss/src/record/arrow.rs --- crates/fluss/Cargo.toml | 1 + .../src/client/table/log_fetch_buffer.rs | 6 +- crates/fluss/src/client/table/lookup.rs | 9 ++- crates/fluss/src/client/table/mod.rs | 1 + crates/fluss/src/client/table/writer.rs | 5 +- crates/fluss/src/client/write/batch.rs | 12 ++-- crates/fluss/src/client/write/mod.rs | 19 +++--- crates/fluss/src/metadata/table.rs | 30 ++++----- crates/fluss/src/record/arrow.rs | 62 +++++++++---------- .../src/record/kv/kv_record_read_context.rs | 17 +---- crates/fluss/src/row/encode/mod.rs | 15 +++-- crates/fluss/src/row/mod.rs | 2 +- 12 files changed, 82 insertions(+), 97 deletions(-) diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index c3bdd447..9aeee72d 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -33,6 +33,7 @@ integration_tests = [] [dependencies] arrow = { workspace = true } arrow-schema = "57.0.0" +bitvec = "1" byteorder = "1.5" futures = "0.3" clap = { workspace = true } diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index ac44cc17..ca0a2532 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -651,14 +651,14 @@ mod tests { use crate::compression::{ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; - use crate::metadata::{DataField, DataTypes, TablePath}; + use crate::metadata::{DataField, DataTypes, RowType, TablePath}; use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; use crate::row::GenericRow; use std::sync::Arc; use std::time::Duration; fn test_read_context() -> ReadContext { - let row_type = DataTypes::row(vec![DataField::new( + let row_type = RowType::new(vec![DataField::new( "id".to_string(), DataTypes::int(), None, @@ -714,7 +714,7 @@ mod tests { #[test] fn default_completed_fetch_reads_records() -> Result<()> { - let row_type = DataTypes::row(vec![ + let row_type = RowType::new(vec![ DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 1d32ebd7..cd23503a 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -22,7 +22,7 @@ use crate::error::{Error, Result}; use crate::metadata::{RowType, TableBucket, TableInfo}; use crate::row::InternalRow; use crate::row::compacted::CompactedRow; -use crate::row::encode::KeyEncoder; +use crate::row::encode::{KeyEncoder, KeyEncoderFactory}; use crate::rpc::ApiError; use crate::rpc::message::LookupRequest; use std::sync::Arc; @@ -130,8 +130,11 @@ impl<'a> TableLookup<'a> { // Create key encoder for the primary key fields let pk_fields = self.table_info.get_physical_primary_keys().to_vec(); - let key_encoder = - ::of(self.table_info.row_type(), pk_fields, data_lake_format)?; + let key_encoder = KeyEncoderFactory::of( + self.table_info.row_type(), + pk_fields.as_slice(), + &data_lake_format, + )?; Ok(Lookuper { conn: self.conn, diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 7356be23..17ee71b8 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -29,6 +29,7 @@ mod lookup; mod log_fetch_buffer; mod remote_log; mod scanner; +mod upsert; mod writer; pub use append::{AppendWriter, TableAppend}; diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 8a83b5e3..251dcbec 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use crate::error::Result; use crate::metadata::{TableInfo, TablePath}; +use crate::row::compacted::CompactedRow; #[allow(dead_code)] pub trait TableWriter { @@ -34,8 +35,8 @@ pub trait AppendWriter: TableWriter { #[allow(dead_code)] pub trait UpsertWriter: TableWriter { - async fn upsert(&self, row: GenericRow) -> Result<()>; - async fn delete(&self, row: GenericRow) -> Result<()>; + async fn upsert(&mut self, row: CompactedRow) -> Result<()>; + async fn delete(&mut self, row: CompactedRow) -> Result<()>; } #[allow(dead_code)] diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 01597538..29f17f8d 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::BucketId; use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{Record, ResultHandle, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; -use crate::metadata::{DataType, KvFormat, TablePath}; -use crate::record::MemoryLogRecordsArrowBuilder; +use crate::metadata::{KvFormat, RowType, TablePath}; use crate::record::kv::KvRecordBatchBuilder; +use crate::record::MemoryLogRecordsArrowBuilder; +use crate::BucketId; use bytes::Bytes; use std::cmp::max; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; @@ -192,7 +192,7 @@ impl ArrowLogWriteBatch { table_path: TablePath, schema_id: i32, arrow_compression_info: ArrowCompressionInfo, - row_type: &DataType, + row_type: &RowType, bucket_id: BucketId, create_ms: i64, to_append_record_batch: bool, @@ -284,7 +284,7 @@ impl KvWriteBatch { } }; - let key = kv_write_record.key; + let key = kv_write_record.key.as_ref(); if self.schema_id != write_record.schema_id { return Err(Error::UnexpectedError { @@ -296,7 +296,7 @@ impl KvWriteBatch { }); }; - if self.target_columns.as_deref() != kv_write_record.target_columns { + if self.target_columns.as_deref() != kv_write_record.target_columns.as_deref() { return Err(Error::UnexpectedError { message: format!( "target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.", diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 248218e0..0920b2e0 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -24,6 +24,7 @@ use crate::metadata::TablePath; use crate::row::{CompactedRow, GenericRow}; pub use accumulator::*; use arrow::array::RecordBatch; +use bytes::Bytes; use std::sync::Arc; pub(crate) mod broadcast; @@ -40,7 +41,7 @@ pub use writer_client::WriterClient; pub struct WriteRecord<'a> { record: Record<'a>, table_path: Arc, - bucket_key: Option<&'a [u8]>, + bucket_key: Option, schema_id: i32, write_format: WriteFormat, } @@ -63,15 +64,15 @@ pub enum LogWriteRecord<'a> { pub struct KvWriteRecord<'a> { // only valid for primary key table - key: &'a [u8], - target_columns: Option<&'a [usize]>, + key: Bytes, + target_columns: Option>, compacted_row: Option>, } impl<'a> KvWriteRecord<'a> { fn new( - key: &'a [u8], - target_columns: Option<&'a [usize]>, + key: Bytes, + target_columns: Option>, compacted_row: Option>, ) -> Self { KvWriteRecord { @@ -110,15 +111,15 @@ impl<'a> WriteRecord<'a> { pub fn for_upsert( table_path: Arc, schema_id: i32, - bucket_key: &'a [u8], - key: &'a [u8], - target_columns: Option<&'a [usize]>, + bucket_key: Option, + key: Bytes, + target_columns: Option>, row: CompactedRow<'a>, ) -> Self { Self { record: Record::Kv(KvWriteRecord::new(key, target_columns, Some(row))), table_path, - bucket_key: Some(bucket_key), + bucket_key, schema_id, write_format: WriteFormat::CompactedKv, } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index da85b0c2..64a99551 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -97,8 +97,7 @@ impl PrimaryKey { pub struct Schema { columns: Vec, primary_key: Option, - // must be Row data type kind - row_type: DataType, + row_type: RowType, } impl Schema { @@ -118,7 +117,7 @@ impl Schema { self.primary_key.as_ref() } - pub fn row_type(&self) -> &DataType { + pub fn row_type(&self) -> &RowType { &self.row_type } @@ -213,7 +212,7 @@ impl SchemaBuilder { Ok(Schema { columns, primary_key: self.primary_key.clone(), - row_type: DataType::Row(RowType::new(data_fields)), + row_type: RowType::new(data_fields), }) } @@ -500,7 +499,7 @@ impl TableDescriptor { bucket_keys.retain(|k| !partition_keys.contains(k)); if bucket_keys.is_empty() { - return Err(Error::InvalidTableError { + return Err(InvalidTableError { message: format!( "Primary Key constraint {:?} should not be same with partition fields {:?}.", schema.primary_key().unwrap().column_names(), @@ -580,7 +579,7 @@ pub enum LogFormat { } impl Display for LogFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { LogFormat::ARROW => { write!(f, "ARROW")?; @@ -612,7 +611,7 @@ pub enum KvFormat { } impl Display for KvFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { KvFormat::COMPACTED => write!(f, "COMPACTED")?, KvFormat::INDEXED => write!(f, "INDEXED")?, @@ -626,7 +625,7 @@ impl KvFormat { match s.to_uppercase().as_str() { "INDEXED" => Ok(KvFormat::INDEXED), "COMPACTED" => Ok(KvFormat::COMPACTED), - _ => Err(Error::InvalidTableError { + _ => Err(InvalidTableError { message: format!("Unknown kv format: {s}"), }), } @@ -692,7 +691,7 @@ pub struct TableInfo { pub table_id: i64, pub schema_id: i32, pub schema: Schema, - pub row_type: DataType, + pub row_type: RowType, pub primary_keys: Vec, pub physical_primary_keys: Vec, pub bucket_keys: Vec, @@ -708,10 +707,7 @@ pub struct TableInfo { impl TableInfo { pub fn row_type(&self) -> &RowType { - match &self.row_type { - DataType::Row(row_type) => row_type, - _ => panic!("should be a row type"), - } + &self.row_type } } @@ -847,7 +843,7 @@ impl TableInfo { &self.schema } - pub fn get_row_type(&self) -> &DataType { + pub fn get_row_type(&self) -> &RowType { &self.row_type } @@ -946,8 +942,8 @@ impl TableInfo { } } -impl fmt::Display for TableInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for TableInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, "TableInfo{{ table_path={:?}, table_id={}, schema_id={}, schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?}, num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?}, created_time={}, modified_time={} }}", @@ -998,7 +994,7 @@ impl TableBucket { } impl Display for TableBucket { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { if let Some(partition_id) = self.partition_id { write!( f, diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 3c46f9b5..108f66c7 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -18,7 +18,7 @@ use crate::client::{LogWriteRecord, Record, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; -use crate::metadata::DataType; +use crate::metadata::{DataType, RowType}; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; use arrow::array::{ @@ -48,7 +48,9 @@ use std::{ sync::Arc, }; +use crate::error::Error::IllegalArgument; use arrow::ipc::writer::IpcWriteOptions; + /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; pub const LENGTH_LENGTH: usize = 4; @@ -171,7 +173,7 @@ pub struct RowAppendRecordBatchBuilder { } impl RowAppendRecordBatchBuilder { - pub fn new(row_type: &DataType) -> Self { + pub fn new(row_type: &RowType) -> Self { let schema_ref = to_arrow_schema(row_type); let builders = Mutex::new( schema_ref @@ -251,7 +253,7 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { impl MemoryLogRecordsArrowBuilder { pub fn new( schema_id: i32, - row_type: &DataType, + row_type: &RowType, to_append_record_batch: bool, arrow_compression_info: ArrowCompressionInfo, ) -> Self { @@ -329,7 +331,7 @@ impl MemoryLogRecordsArrowBuilder { // write arrow batch bytes let mut cursor = Cursor::new(&mut batch_bytes[..]); cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64); - cursor.write_all(real_arrow_batch_bytes).unwrap(); + cursor.write_all(real_arrow_batch_bytes)?; let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..]; // then update crc @@ -562,16 +564,17 @@ impl LogRecordBatch { return Ok(RecordBatch::new_empty(read_context.target_schema.clone())); } - let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| { - crate::error::Error::UnexpectedError { + let data = self + .data + .get(RECORDS_OFFSET..) + .ok_or_else(|| Error::UnexpectedError { message: format!( "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}", self.data.len(), RECORDS_OFFSET ), source: None, - } - })?; + })?; read_context.record_batch(data) } } @@ -639,27 +642,20 @@ fn parse_ipc_message( Ok((batch_metadata, body_buffer, message.version())) } -pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { - match &fluss_schema { - DataType::Row(row_type) => { - let fields: Vec = row_type - .fields() - .iter() - .map(|f| { - Field::new( - f.name(), - to_arrow_type(f.data_type()), - f.data_type().is_nullable(), - ) - }) - .collect(); +pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef { + let fields: Vec = fluss_schema + .fields() + .iter() + .map(|f| { + Field::new( + f.name(), + to_arrow_type(f.data_type()), + f.data_type().is_nullable(), + ) + }) + .collect(); - SchemaRef::new(arrow_schema::Schema::new(fields)) - } - _ => { - panic!("must be row data type.") - } - } + SchemaRef::new(arrow_schema::Schema::new(fields)) } pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { @@ -813,7 +809,7 @@ impl ReadContext { let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); for &original_idx in &projected_fields { let pos = sorted_fields.binary_search(&original_idx).map_err(|_| { - Error::IllegalArgument { + IllegalArgument { message: format!( "Projection index {original_idx} is invalid for the current schema." ), @@ -857,7 +853,7 @@ impl ReadContext { let field_count = schema.fields().len(); for &index in projected_fields { if index >= field_count { - return Err(Error::IllegalArgument { + return Err(IllegalArgument { message: format!( "Projection index {index} is out of bounds for schema with {field_count} fields." ), @@ -869,7 +865,7 @@ impl ReadContext { pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result { Ok(SchemaRef::new(schema.project(projected_fields).map_err( - |e| Error::IllegalArgument { + |e| IllegalArgument { message: format!("Invalid projection: {e}"), }, )?)) @@ -1217,14 +1213,14 @@ mod tests { #[test] fn projection_rejects_out_of_bounds_index() { - let row_type = DataTypes::row(vec![ + let row_type = RowType::new(vec![ DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); let schema = to_arrow_schema(&row_type); let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false); - assert!(matches!(result, Err(Error::IllegalArgument { .. }))); + assert!(matches!(result, Err(IllegalArgument { .. }))); } #[test] diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs b/crates/fluss/src/record/kv/kv_record_read_context.rs index fe6c6f05..92363216 100644 --- a/crates/fluss/src/record/kv/kv_record_read_context.rs +++ b/crates/fluss/src/record/kv/kv_record_read_context.rs @@ -18,7 +18,7 @@ //! Default implementation of ReadContext with decoder caching. use super::ReadContext; -use crate::error::{Error, Result}; +use crate::error::Result; use crate::metadata::{KvFormat, Schema}; use crate::row::{RowDecoder, RowDecoderFactory}; use std::collections::HashMap; @@ -85,20 +85,7 @@ impl ReadContext for KvRecordReadContext { // Build decoder outside the lock to avoid blocking other threads let schema = self.schema_getter.get_schema(schema_id)?; - let row_type = match schema.row_type() { - crate::metadata::DataType::Row(row_type) => row_type.clone(), - other => { - return Err(Error::IoUnexpectedError { - message: format!( - "Schema {schema_id} has invalid row type: expected Row, got {other:?}" - ), - source: std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid row type", - ), - }); - } - }; + let row_type = schema.row_type().clone(); // Create decoder outside lock let decoder = RowDecoderFactory::create(self.kv_format.clone(), row_type)?; diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index c294ecf1..6470da5e 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -31,8 +31,9 @@ pub trait KeyEncoder { fn encode_key(&mut self, row: &dyn InternalRow) -> Result; } -#[allow(dead_code)] -impl dyn KeyEncoder { +pub struct KeyEncoderFactory; + +impl KeyEncoderFactory { /// Create a key encoder to encode the key bytes of the input row. /// # Arguments /// * `row_type` - the row type of the input row @@ -43,23 +44,21 @@ impl dyn KeyEncoder { /// key encoder pub fn of( row_type: &RowType, - key_fields: Vec, - data_lake_format: Option, + key_fields: &[String], + data_lake_format: &Option, ) -> Result> { match data_lake_format { Some(DataLakeFormat::Paimon) => { unimplemented!("KeyEncoder for Paimon format is currently unimplemented") } Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( - row_type, - key_fields.as_slice(), + row_type, key_fields, )?)), Some(DataLakeFormat::Iceberg) => { unimplemented!("KeyEncoder for Iceberg format is currently unimplemented") } None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( - row_type, - key_fields.as_slice(), + row_type, key_fields, )?)), } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index d2f640e4..c9db7e96 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -23,7 +23,7 @@ mod decimal; pub mod binary; pub mod compacted; pub mod encode; -mod field_getter; +pub mod field_getter; mod row_decoder; pub use column::*; From 8d61145847ed3c36849f4129c3061006f3f9d347 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 18 Jan 2026 17:13:15 +0000 Subject: [PATCH 02/18] UpsertWriterImpl --- crates/fluss/src/client/table/upsert.rs | 333 ++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 crates/fluss/src/client/table/upsert.rs diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs new file mode 100644 index 00000000..55ade158 --- /dev/null +++ b/crates/fluss/src/client/table/upsert.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::table::writer::{TableWriter, UpsertWriter}; +use crate::client::{WriteFormat, WriteRecord, WriterClient}; +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{KvFormat, RowType, TableInfo, TablePath}; +use crate::row::compacted::CompactedRow; +use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory}; +use crate::row::field_getter::FieldGetter; +use crate::row::{BinaryRow, InternalRow}; +use std::sync::Arc; + +use bitvec::prelude::bitvec; +#[allow(dead_code)] +pub struct TableUpsert { + table_path: TablePath, + table_info: TableInfo, + writer_client: Arc, + target_columns: Option>, +} + +#[allow(dead_code)] +impl TableUpsert { + pub fn new( + table_path: TablePath, + table_info: TableInfo, + writer_client: Arc, + ) -> Self { + Self { + table_path, + table_info, + writer_client, + target_columns: None, + } + } + + pub fn partial_update(&self, target_columns: Option>) -> Result { + if let Some(columns) = &target_columns { + let num_columns = self.table_info.row_type().fields().len(); + + if let Some(&invalid_column) = columns.iter().find(|&&col| col >= num_columns) { + return Err(IllegalArgument { + message: format!( + "Invalid target column index: {invalid_column} for table {}. The table only has {num_columns} columns.", + self.table_path + ), + }); + } + } + + Ok(Self { + table_path: self.table_path.clone(), + table_info: self.table_info.clone(), + writer_client: self.writer_client.clone(), + target_columns: target_columns.clone(), + }) + } + + pub fn partial_update_with_column_names(&self, target_column_names: &[&str]) -> Result { + let row_type = self.table_info.row_type(); + let col_indices: Vec<(&str, Option)> = target_column_names + .iter() + .map(|col_name| (*col_name, row_type.get_field_index(col_name))) + .collect(); + + if let Some((missing_name, _)) = col_indices.iter().find(|(_, ix)| ix.is_none()) { + return Err(IllegalArgument { + message: format!( + "Cannot find target column `{}` for table {}.", + missing_name, self.table_path + ), + }); + } + + let valid_col_indices: Vec = col_indices + .into_iter() + .map(|(_, index)| index.unwrap()) + .collect(); + + self.partial_update(Some(valid_col_indices)) + } +} + +#[allow(dead_code)] +struct UpsertWriterImpl +where + RE: RowEncoder, +{ + table_path: Arc, + writer_client: Arc, + // partition_field_getter: Option, + // TODO thread safety + primary_key_encoder: Box, + target_columns: Option>, + bucket_key_encoder: Option>, + kv_format: KvFormat, + write_format: WriteFormat, + row_encoder: RE, + field_getters: Box<[FieldGetter]>, + table_info: TableInfo, +} + +#[allow(dead_code)] +struct UpsertWriterFactory; + +#[allow(dead_code)] +impl UpsertWriterFactory { + pub fn create( + table_path: &Arc, + table_info: &TableInfo, + partial_update_columns: Option<&Arc<[usize]>>, + writer_client: &Arc, + ) -> Result { + let datalake_format = &table_info.table_config.get_datalake_format()?; + let row_type = table_info.row_type(); + let physical_pks = table_info.get_physical_primary_keys(); + + let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, datalake_format)?; + let bucket_key_encoder = if !table_info.is_default_bucket_key() { + Some(KeyEncoderFactory::of( + row_type, + table_info.get_bucket_keys(), + datalake_format, + )?) + } else { + None + }; + + let kv_format = table_info.get_table_config().get_kv_format()?; + let write_format = WriteFormat::from_kv_format(&kv_format)?; + + let field_getters = FieldGetter::create_field_getters(row_type); + + Ok(UpsertWriterImpl { + table_path: Arc::clone(table_path), + writer_client: Arc::clone(writer_client), + primary_key_encoder, + target_columns: partial_update_columns.map(Arc::clone), + bucket_key_encoder, + kv_format: kv_format.clone(), + write_format, + row_encoder: RowEncoderFactory::create(kv_format, row_type)?, + field_getters, + table_info: table_info.clone(), + }) + } + + #[allow(dead_code)] + fn sanity_check( + row_type: RowType, + primary_keys: Vec, + auto_increment_col_names: Vec, + target_columns: Option>, + ) -> Result<()> { + if target_columns.is_none() && auto_increment_col_names.is_empty() { + return Err(IllegalArgument { + message: format!( + "This table has auto increment column {}. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first.", + auto_increment_col_names.join(", ") + ), + }); + } + + let field_count = row_type.fields().len(); + let target_columns = target_columns.unwrap(); + + let mut target_column_set = bitvec![0; field_count]; + + for &target_index in &target_columns { + target_column_set.insert(target_index, true); + } + + let mut pk_column_set = bitvec![0; field_count]; + + // check the target columns contains the primary key + for primary_key in &primary_keys { + let pk_index = row_type.get_field_index(primary_key.as_str()); + match pk_index { + Some(pk_index) => { + pk_column_set.insert(pk_index, true); + } + None => { + return Err(IllegalArgument { + message: format!( + "The target write columns {} must contain the primary key columns {}", + row_type + .project(target_columns.as_slice())? + .get_field_names() + .join(", "), + primary_keys.join(", ") + ), + }); + } + } + } + + let mut auto_increment_column_set = bitvec![0; field_count]; + // explicitly specifying values for an auto increment column is not allowed + for auto_increment_col_name in &auto_increment_col_names { + let auto_increment_field_index = + row_type.get_field_index(auto_increment_col_name.as_str()); + + if let Some(index) = auto_increment_field_index { + if target_column_set[index] { + return Err(IllegalArgument { + message: format!( + "Explicitly specifying values for the auto increment column {} is not allowed.", + auto_increment_col_name + ), + }); + } + + auto_increment_column_set.insert(index, true); + } + } + + // check the columns not in targetColumns should be nullable + for i in 0..field_count { + // column not in primary key and not in auto increment column + if !pk_column_set[i] && !auto_increment_column_set[i] { + // the column should be nullable + if !row_type.fields().get(i).unwrap().data_type.is_nullable() { + return Err(IllegalArgument { + message: format!( + "Partial Update requres all columns except primary key to be nullable, but column {} is NOT NULL.", + row_type.fields().get(i).unwrap().name() + ), + }); + } + } + } + + Ok(()) + } +} + +impl UpsertWriterImpl {} + +impl TableWriter for UpsertWriterImpl { + /// Flush data written that have not yet been sent to the server, forcing the client to send the + /// requests to server and blocks on the completion of the requests associated with these + /// records. A request is considered completed when it is successfully acknowledged according to + /// the {@link ConfigOptions#CLIENT_WRITER_ACKS} configuration you have specified or else it + /// results in an error. + async fn flush(&self) -> Result<()> { + self.writer_client.flush().await + } +} + +impl UpsertWriter for UpsertWriterImpl { + /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist. + /// + /// # Arguments + /// * row - the row to upsert. + /// + /// # Returns + /// Ok() when completed normally + async fn upsert(&mut self, row: CompactedRow<'_>) -> Result<()> { + self.check_field_count(&row)?; + + let key = self.primary_key_encoder.encode_key(&row)?; + let bucket_key = match &mut self.bucket_key_encoder { + Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(&row)?), + None => None, + }; + + let write_record = WriteRecord::for_upsert( + Arc::clone(&self.table_path), + self.table_info.schema_id, + bucket_key, + key, + self.target_columns.clone(), + row, + ); + + self.writer_client.send(&write_record).await?; + + Ok(()) + } + + /// Delete certain row by the input row in Fluss table, the input row must contain the primary + /// key. + /// + /// # Arguments + /// * row - the row to delete. + /// + /// # Returns + /// Ok() when completed normally + async fn delete(&mut self, row: CompactedRow<'_>) -> Result<()> { + self.check_field_count(&row)?; + + let _key = self.primary_key_encoder.encode_key(&row)?; + let _bucket_key = match &mut self.bucket_key_encoder { + Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(&row)?), + None => None, + }; + + todo!() + } +} + +impl UpsertWriterImpl { + #[allow(dead_code)] + fn check_field_count(&self, row: &R) -> Result<()> { + let expected = self.table_info.get_row_type().fields().len(); + if row.get_field_count() != expected { + return Err(IllegalArgument { + message: format!( + "The field count of the row does not match the table schema. Expected: {}, Actual: {}", + expected, + row.get_field_count() + ), + }); + } + Ok(()) + } +} From 9810f2d4b88651dfb7cb34f8dcdd44c06c8cc005 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 18 Jan 2026 17:48:32 +0000 Subject: [PATCH 03/18] Kv Delete implementation --- crates/fluss/src/client/table/upsert.rs | 70 +++++++++++++++---------- crates/fluss/src/client/write/batch.rs | 4 +- crates/fluss/src/client/write/mod.rs | 4 +- 3 files changed, 45 insertions(+), 33 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 55ade158..c8d9804c 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -27,6 +27,8 @@ use crate::row::{BinaryRow, InternalRow}; use std::sync::Arc; use bitvec::prelude::bitvec; +use bytes::Bytes; + #[allow(dead_code)] pub struct TableUpsert { table_path: TablePath, @@ -250,7 +252,31 @@ impl UpsertWriterFactory { } } -impl UpsertWriterImpl {} +#[allow(dead_code)] +impl UpsertWriterImpl { + fn check_field_count(&self, row: &R) -> Result<()> { + let expected = self.table_info.get_row_type().fields().len(); + if row.get_field_count() != expected { + return Err(IllegalArgument { + message: format!( + "The field count of the row does not match the table schema. Expected: {}, Actual: {}", + expected, + row.get_field_count() + ), + }); + } + Ok(()) + } + + fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes, Option)> { + let key = self.primary_key_encoder.encode_key(row)?; + let bucket_key = match &mut self.bucket_key_encoder { + Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(row)?), + None => None, + }; + Ok((key, bucket_key)) + } +} impl TableWriter for UpsertWriterImpl { /// Flush data written that have not yet been sent to the server, forcing the client to send the @@ -274,11 +300,7 @@ impl UpsertWriter for UpsertWriterImpl { async fn upsert(&mut self, row: CompactedRow<'_>) -> Result<()> { self.check_field_count(&row)?; - let key = self.primary_key_encoder.encode_key(&row)?; - let bucket_key = match &mut self.bucket_key_encoder { - Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(&row)?), - None => None, - }; + let (key, bucket_key) = self.get_keys(&row)?; let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), @@ -286,7 +308,7 @@ impl UpsertWriter for UpsertWriterImpl { bucket_key, key, self.target_columns.clone(), - row, + Some(row), ); self.writer_client.send(&write_record).await?; @@ -305,29 +327,19 @@ impl UpsertWriter for UpsertWriterImpl { async fn delete(&mut self, row: CompactedRow<'_>) -> Result<()> { self.check_field_count(&row)?; - let _key = self.primary_key_encoder.encode_key(&row)?; - let _bucket_key = match &mut self.bucket_key_encoder { - Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(&row)?), - None => None, - }; + let (key, bucket_key) = self.get_keys(&row)?; - todo!() - } -} + let write_record = WriteRecord::for_upsert( + Arc::clone(&self.table_path), + self.table_info.schema_id, + bucket_key, + key, + self.target_columns.clone(), + None, + ); + + self.writer_client.send(&write_record).await?; -impl UpsertWriterImpl { - #[allow(dead_code)] - fn check_field_count(&self, row: &R) -> Result<()> { - let expected = self.table_info.get_row_type().fields().len(); - if row.get_field_count() != expected { - return Err(IllegalArgument { - message: format!( - "The field count of the row does not match the table schema. Expected: {}, Actual: {}", - expected, - row.get_field_count() - ), - }); - } Ok(()) } -} +} \ No newline at end of file diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 29f17f8d..d8d18310 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. +use crate::BucketId; use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{Record, ResultHandle, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; use crate::metadata::{KvFormat, RowType, TablePath}; -use crate::record::kv::KvRecordBatchBuilder; use crate::record::MemoryLogRecordsArrowBuilder; -use crate::BucketId; +use crate::record::kv::KvRecordBatchBuilder; use bytes::Bytes; use std::cmp::max; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 0920b2e0..c380c18a 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -114,10 +114,10 @@ impl<'a> WriteRecord<'a> { bucket_key: Option, key: Bytes, target_columns: Option>, - row: CompactedRow<'a>, + row: Option>, ) -> Self { Self { - record: Record::Kv(KvWriteRecord::new(key, target_columns, Some(row))), + record: Record::Kv(KvWriteRecord::new(key, target_columns, row)), table_path, bucket_key, schema_id, From 99996229fafa83a48aa24de244cf8ad64078c189 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 18 Jan 2026 18:19:59 +0000 Subject: [PATCH 04/18] Wire up sanity check --- crates/fluss/src/client/table/upsert.rs | 35 ++++++++++++++++--------- crates/fluss/src/metadata/table.rs | 24 ++++++++++++++++- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index c8d9804c..7ebacf7b 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -129,16 +129,25 @@ impl UpsertWriterFactory { partial_update_columns: Option<&Arc<[usize]>>, writer_client: &Arc, ) -> Result { - let datalake_format = &table_info.table_config.get_datalake_format()?; + let data_lake_format = &table_info.table_config.get_datalake_format()?; let row_type = table_info.row_type(); let physical_pks = table_info.get_physical_primary_keys(); - let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, datalake_format)?; + let names = table_info.get_schema().auto_increment_col_names(); + + Self::sanity_check( + row_type, + &table_info.primary_keys, + names, + partial_update_columns, + )?; + + let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?; let bucket_key_encoder = if !table_info.is_default_bucket_key() { Some(KeyEncoderFactory::of( row_type, table_info.get_bucket_keys(), - datalake_format, + data_lake_format, )?) } else { None @@ -165,10 +174,10 @@ impl UpsertWriterFactory { #[allow(dead_code)] fn sanity_check( - row_type: RowType, - primary_keys: Vec, - auto_increment_col_names: Vec, - target_columns: Option>, + row_type: &RowType, + primary_keys: &Vec, + auto_increment_col_names: &Vec, + target_columns: Option<&Arc<[usize]>>, ) -> Result<()> { if target_columns.is_none() && auto_increment_col_names.is_empty() { return Err(IllegalArgument { @@ -184,14 +193,14 @@ impl UpsertWriterFactory { let mut target_column_set = bitvec![0; field_count]; - for &target_index in &target_columns { + for &target_index in target_columns.as_ref() { target_column_set.insert(target_index, true); } let mut pk_column_set = bitvec![0; field_count]; // check the target columns contains the primary key - for primary_key in &primary_keys { + for primary_key in primary_keys { let pk_index = row_type.get_field_index(primary_key.as_str()); match pk_index { Some(pk_index) => { @@ -202,7 +211,7 @@ impl UpsertWriterFactory { message: format!( "The target write columns {} must contain the primary key columns {}", row_type - .project(target_columns.as_slice())? + .project(target_columns)? .get_field_names() .join(", "), primary_keys.join(", ") @@ -214,7 +223,7 @@ impl UpsertWriterFactory { let mut auto_increment_column_set = bitvec![0; field_count]; // explicitly specifying values for an auto increment column is not allowed - for auto_increment_col_name in &auto_increment_col_names { + for auto_increment_col_name in auto_increment_col_names { let auto_increment_field_index = row_type.get_field_index(auto_increment_col_name.as_str()); @@ -240,7 +249,7 @@ impl UpsertWriterFactory { if !row_type.fields().get(i).unwrap().data_type.is_nullable() { return Err(IllegalArgument { message: format!( - "Partial Update requres all columns except primary key to be nullable, but column {} is NOT NULL.", + "Partial Update requires all columns except primary key to be nullable, but column {} is NOT NULL.", row_type.fields().get(i).unwrap().name() ), }); @@ -342,4 +351,4 @@ impl UpsertWriter for UpsertWriterImpl { Ok(()) } -} \ No newline at end of file +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 64a99551..0076c483 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -16,7 +16,7 @@ // under the License. use crate::compression::ArrowCompressionInfo; -use crate::error::Error::InvalidTableError; +use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; use crate::metadata::datatype::{DataField, DataType, RowType}; @@ -98,6 +98,7 @@ pub struct Schema { columns: Vec, primary_key: Option, row_type: RowType, + auto_increment_col_names: Vec, } impl Schema { @@ -143,12 +144,17 @@ impl Schema { pub fn column_names(&self) -> Vec<&str> { self.columns.iter().map(|c| c.name.as_str()).collect() } + + pub fn auto_increment_col_names(&self) -> &Vec { + &self.auto_increment_col_names + } } #[derive(Debug, Default)] pub struct SchemaBuilder { columns: Vec, primary_key: Option, + auto_increment_col_names: Vec, } impl SchemaBuilder { @@ -197,6 +203,21 @@ impl SchemaBuilder { self } + /// Declares a column to be auto-incremented. With an auto-increment column in the table, + /// whenever a new row is inserted into the table, the new row will be assigned with the next + /// available value from the auto-increment sequence. A table can have at most one auto + /// increment column. + pub fn enable_auto_increment(mut self, column_name: &str) -> Result { + if !self.auto_increment_col_names.is_empty() { + return Err(IllegalArgument { + message: "Multiple auto increment columns are not supported yet.".to_string(), + }); + } + + self.auto_increment_col_names.push(column_name.to_string()); + Ok(self) + } + pub fn build(&mut self) -> Result { let columns = Self::normalize_columns(&mut self.columns, self.primary_key.as_ref())?; @@ -213,6 +234,7 @@ impl SchemaBuilder { columns, primary_key: self.primary_key.clone(), row_type: RowType::new(data_fields), + auto_increment_col_names: self.auto_increment_col_names.clone(), }) } From e10c7ceaeb233da767e01899d754335dd3f2c4a0 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 18 Jan 2026 18:51:47 +0000 Subject: [PATCH 05/18] Wire up table upsert --- crates/fluss/src/client/table/mod.rs | 16 ++++++ .../src/client/table/partition_getter.rs | 54 +++++++++++++++++++ crates/fluss/src/client/table/upsert.rs | 4 +- 3 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 crates/fluss/src/client/table/partition_getter.rs diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 17ee71b8..aa919af0 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -27,11 +27,13 @@ mod append; mod lookup; mod log_fetch_buffer; +mod partition_getter; mod remote_log; mod scanner; mod upsert; mod writer; +use crate::client::table::upsert::TableUpsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; @@ -120,6 +122,20 @@ impl<'a> FlussTable<'a> { self.metadata.clone(), )) } + + pub fn new_upsert(&self) -> Result { + if !self.has_primary_key { + return Err(Error::UnsupportedOperation { + message: "Upsert is only supported for primary key tables".to_string(), + }); + } + + Ok(TableUpsert::new( + self.table_path.clone(), + self.table_info.clone(), + self.conn.get_or_create_writer_client()?, + )) + } } impl<'a> Drop for FlussTable<'a> { diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs new file mode 100644 index 00000000..d9ed6928 --- /dev/null +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::field_getter::FieldGetter; + +#[allow(dead_code)] +pub struct PartitionGetter<'a> { + partitions: Vec<(&'a String, &'a DataType, FieldGetter)>, +} + +#[allow(dead_code)] +impl<'a> PartitionGetter<'a> { + pub fn new(row_type: &'a RowType, partition_keys: &'a Vec) -> Result { + let mut partitions = Vec::with_capacity(partition_keys.len()); + + for partition_key in partition_keys { + if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) { + let data_type = &row_type + .fields() + .get(partition_col_index) + .unwrap() + .data_type; + let field_getter = FieldGetter::create(data_type, partition_col_index); + + partitions.push((partition_key, data_type, field_getter)); + } else { + return Err(IllegalArgument { + message: format!( + "The partition column {partition_key} is not in the row {row_type}." + ), + }); + }; + } + + Ok(Self { partitions }) + } +} diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 7ebacf7b..33639d8b 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -106,8 +106,8 @@ where { table_path: Arc, writer_client: Arc, - // partition_field_getter: Option, - // TODO thread safety + // TODO: Partitioning + // partition_field_getter: Option>, primary_key_encoder: Box, target_columns: Option>, bucket_key_encoder: Option>, From 3e07be5615c061991bfbfa47c4ed04b78bf34ba2 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 18 Jan 2026 21:55:19 +0000 Subject: [PATCH 06/18] Wire up table upsert --- crates/fluss/src/client/table/upsert.rs | 76 +++++++++++++++---------- crates/fluss/src/client/table/writer.rs | 4 +- crates/fluss/src/client/write/batch.rs | 2 +- crates/fluss/src/client/write/mod.rs | 6 +- crates/fluss/src/row/encode/mod.rs | 4 +- 5 files changed, 55 insertions(+), 37 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 33639d8b..8c7005f2 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -34,7 +34,7 @@ pub struct TableUpsert { table_path: TablePath, table_info: TableInfo, writer_client: Arc, - target_columns: Option>, + target_columns: Option>>, } #[allow(dead_code)] @@ -70,7 +70,7 @@ impl TableUpsert { table_path: self.table_path.clone(), table_info: self.table_info.clone(), writer_client: self.writer_client.clone(), - target_columns: target_columns.clone(), + target_columns: target_columns.map(Arc::new), }) } @@ -97,6 +97,15 @@ impl TableUpsert { self.partial_update(Some(valid_col_indices)) } + + pub fn create_writer(&self) -> Result { + UpsertWriterFactory::create( + Arc::new(self.table_path.clone()), + Arc::new(self.table_info.clone()), + self.target_columns.clone(), + Arc::clone(&self.writer_client), + ) + } } #[allow(dead_code)] @@ -109,13 +118,13 @@ where // TODO: Partitioning // partition_field_getter: Option>, primary_key_encoder: Box, - target_columns: Option>, + target_columns: Option>>, bucket_key_encoder: Option>, kv_format: KvFormat, write_format: WriteFormat, row_encoder: RE, field_getters: Box<[FieldGetter]>, - table_info: TableInfo, + table_info: Arc, } #[allow(dead_code)] @@ -124,10 +133,10 @@ struct UpsertWriterFactory; #[allow(dead_code)] impl UpsertWriterFactory { pub fn create( - table_path: &Arc, - table_info: &TableInfo, - partial_update_columns: Option<&Arc<[usize]>>, - writer_client: &Arc, + table_path: Arc, + table_info: Arc, + partial_update_columns: Option>>, + writer_client: Arc, ) -> Result { let data_lake_format = &table_info.table_config.get_datalake_format()?; let row_type = table_info.row_type(); @@ -139,7 +148,7 @@ impl UpsertWriterFactory { row_type, &table_info.primary_keys, names, - partial_update_columns, + &partial_update_columns, )?; let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?; @@ -159,14 +168,14 @@ impl UpsertWriterFactory { let field_getters = FieldGetter::create_field_getters(row_type); Ok(UpsertWriterImpl { - table_path: Arc::clone(table_path), - writer_client: Arc::clone(writer_client), + table_path, + writer_client, primary_key_encoder, - target_columns: partial_update_columns.map(Arc::clone), + target_columns: partial_update_columns, bucket_key_encoder, kv_format: kv_format.clone(), write_format, - row_encoder: RowEncoderFactory::create(kv_format, row_type)?, + row_encoder: RowEncoderFactory::create(kv_format, row_type.clone())?, field_getters, table_info: table_info.clone(), }) @@ -177,23 +186,27 @@ impl UpsertWriterFactory { row_type: &RowType, primary_keys: &Vec, auto_increment_col_names: &Vec, - target_columns: Option<&Arc<[usize]>>, + target_columns: &Option>>, ) -> Result<()> { - if target_columns.is_none() && auto_increment_col_names.is_empty() { - return Err(IllegalArgument { - message: format!( - "This table has auto increment column {}. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first.", - auto_increment_col_names.join(", ") - ), - }); + if target_columns.is_none() { + if auto_increment_col_names.is_empty() { + return Err(IllegalArgument { + message: format!( + "This table has auto increment column {}. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first.", + auto_increment_col_names.join(", ") + ), + }); + } + return Ok(()); } let field_count = row_type.fields().len(); - let target_columns = target_columns.unwrap(); let mut target_column_set = bitvec![0; field_count]; - for &target_index in target_columns.as_ref() { + let columns = target_columns.as_ref().unwrap().as_ref(); + + for &target_index in columns { target_column_set.insert(target_index, true); } @@ -204,17 +217,22 @@ impl UpsertWriterFactory { let pk_index = row_type.get_field_index(primary_key.as_str()); match pk_index { Some(pk_index) => { + if !target_column_set[pk_index] { + return Err(IllegalArgument { + message: format!( + "The target write columns {} must contain the primary key columns {}", + row_type.project(columns)?.get_field_names().join(", "), + primary_keys.join(", ") + ), + }); + } pk_column_set.insert(pk_index, true); } None => { return Err(IllegalArgument { message: format!( - "The target write columns {} must contain the primary key columns {}", - row_type - .project(target_columns)? - .get_field_names() - .join(", "), - primary_keys.join(", ") + "The specified primary key {} is not in row type {}", + primary_key, row_type ), }); } diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 251dcbec..43018fe7 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -23,7 +23,7 @@ use crate::error::Result; use crate::metadata::{TableInfo, TablePath}; use crate::row::compacted::CompactedRow; -#[allow(dead_code)] +#[allow(dead_code, async_fn_in_trait)] pub trait TableWriter { async fn flush(&self) -> Result<()>; } @@ -33,7 +33,7 @@ pub trait AppendWriter: TableWriter { async fn append(&self, row: GenericRow) -> Result<()>; } -#[allow(dead_code)] +#[allow(dead_code, async_fn_in_trait)] pub trait UpsertWriter: TableWriter { async fn upsert(&mut self, row: CompactedRow) -> Result<()>; async fn delete(&mut self, row: CompactedRow) -> Result<()>; diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index d8d18310..d2f54e9a 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -296,7 +296,7 @@ impl KvWriteBatch { }); }; - if self.target_columns.as_deref() != kv_write_record.target_columns.as_deref() { + if self.target_columns.as_ref() != kv_write_record.target_columns.as_deref() { return Err(Error::UnexpectedError { message: format!( "target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.", diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index c380c18a..21de4c6d 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -65,14 +65,14 @@ pub enum LogWriteRecord<'a> { pub struct KvWriteRecord<'a> { // only valid for primary key table key: Bytes, - target_columns: Option>, + target_columns: Option>>, compacted_row: Option>, } impl<'a> KvWriteRecord<'a> { fn new( key: Bytes, - target_columns: Option>, + target_columns: Option>>, compacted_row: Option>, ) -> Self { KvWriteRecord { @@ -113,7 +113,7 @@ impl<'a> WriteRecord<'a> { schema_id: i32, bucket_key: Option, key: Bytes, - target_columns: Option>, + target_columns: Option>>, row: Option>, ) -> Self { Self { diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 6470da5e..9e65b644 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -109,8 +109,8 @@ pub struct RowEncoderFactory {} #[allow(dead_code)] impl RowEncoderFactory { - pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result { - Self::create_for_field_types(kv_format, row_type.clone()) + pub fn create(kv_format: KvFormat, row_type: RowType) -> Result { + Self::create_for_field_types(kv_format, row_type) } pub fn create_for_field_types( From e866991ba3eb2bf31b0184b17cc7c2fbced646b2 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 12:11:32 +0000 Subject: [PATCH 07/18] Address comments --- .../src/client/table/partition_getter.rs | 2 ++ crates/fluss/src/client/table/upsert.rs | 24 +++++++++++-------- crates/fluss/src/client/table/writer.rs | 14 +++++++++-- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index d9ed6928..4529d868 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -51,4 +51,6 @@ impl<'a> PartitionGetter<'a> { Ok(Self { partitions }) } + + // TODO Implement get partition } diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 8c7005f2..81c4ff71 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::table::writer::{TableWriter, UpsertWriter}; +use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, UpsertWriter}; use crate::client::{WriteFormat, WriteRecord, WriterClient}; use crate::error::Error::IllegalArgument; use crate::error::Result; @@ -189,7 +189,7 @@ impl UpsertWriterFactory { target_columns: &Option>>, ) -> Result<()> { if target_columns.is_none() { - if auto_increment_col_names.is_empty() { + if !auto_increment_col_names.is_empty() { return Err(IllegalArgument { message: format!( "This table has auto increment column {}. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first.", @@ -207,7 +207,7 @@ impl UpsertWriterFactory { let columns = target_columns.as_ref().unwrap().as_ref(); for &target_index in columns { - target_column_set.insert(target_index, true); + target_column_set.set(target_index, true); } let mut pk_column_set = bitvec![0; field_count]; @@ -226,7 +226,7 @@ impl UpsertWriterFactory { ), }); } - pk_column_set.insert(pk_index, true); + pk_column_set.set(pk_index, true); } None => { return Err(IllegalArgument { @@ -255,7 +255,7 @@ impl UpsertWriterFactory { }); } - auto_increment_column_set.insert(index, true); + auto_increment_column_set.set(index, true); } } @@ -324,7 +324,7 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok() when completed normally - async fn upsert(&mut self, row: CompactedRow<'_>) -> Result<()> { + async fn upsert(&mut self, row: CompactedRow<'_>) -> Result { self.check_field_count(&row)?; let (key, bucket_key) = self.get_keys(&row)?; @@ -338,9 +338,10 @@ impl UpsertWriter for UpsertWriterImpl { Some(row), ); - self.writer_client.send(&write_record).await?; + let result_handle = self.writer_client.send(&write_record).await?; + let result = result_handle.wait().await?; - Ok(()) + result_handle.result(result).map(|_| UpsertResult) } /// Delete certain row by the input row in Fluss table, the input row must contain the primary @@ -351,7 +352,7 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok() when completed normally - async fn delete(&mut self, row: CompactedRow<'_>) -> Result<()> { + async fn delete(&mut self, row: CompactedRow<'_>) -> Result { self.check_field_count(&row)?; let (key, bucket_key) = self.get_keys(&row)?; @@ -367,6 +368,9 @@ impl UpsertWriter for UpsertWriterImpl { self.writer_client.send(&write_record).await?; - Ok(()) + let result_handle = self.writer_client.send(&write_record).await?; + let result = result_handle.wait().await?; + + result_handle.result(result).map(|_| DeleteResult) } } diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 43018fe7..16edc79e 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -35,10 +35,20 @@ pub trait AppendWriter: TableWriter { #[allow(dead_code, async_fn_in_trait)] pub trait UpsertWriter: TableWriter { - async fn upsert(&mut self, row: CompactedRow) -> Result<()>; - async fn delete(&mut self, row: CompactedRow) -> Result<()>; + async fn upsert(&mut self, row: CompactedRow) -> Result; + async fn delete(&mut self, row: CompactedRow) -> Result; } +/// The result of upserting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct UpsertResult; + +/// The result of deleting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct DeleteResult; + #[allow(dead_code)] pub struct AbstractTableWriter { table_path: Arc, From e8984a5117bbeccd6073f74d454bb6014cd39d0b Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 12:14:13 +0000 Subject: [PATCH 08/18] Address comments --- crates/fluss/src/client/table/writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 16edc79e..e6df105f 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -35,8 +35,8 @@ pub trait AppendWriter: TableWriter { #[allow(dead_code, async_fn_in_trait)] pub trait UpsertWriter: TableWriter { - async fn upsert(&mut self, row: CompactedRow) -> Result; - async fn delete(&mut self, row: CompactedRow) -> Result; + async fn upsert(&mut self, row: CompactedRow<'_>) -> Result; + async fn delete(&mut self, row: CompactedRow<'_>) -> Result; } /// The result of upserting a record From 5e9e7b0b9fed236407b999bc35b428022c5543c0 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 19:41:51 +0000 Subject: [PATCH 09/18] UpsertWriter accepts InternalRow instead of CompactedRow --- crates/fluss/src/client/table/upsert.rs | 29 ++++--- crates/fluss/src/client/table/writer.rs | 7 +- crates/fluss/src/client/write/batch.rs | 6 +- crates/fluss/src/client/write/mod.rs | 27 +++---- crates/fluss/src/record/kv/kv_record_batch.rs | 11 ++- .../src/record/kv/kv_record_batch_builder.rs | 80 ++++++++++--------- .../fluss/src/row/compacted/compacted_row.rs | 6 +- .../src/row/compacted/compacted_row_writer.rs | 12 +++ .../src/row/encode/compacted_row_encoder.rs | 13 ++- crates/fluss/src/row/encode/mod.rs | 4 +- crates/fluss/src/row/mod.rs | 19 ++++- 11 files changed, 122 insertions(+), 92 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 81c4ff71..5a598bb4 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -20,10 +20,9 @@ use crate::client::{WriteFormat, WriteRecord, WriterClient}; use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{KvFormat, RowType, TableInfo, TablePath}; -use crate::row::compacted::CompactedRow; +use crate::row::InternalRow; use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory}; use crate::row::field_getter::FieldGetter; -use crate::row::{BinaryRow, InternalRow}; use std::sync::Arc; use bitvec::prelude::bitvec; @@ -281,7 +280,7 @@ impl UpsertWriterFactory { #[allow(dead_code)] impl UpsertWriterImpl { - fn check_field_count(&self, row: &R) -> Result<()> { + fn check_field_count(&self, row: &R) -> Result<()> { let expected = self.table_info.get_row_type().fields().len(); if row.get_field_count() != expected { return Err(IllegalArgument { @@ -303,6 +302,15 @@ impl UpsertWriterImpl { }; Ok((key, bucket_key)) } + + fn encode_row(&mut self, row: &R) -> Result { + self.row_encoder.start_new_row()?; + for (pos, field_getter) in self.field_getters.iter().enumerate() { + let datum = field_getter.get_field(row); + self.row_encoder.encode_field(pos, datum)?; + } + self.row_encoder.finish_row() + } } impl TableWriter for UpsertWriterImpl { @@ -324,10 +332,11 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok() when completed normally - async fn upsert(&mut self, row: CompactedRow<'_>) -> Result { - self.check_field_count(&row)?; + async fn upsert(&mut self, row: &R) -> Result { + self.check_field_count(row)?; - let (key, bucket_key) = self.get_keys(&row)?; + let (key, bucket_key) = self.get_keys(row)?; + let row_bytes = self.encode_row(row)?; let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), @@ -335,7 +344,7 @@ impl UpsertWriter for UpsertWriterImpl { bucket_key, key, self.target_columns.clone(), - Some(row), + Some(row_bytes), ); let result_handle = self.writer_client.send(&write_record).await?; @@ -352,10 +361,10 @@ impl UpsertWriter for UpsertWriterImpl { /// /// # Returns /// Ok() when completed normally - async fn delete(&mut self, row: CompactedRow<'_>) -> Result { - self.check_field_count(&row)?; + async fn delete(&mut self, row: &R) -> Result { + self.check_field_count(row)?; - let (key, bucket_key) = self.get_keys(&row)?; + let (key, bucket_key) = self.get_keys(row)?; let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index e6df105f..82765457 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -16,12 +16,11 @@ // under the License. use crate::client::{WriteRecord, WriterClient}; -use crate::row::GenericRow; +use crate::row::{GenericRow, InternalRow}; use std::sync::Arc; use crate::error::Result; use crate::metadata::{TableInfo, TablePath}; -use crate::row::compacted::CompactedRow; #[allow(dead_code, async_fn_in_trait)] pub trait TableWriter { @@ -35,8 +34,8 @@ pub trait AppendWriter: TableWriter { #[allow(dead_code, async_fn_in_trait)] pub trait UpsertWriter: TableWriter { - async fn upsert(&mut self, row: CompactedRow<'_>) -> Result; - async fn delete(&mut self, row: CompactedRow<'_>) -> Result; + async fn upsert(&mut self, row: &R) -> Result; + async fn delete(&mut self, row: &R) -> Result; } /// The result of upserting a record diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index d2f54e9a..15358f33 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -307,14 +307,14 @@ impl KvWriteBatch { }); } - let row = kv_write_record.compacted_row.as_ref(); + let row_bytes = kv_write_record.row_bytes(); - if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row) { + if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row_bytes) { Ok(None) } else { // append successfully self.kv_batch_builder - .append_row(key, row) + .append_row(key, row_bytes) .map_err(|e| Error::UnexpectedError { message: "Failed to append row to KvWriteBatch".to_string(), source: Some(Box::new(e)), diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 21de4c6d..8407039b 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -21,7 +21,7 @@ mod batch; use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver}; use crate::error::Error; use crate::metadata::TablePath; -use crate::row::{CompactedRow, GenericRow}; +use crate::row::GenericRow; pub use accumulator::*; use arrow::array::RecordBatch; use bytes::Bytes; @@ -54,7 +54,7 @@ impl<'a> WriteRecord<'a> { pub enum Record<'a> { Log(LogWriteRecord<'a>), - Kv(KvWriteRecord<'a>), + Kv(KvWriteRecord), } pub enum LogWriteRecord<'a> { @@ -62,25 +62,24 @@ pub enum LogWriteRecord<'a> { RecordBatch(Arc), } -pub struct KvWriteRecord<'a> { - // only valid for primary key table +pub struct KvWriteRecord { key: Bytes, target_columns: Option>>, - compacted_row: Option>, + row_bytes: Option, } -impl<'a> KvWriteRecord<'a> { - fn new( - key: Bytes, - target_columns: Option>>, - compacted_row: Option>, - ) -> Self { +impl KvWriteRecord { + fn new(key: Bytes, target_columns: Option>>, row_bytes: Option) -> Self { KvWriteRecord { key, target_columns, - compacted_row, + row_bytes, } } + + pub fn row_bytes(&self) -> Option<&[u8]> { + self.row_bytes.as_deref() + } } impl<'a> WriteRecord<'a> { @@ -114,10 +113,10 @@ impl<'a> WriteRecord<'a> { bucket_key: Option, key: Bytes, target_columns: Option>>, - row: Option>, + row_bytes: Option, ) -> Self { Self { - record: Record::Kv(KvWriteRecord::new(key, target_columns, row)), + record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)), table_path, bucket_key, schema_id, diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index 32f712f8..eb89d69c 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -370,12 +370,12 @@ impl Iterator for KvRecordIterator { #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataTypes, KvFormat, RowType}; + use crate::metadata::{DataTypes, KvFormat}; use crate::record::kv::test_util::TestReadContext; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; use crate::row::InternalRow; use crate::row::binary::BinaryWriter; - use crate::row::compacted::CompactedRow; + use bytes::{BufMut, BytesMut}; #[test] @@ -417,12 +417,11 @@ mod tests { let mut value1_writer = CompactedRowWriter::new(1); value1_writer.write_bytes(&[1, 2, 3, 4, 5]); - let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec()); - let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer()); - builder.append_row(key1, Some(row)).unwrap(); + let row_bytes = value1_writer.buffer(); + builder.append_row(key1, Some(row_bytes)).unwrap(); let key2 = b"key2"; - builder.append_row::(key2, None).unwrap(); + builder.append_row(key2, None).unwrap(); let bytes = builder.build().unwrap(); diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index e3da8640..0b65500f 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -28,7 +28,6 @@ use crate::record::kv::kv_record_batch::{ WRITE_CLIENT_ID_OFFSET, }; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, NO_WRITER_ID}; -use crate::row::BinaryRow; use bytes::{Bytes, BytesMut}; use std::io; @@ -88,14 +87,13 @@ impl KvRecordBatchBuilder { } } - /// Check if there is room for a new record containing the given key and row. + /// Check if there is room for a new record containing the given key and row bytes. /// If no records have been appended, this always returns true. - pub fn has_room_for_row(&self, key: &[u8], row: Option<&R>) -> bool { - let value = row.map(|r| r.as_bytes()); - self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit + pub fn has_room_for_row(&self, key: &[u8], row_bytes: Option<&[u8]>) -> bool { + self.size_in_bytes + KvRecord::size_of(key, row_bytes) <= self.write_limit } - /// Append a KV record with a row value to the batch. + /// Append a KV record with row bytes to the batch. /// /// Returns an error if: /// - The builder has been aborted @@ -103,7 +101,7 @@ impl KvRecordBatchBuilder { /// - Adding this record would exceed the write limit /// - The maximum number of records is exceeded /// - The KV format is not COMPACTED - pub fn append_row(&mut self, key: &[u8], row: Option<&R>) -> io::Result<()> { + pub fn append_row(&mut self, key: &[u8], row_bytes: Option<&[u8]>) -> io::Result<()> { if self.kv_format != KvFormat::COMPACTED { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -134,8 +132,7 @@ impl KvRecordBatchBuilder { )); } - let value = row.map(|r| r.as_bytes()); - let record_size = KvRecord::size_of(key, value); + let record_size = KvRecord::size_of(key, row_bytes); if self.size_in_bytes + record_size > self.write_limit { return Err(io::Error::new( io::ErrorKind::WriteZero, @@ -146,7 +143,7 @@ impl KvRecordBatchBuilder { )); } - let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, value)?; + let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, row_bytes)?; debug_assert_eq!(record_byte_size, record_size, "Record size mismatch"); self.current_record_number += 1; @@ -349,12 +346,12 @@ mod tests { let key1 = b"key1"; let value1 = create_test_row(b"value1"); - assert!(builder.has_room_for_row(key1, Some(&value1))); - builder.append_row(key1, Some(&value1)).unwrap(); + assert!(builder.has_room_for_row(key1, Some(value1.as_bytes()))); + builder.append_row(key1, Some(value1.as_bytes())).unwrap(); let key2 = b"key2"; - assert!(builder.has_room_for_row::(key2, None)); - builder.append_row::(key2, None).unwrap(); + assert!(builder.has_room_for_row(key2, None)); + builder.append_row(key2, None).unwrap(); builder.close().unwrap(); assert!(builder.is_closed()); @@ -369,35 +366,34 @@ mod tests { // Test lifecycle: abort behavior let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); builder.abort(); - assert!(builder.append_row::(b"key2", None).is_err()); + assert!(builder.append_row(b"key2", None).is_err()); assert!(builder.build().is_err()); assert!(builder.close().is_err()); // Test lifecycle: close behavior let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); builder.close().unwrap(); - assert!(builder.append_row::(b"key2", None).is_err()); + assert!(builder.append_row(b"key2", None).is_err()); assert!(builder.build().is_ok()); // Test KvFormat validation let mut row_writer = CompactedRowWriter::new(1); row_writer.write_int(42); - let row_type = RowType::with_data_types(vec![DataTypes::int()]); - let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer()); + let row_bytes = row_writer.buffer(); // INDEXED format should reject append_row let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED); - let result = indexed_builder.append_row(b"key", Some(row)); + let result = indexed_builder.append_row(b"key", Some(row_bytes)); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); // COMPACTED format should accept append_row let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); - let result = compacted_builder.append_row(b"key", Some(row)); + let result = compacted_builder.append_row(b"key", Some(row_bytes)); assert!(result.is_ok()); } @@ -410,15 +406,17 @@ mod tests { let large_key = vec![0u8; 1000]; let large_value = vec![1u8; 1000]; let large_row = create_test_row(&large_value); - assert!(!builder.has_room_for_row(&large_key, Some(&large_row))); + assert!(!builder.has_room_for_row(&large_key, Some(large_row.as_bytes()))); let small_value = create_test_row(b"value"); - assert!(builder.has_room_for_row(b"key", Some(&small_value))); + assert!(builder.has_room_for_row(b"key", Some(small_value.as_bytes()))); // Test append enforcement - add small record first - builder.append_row(b"key", Some(&small_value)).unwrap(); + builder + .append_row(b"key", Some(small_value.as_bytes())) + .unwrap(); // Try to add large record that exceeds limit (reuse large_row from above) - let result = builder.append_row(b"key2", Some(&large_row)); + let result = builder.append_row(b"key2", Some(large_row.as_bytes())); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero); } @@ -429,10 +427,12 @@ mod tests { builder.current_record_number = i32::MAX - 1; let value1 = create_test_row(b"value1"); - builder.append_row(b"key1", Some(&value1)).unwrap(); + builder + .append_row(b"key1", Some(value1.as_bytes())) + .unwrap(); let value2 = create_test_row(b"value2"); - let result = builder.append_row(b"key2", Some(&value2)); + let result = builder.append_row(b"key2", Some(value2.as_bytes())); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); } @@ -452,13 +452,17 @@ mod tests { builder.set_writer_state(100, 5); let value1 = create_test_row(b"value1"); - builder.append_row(b"key1", Some(&value1)).unwrap(); + builder + .append_row(b"key1", Some(value1.as_bytes())) + .unwrap(); let bytes1 = builder.build().unwrap(); let len1 = bytes1.len(); // Append another record - this should invalidate the cache let value2 = create_test_row(b"value2"); - builder.append_row(b"key2", Some(&value2)).unwrap(); + builder + .append_row(b"key2", Some(value2.as_bytes())) + .unwrap(); let bytes2 = builder.build().unwrap(); let len2 = bytes2.len(); @@ -472,7 +476,7 @@ mod tests { let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); builder.set_writer_state(100, 5); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); let bytes1 = builder.build().unwrap(); // Change writer state - this should invalidate the cache @@ -494,7 +498,6 @@ mod tests { fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> { use crate::record::kv::KvRecordBatch; use crate::row::InternalRow; - use crate::row::compacted::CompactedRow; let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED); builder.set_writer_state(100, 5); @@ -504,26 +507,25 @@ mod tests { row_writer1.write_int(42); row_writer1.write_string("hello"); - let row_type = RowType::with_data_types([DataTypes::int(), DataTypes::string()].to_vec()); - let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer()); + let row_bytes1 = row_writer1.buffer(); let key1 = b"key1"; - assert!(builder.has_room_for_row(key1, Some(row1))); - builder.append_row(key1, Some(row1))?; + assert!(builder.has_room_for_row(key1, Some(row_bytes1))); + builder.append_row(key1, Some(row_bytes1))?; // Create and append second record let mut row_writer2 = CompactedRowWriter::new(2); row_writer2.write_int(100); row_writer2.write_string("world"); - let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer()); + let row_bytes2 = row_writer2.buffer(); let key2 = b"key2"; - builder.append_row(key2, Some(row2))?; + builder.append_row(key2, Some(row_bytes2))?; // Append a deletion record let key3 = b"key3"; - builder.append_row::(key3, None)?; + builder.append_row(key3, None)?; // Build and verify builder.close()?; diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index bc68ea10..6b18f913 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -17,7 +17,7 @@ use crate::metadata::RowType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; -use crate::row::{BinaryRow, GenericRow, InternalRow}; +use crate::row::{GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; // Reference implementation: @@ -69,10 +69,8 @@ impl<'a> CompactedRow<'a> { self.decoded_row .get_or_init(|| self.deserializer.deserialize(&self.reader)) } -} -impl BinaryRow for CompactedRow<'_> { - fn as_bytes(&self) -> &[u8] { + pub fn as_bytes(&self) -> &[u8] { self.data } } diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs index d1ad047a..ac0100ee 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -63,6 +63,18 @@ impl CompactedRowWriter { Bytes::copy_from_slice(&self.buffer[..self.position]) } + /// Flushes writer's ByteMut, resetting writer's inner state and returns Byte of flushed state + pub fn flush_bytes(&mut self) -> Bytes { + let used = self.buffer.split_to(self.position); + self.position = self.header_size_in_bytes; + if self.buffer.len() < self.header_size_in_bytes { + self.buffer.resize(self.header_size_in_bytes.max(64), 0); + } else { + self.buffer[..self.header_size_in_bytes].fill(0); + } + used.freeze() + } + fn ensure_capacity(&mut self, need_len: usize) { if (self.buffer.len() - self.position) < need_len { let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len); diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs b/crates/fluss/src/row/encode/compacted_row_encoder.rs index 48b9f3ff..20f28820 100644 --- a/crates/fluss/src/row/encode/compacted_row_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs @@ -20,8 +20,9 @@ use crate::error::Result; use crate::metadata::RowType; use crate::row::Datum; use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; -use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, CompactedRowWriter}; -use crate::row::encode::{BinaryRow, RowEncoder}; +use crate::row::compacted::{CompactedRowDeserializer, CompactedRowWriter}; +use crate::row::encode::RowEncoder; +use bytes::Bytes; use std::sync::Arc; #[allow(dead_code)] @@ -65,12 +66,8 @@ impl RowEncoder for CompactedRowEncoder<'_> { .write_value(&mut self.writer, pos, &value) } - fn finish_row(&mut self) -> Result { - Ok(CompactedRow::deserialize( - Arc::clone(&self.compacted_row_deserializer), - self.arity, - self.writer.buffer(), - )) + fn finish_row(&mut self) -> Result { + Ok(self.writer.flush_bytes()) } fn close(&mut self) -> Result<()> { diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 9e65b644..468d4d18 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -22,7 +22,7 @@ use crate::error::Result; use crate::metadata::{DataLakeFormat, KvFormat, RowType}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; -use crate::row::{BinaryRow, Datum, InternalRow}; +use crate::row::{Datum, InternalRow}; use bytes::Bytes; /// An interface for encoding key of row into bytes. @@ -95,7 +95,7 @@ pub trait RowEncoder { /// /// # Returns /// * the written row - fn finish_row(&mut self) -> Result; + fn finish_row(&mut self) -> Result; /// Closes the row encoder /// diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index c9db7e96..977b22f3 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -26,6 +26,7 @@ pub mod encode; pub mod field_getter; mod row_decoder; +use bytes::Bytes; pub use column::*; pub use compacted::CompactedRow; pub use datum::*; @@ -33,9 +34,23 @@ pub use decimal::{Decimal, MAX_COMPACT_PRECISION}; pub use encode::KeyEncoder; pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory}; -pub trait BinaryRow: InternalRow { +pub struct BinaryRow<'a> { + data: BinaryDataWrapper<'a>, +} + +pub enum BinaryDataWrapper<'a> { + Bytes(Bytes), + Ref(&'a [u8]), +} + +impl<'a> BinaryRow<'a> { /// Returns the binary representation of this row as a byte slice. - fn as_bytes(&self) -> &[u8]; + pub fn as_bytes(&'a self) -> &'a [u8] { + match &self.data { + BinaryDataWrapper::Bytes(bytes) => bytes.as_ref(), + BinaryDataWrapper::Ref(r) => r, + } + } } // TODO make functions return Result for better error handling From dbce85fb80980a10a53455769af1b3e9b8b0a7b7 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 20:44:02 +0000 Subject: [PATCH 10/18] Only encode when not already encoded # Conflicts: # crates/fluss/src/row/compacted/compacted_row.rs --- crates/fluss/src/client/table/upsert.rs | 5 ++++- crates/fluss/src/row/compacted/compacted_row.rs | 4 ++++ crates/fluss/src/row/mod.rs | 5 +++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 5a598bb4..8a58b208 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -336,7 +336,10 @@ impl UpsertWriter for UpsertWriterImpl { self.check_field_count(row)?; let (key, bucket_key) = self.get_keys(row)?; - let row_bytes = self.encode_row(row)?; + let row_bytes = match row.as_encoded_bytes() { + Some(bytes) => Bytes::copy_from_slice(bytes), + None => self.encode_row(row)?, + }; let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 6b18f913..f8236c7a 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -151,6 +151,10 @@ impl<'a> InternalRow for CompactedRow<'a> { fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> crate::row::datum::TimestampLtz { self.decoded_row().get_timestamp_ltz(pos, precision) } + + fn as_encoded_bytes(&self) -> Option<&[u8]> { + Some(self.as_bytes()) + } } #[cfg(test)] diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 977b22f3..d63264f3 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -114,6 +114,11 @@ pub trait InternalRow { /// Returns the binary value at the given position fn get_bytes(&self, pos: usize) -> &[u8]; + + /// Returns encoded bytes if already encoded + fn as_encoded_bytes(&self) -> Option<&[u8]> { + None + } } pub struct GenericRow<'a> { From 8e09eb908c87d31d175cb5368f4c9cf6bad626c2 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 20:24:49 +0000 Subject: [PATCH 11/18] Remove binary array copy when encoding is skipped while upserting --- crates/fluss/src/client/table/upsert.rs | 9 +++---- crates/fluss/src/client/write/mod.rs | 32 +++++++++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 8a58b208..bc9fc30b 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -16,7 +16,7 @@ // under the License. use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, UpsertWriter}; -use crate::client::{WriteFormat, WriteRecord, WriterClient}; +use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient}; use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{KvFormat, RowType, TableInfo, TablePath}; @@ -336,9 +336,10 @@ impl UpsertWriter for UpsertWriterImpl { self.check_field_count(row)?; let (key, bucket_key) = self.get_keys(row)?; - let row_bytes = match row.as_encoded_bytes() { - Some(bytes) => Bytes::copy_from_slice(bytes), - None => self.encode_row(row)?, + + let row_bytes: RowBytes<'_> = match row.as_encoded_bytes() { + Some(bytes) => RowBytes::Borrowed(bytes), + None => RowBytes::Owned(self.encode_row(row)?), }; let write_record = WriteRecord::for_upsert( diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 8407039b..6f81bdec 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -54,7 +54,7 @@ impl<'a> WriteRecord<'a> { pub enum Record<'a> { Log(LogWriteRecord<'a>), - Kv(KvWriteRecord), + Kv(KvWriteRecord<'a>), } pub enum LogWriteRecord<'a> { @@ -62,14 +62,32 @@ pub enum LogWriteRecord<'a> { RecordBatch(Arc), } -pub struct KvWriteRecord { +pub enum RowBytes<'a> { + Borrowed(&'a [u8]), + Owned(Bytes), +} + +impl<'a> RowBytes<'a> { + pub fn as_slice(&self) -> &[u8] { + match self { + RowBytes::Borrowed(slice) => slice, + RowBytes::Owned(bytes) => bytes.as_ref(), + } + } +} + +pub struct KvWriteRecord<'a> { key: Bytes, target_columns: Option>>, - row_bytes: Option, + row_bytes: Option>, } -impl KvWriteRecord { - fn new(key: Bytes, target_columns: Option>>, row_bytes: Option) -> Self { +impl<'a> KvWriteRecord<'a> { + fn new( + key: Bytes, + target_columns: Option>>, + row_bytes: Option>, + ) -> Self { KvWriteRecord { key, target_columns, @@ -78,7 +96,7 @@ impl KvWriteRecord { } pub fn row_bytes(&self) -> Option<&[u8]> { - self.row_bytes.as_deref() + self.row_bytes.as_ref().map(|rb| rb.as_slice()) } } @@ -113,7 +131,7 @@ impl<'a> WriteRecord<'a> { bucket_key: Option, key: Bytes, target_columns: Option>>, - row_bytes: Option, + row_bytes: Option>, ) -> Self { Self { record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)), From d6c672d5fecf021559a4d72d7d55527330ebbdab Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 20:45:13 +0000 Subject: [PATCH 12/18] Remove unused import --- crates/fluss/src/record/arrow.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 108f66c7..a8e2332b 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -1056,7 +1056,6 @@ pub struct MyVec(pub StreamReader); #[cfg(test)] mod tests { use super::*; - use crate::error::Error; use crate::metadata::{DataField, DataTypes}; #[test] From 629ec3bd617f74933fed1bb9a16eee3e5fc144fa Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 19 Jan 2026 22:29:00 +0000 Subject: [PATCH 13/18] Add test cases --- crates/fluss/src/client/table/upsert.rs | 135 +++++++++++++++++++++++- 1 file changed, 133 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index bc9fc30b..c5ac3783 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -331,7 +331,7 @@ impl UpsertWriter for UpsertWriterImpl { /// * row - the row to upsert. /// /// # Returns - /// Ok() when completed normally + /// Ok(UpsertResult) when completed normally async fn upsert(&mut self, row: &R) -> Result { self.check_field_count(row)?; @@ -364,7 +364,7 @@ impl UpsertWriter for UpsertWriterImpl { /// * row - the row to delete. /// /// # Returns - /// Ok() when completed normally + /// Ok(DeleteResult) when completed normally async fn delete(&mut self, row: &R) -> Result { self.check_field_count(row)?; @@ -387,3 +387,134 @@ impl UpsertWriter for UpsertWriterImpl { result_handle.result(result).map(|_| DeleteResult) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{DataField, DataTypes}; + + #[test] + fn sanity_check() { + // No target columns specified but table has auto-increment column + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec!["id".to_string()]; + let target_columns = None; + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "This table has auto increment column id. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first." + )); + + // Target columns do not contain primary key + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + DataField::new("value".to_string(), DataTypes::int(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![1usize])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!( + result + .unwrap_err() + .to_string() + .contains("The target write columns name must contain the primary key columns id") + ); + + // Primary key column not found in row type + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["nonexistent_pk".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![0usize, 1])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!( + result + .unwrap_err() + .to_string() + .contains("The specified primary key nonexistent_pk is not in row type") + ); + + // Target columns include auto-increment column + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new( + "seq".to_string(), + DataTypes::bigint().as_non_nullable(), + None, + ), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec!["seq".to_string()]; + let target_columns = Some(Arc::new(vec![0usize, 1, 2])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "Explicitly specifying values for the auto increment column seq is not allowed." + )); + + // Non-nullable column not in target columns (partial update requires nullable) + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new( + "required_field".to_string(), + DataTypes::string().as_non_nullable(), + None, + ), + DataField::new("optional_field".to_string(), DataTypes::int(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![0usize])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "Partial Update requires all columns except primary key to be nullable, but column required_field is NOT NULL." + )); + } +} From 7bb436ca57dab697427942dc78c60a188c2bed4d Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 20 Jan 2026 08:30:17 +0000 Subject: [PATCH 14/18] Address comments --- crates/fluss/src/client/table/upsert.rs | 18 ++++++++++-------- crates/fluss/src/client/write/mod.rs | 5 +++-- crates/fluss/src/client/write/write_format.rs | 1 + crates/fluss/src/metadata/table.rs | 12 ++++++++++++ crates/fluss/src/record/arrow.rs | 1 - .../fluss/src/row/compacted/compacted_row.rs | 9 +++++++-- crates/fluss/src/row/mod.rs | 3 ++- 7 files changed, 35 insertions(+), 14 deletions(-) diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index c5ac3783..a3909e72 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -118,6 +118,7 @@ where // partition_field_getter: Option>, primary_key_encoder: Box, target_columns: Option>>, + // Use primary key encoder as bucket key encoder when None bucket_key_encoder: Option>, kv_format: KvFormat, write_format: WriteFormat, @@ -158,6 +159,7 @@ impl UpsertWriterFactory { data_lake_format, )?) } else { + // Defaults to using primary key encoder when None for bucket key None }; @@ -298,7 +300,7 @@ impl UpsertWriterImpl { let key = self.primary_key_encoder.encode_key(row)?; let bucket_key = match &mut self.bucket_key_encoder { Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(row)?), - None => None, + None => Some(key.clone()), }; Ok((key, bucket_key)) } @@ -317,7 +319,7 @@ impl TableWriter for UpsertWriterImpl { /// Flush data written that have not yet been sent to the server, forcing the client to send the /// requests to server and blocks on the completion of the requests associated with these /// records. A request is considered completed when it is successfully acknowledged according to - /// the {@link ConfigOptions#CLIENT_WRITER_ACKS} configuration you have specified or else it + /// the CLIENT_WRITER_ACKS configuration option you have specified or else it /// results in an error. async fn flush(&self) -> Result<()> { self.writer_client.flush().await @@ -325,7 +327,7 @@ impl TableWriter for UpsertWriterImpl { } impl UpsertWriter for UpsertWriterImpl { - /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist. + /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist. /// /// # Arguments /// * row - the row to upsert. @@ -337,7 +339,7 @@ impl UpsertWriter for UpsertWriterImpl { let (key, bucket_key) = self.get_keys(row)?; - let row_bytes: RowBytes<'_> = match row.as_encoded_bytes() { + let row_bytes: RowBytes<'_> = match row.as_encoded_bytes(self.write_format) { Some(bytes) => RowBytes::Borrowed(bytes), None => RowBytes::Owned(self.encode_row(row)?), }; @@ -345,8 +347,9 @@ impl UpsertWriter for UpsertWriterImpl { let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), self.table_info.schema_id, - bucket_key, key, + bucket_key, + self.write_format, self.target_columns.clone(), Some(row_bytes), ); @@ -373,14 +376,13 @@ impl UpsertWriter for UpsertWriterImpl { let write_record = WriteRecord::for_upsert( Arc::clone(&self.table_path), self.table_info.schema_id, - bucket_key, key, + bucket_key, + self.write_format, self.target_columns.clone(), None, ); - self.writer_client.send(&write_record).await?; - let result_handle = self.writer_client.send(&write_record).await?; let result = result_handle.wait().await?; diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 6f81bdec..a4151cfd 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -128,8 +128,9 @@ impl<'a> WriteRecord<'a> { pub fn for_upsert( table_path: Arc, schema_id: i32, - bucket_key: Option, key: Bytes, + bucket_key: Option, + write_format: WriteFormat, target_columns: Option>>, row_bytes: Option>, ) -> Self { @@ -138,7 +139,7 @@ impl<'a> WriteRecord<'a> { table_path, bucket_key, schema_id, - write_format: WriteFormat::CompactedKv, + write_format, } } } diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs index 4a0c0d8a..147152ca 100644 --- a/crates/fluss/src/client/write/write_format.rs +++ b/crates/fluss/src/client/write/write_format.rs @@ -20,6 +20,7 @@ use crate::error::Result; use crate::metadata::KvFormat; use std::fmt::Display; +#[derive(Copy, Clone)] pub enum WriteFormat { ArrowLog, CompactedLog, diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 0076c483..8204e7c4 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -221,6 +221,18 @@ impl SchemaBuilder { pub fn build(&mut self) -> Result { let columns = Self::normalize_columns(&mut self.columns, self.primary_key.as_ref())?; + let column_names: HashSet<_> = columns.iter().map(|c| &c.name).collect(); + for auto_inc_col in &self.auto_increment_col_names { + if !column_names.contains(auto_inc_col) { + return Err(IllegalArgument { + message: format!( + "Auto increment column '{}' is not found in the schema columns.", + auto_inc_col + ), + }); + } + } + let data_fields = columns .iter() .map(|c| DataField { diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index a8e2332b..3c94b720 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -50,7 +50,6 @@ use std::{ use crate::error::Error::IllegalArgument; use arrow::ipc::writer::IpcWriteOptions; - /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; pub const LENGTH_LENGTH: usize = 4; diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index f8236c7a..2be0fb58 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -19,6 +19,7 @@ use crate::metadata::RowType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; use crate::row::{GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; +use crate::client::WriteFormat; // Reference implementation: // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java @@ -152,8 +153,12 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row().get_timestamp_ltz(pos, precision) } - fn as_encoded_bytes(&self) -> Option<&[u8]> { - Some(self.as_bytes()) + fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> { + match write_format { + WriteFormat::CompactedKv => Some(self.as_bytes()), + WriteFormat::ArrowLog => None, + WriteFormat::CompactedLog => None, + } } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index d63264f3..e81b280e 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -33,6 +33,7 @@ pub use datum::*; pub use decimal::{Decimal, MAX_COMPACT_PRECISION}; pub use encode::KeyEncoder; pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory}; +use crate::client::WriteFormat; pub struct BinaryRow<'a> { data: BinaryDataWrapper<'a>, @@ -116,7 +117,7 @@ pub trait InternalRow { fn get_bytes(&self, pos: usize) -> &[u8]; /// Returns encoded bytes if already encoded - fn as_encoded_bytes(&self) -> Option<&[u8]> { + fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> { None } } From 8b38a849c0293d3028a183b31cd9788fe555030c Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 20 Jan 2026 08:38:22 +0000 Subject: [PATCH 15/18] Formatting --- crates/fluss/src/row/compacted/compacted_row.rs | 2 +- crates/fluss/src/row/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 2be0fb58..35d684db 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::client::WriteFormat; use crate::metadata::RowType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; use crate::row::{GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; -use crate::client::WriteFormat; // Reference implementation: // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index e81b280e..bc8134dd 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -26,6 +26,7 @@ pub mod encode; pub mod field_getter; mod row_decoder; +use crate::client::WriteFormat; use bytes::Bytes; pub use column::*; pub use compacted::CompactedRow; @@ -33,7 +34,6 @@ pub use datum::*; pub use decimal::{Decimal, MAX_COMPACT_PRECISION}; pub use encode::KeyEncoder; pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory}; -use crate::client::WriteFormat; pub struct BinaryRow<'a> { data: BinaryDataWrapper<'a>, From 21161797d1e8fbb39d566cccf60bf64358bb4b06 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 20 Jan 2026 09:17:29 +0000 Subject: [PATCH 16/18] Wire up bucketing in WriterClient --- crates/fluss/src/client/table/mod.rs | 1 + crates/fluss/src/client/write/bucket_assigner.rs | 9 +++++---- crates/fluss/src/client/write/writer_client.rs | 7 +++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index aa919af0..2bfa0541 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -37,6 +37,7 @@ use crate::client::table::upsert::TableUpsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; +pub use writer::{TableWriter, UpsertWriter}; #[allow(dead_code)] pub struct FlussTable<'a> { diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index 2370719e..817101a4 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -20,6 +20,7 @@ use crate::cluster::Cluster; use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::TablePath; +use bytes::Bytes; use rand::Rng; use std::sync::atomic::{AtomicI32, Ordering}; @@ -28,7 +29,7 @@ pub trait BucketAssigner: Sync + Send { fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32); - fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result; + fn assign_bucket(&self, bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result; } #[derive(Debug)] @@ -94,7 +95,7 @@ impl BucketAssigner for StickyBucketAssigner { self.next_bucket(cluster, prev_bucket_id); } - fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result { + fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result { let bucket_id = self.current_bucket_id.load(Ordering::Relaxed); if bucket_id < 0 { Ok(self.next_bucket(cluster, bucket_id)) @@ -139,7 +140,7 @@ impl BucketAssigner for HashBucketAssigner { // do nothing } - fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> Result { + fn assign_bucket(&self, bucket_key: Option<&Bytes>, _: &Cluster) -> Result { let key = bucket_key.ok_or_else(|| IllegalArgument { message: "no bucket key provided".to_string(), })?; @@ -181,7 +182,7 @@ mod tests { let assigner = HashBucketAssigner::new(4, ::of(None)); let cluster = Cluster::default(); let bucket = assigner - .assign_bucket(Some(b"key"), &cluster) + .assign_bucket(Some(&Bytes::from_static(b"key")), &cluster) .expect("bucket"); assert!((0..4).contains(&bucket)); } diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index 22e03972..363abc8d 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -21,6 +21,7 @@ use crate::client::write::sender::Sender; use crate::client::{RecordAccumulator, ResultHandle, WriteRecord}; use crate::config::Config; use crate::metadata::TablePath; +use bytes::Bytes; use dashmap::DashMap; use std::sync::Arc; use tokio::sync::mpsc; @@ -90,8 +91,9 @@ impl WriterClient { pub async fn send(&self, record: &WriteRecord<'_>) -> Result { let table_path = &record.table_path; let cluster = self.metadata.get_cluster(); + let bucket_key = record.bucket_key.as_ref(); - let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?; + let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key, table_path)?; let mut result = self .accumulate @@ -116,6 +118,7 @@ impl WriterClient { } fn assign_bucket( &self, + bucket_key: Option<&Bytes>, table_path: &Arc, ) -> Result<(Arc>, i32)> { let cluster = self.metadata.get_cluster(); @@ -129,7 +132,7 @@ impl WriterClient { assigner } }; - let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; + let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?; Ok((bucket_assigner, bucket_id)) } From 7d829d9b4aa23a140e79e11e27cfd03650136e50 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 20 Jan 2026 09:21:57 +0000 Subject: [PATCH 17/18] Wire up bucketing in WriterClient --- crates/fluss/src/client/write/writer_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index 363abc8d..65b04f56 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -103,7 +103,7 @@ impl WriterClient { if result.abort_record_for_new_batch { let prev_bucket_id = bucket_id; bucket_assigner.on_new_batch(&cluster, prev_bucket_id); - let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; + let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?; result = self .accumulate .append(record, bucket_id, &cluster, false) From 41e03bd7345056416897ef0f0f6c632271da2738 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Tue, 20 Jan 2026 10:15:19 +0000 Subject: [PATCH 18/18] Fix write batch instantiation and appending --- crates/fluss/src/client/write/accumulator.rs | 39 ++++++++++++++------ crates/fluss/src/client/write/batch.rs | 8 ++-- crates/fluss/src/client/write/mod.rs | 1 + 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 0afc9d4b..fb7b5447 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::client::write::batch::WriteBatch::ArrowLog; -use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch}; +use crate::client::write::batch::WriteBatch::{ArrowLog, Kv}; +use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch}; use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; use crate::config::Config; @@ -102,16 +102,29 @@ impl RecordAccumulator { let schema_id = table_info.schema_id; - let mut batch = ArrowLog(ArrowLogWriteBatch::new( - self.batch_id.fetch_add(1, Ordering::Relaxed), - table_path.as_ref().clone(), - schema_id, - arrow_compression_info, - row_type, - bucket_id, - current_time_ms(), - matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), - )); + let mut batch: WriteBatch = match record.record() { + Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new( + self.batch_id.fetch_add(1, Ordering::Relaxed), + table_path.as_ref().clone(), + schema_id, + arrow_compression_info, + row_type, + bucket_id, + current_time_ms(), + matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), + )), + Record::Kv(kv_record) => Kv(KvWriteBatch::new( + self.batch_id.fetch_add(1, Ordering::Relaxed), + table_path.as_ref().clone(), + schema_id, + // TODO: Decide how to derive write limit in the absence of java's equivalent of PreAllocatedPagedOutputView + KvWriteBatch::DEFAULT_WRITE_LIMIT, + record.write_format.to_kv_format()?, + bucket_id, + kv_record.target_columns.clone(), + current_time_ms(), + )), + }; let batch_id = batch.batch_id(); @@ -142,6 +155,8 @@ impl RecordAccumulator { ) -> Result { let table_path = &record.table_path; + // TODO: Implement partitioning + let dq = { let mut binding = self .write_batches diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 15358f33..2ddf5192 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -25,6 +25,7 @@ use crate::record::MemoryLogRecordsArrowBuilder; use crate::record::kv::KvRecordBatchBuilder; use bytes::Bytes; use std::cmp::max; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; #[allow(dead_code)] @@ -249,11 +250,12 @@ impl ArrowLogWriteBatch { pub struct KvWriteBatch { write_batch: InnerWriteBatch, kv_batch_builder: KvRecordBatchBuilder, - target_columns: Option>, + target_columns: Option>>, schema_id: i32, } impl KvWriteBatch { + pub const DEFAULT_WRITE_LIMIT: usize = 256; #[allow(clippy::too_many_arguments)] pub fn new( batch_id: i64, @@ -262,7 +264,7 @@ impl KvWriteBatch { write_limit: usize, kv_format: KvFormat, bucket_id: BucketId, - target_columns: Option>, + target_columns: Option>>, create_ms: i64, ) -> Self { let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id); @@ -296,7 +298,7 @@ impl KvWriteBatch { }); }; - if self.target_columns.as_ref() != kv_write_record.target_columns.as_deref() { + if self.target_columns != kv_write_record.target_columns { return Err(Error::UnexpectedError { message: format!( "target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.", diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index a4151cfd..dcc67959 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -62,6 +62,7 @@ pub enum LogWriteRecord<'a> { RecordBatch(Arc), } +#[derive(Clone)] pub enum RowBytes<'a> { Borrowed(&'a [u8]), Owned(Bytes),