diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index c3bdd447..9aeee72d 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -33,6 +33,7 @@ integration_tests = [] [dependencies] arrow = { workspace = true } arrow-schema = "57.0.0" +bitvec = "1" byteorder = "1.5" futures = "0.3" clap = { workspace = true } diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index ac44cc17..ca0a2532 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -651,14 +651,14 @@ mod tests { use crate::compression::{ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; - use crate::metadata::{DataField, DataTypes, TablePath}; + use crate::metadata::{DataField, DataTypes, RowType, TablePath}; use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; use crate::row::GenericRow; use std::sync::Arc; use std::time::Duration; fn test_read_context() -> ReadContext { - let row_type = DataTypes::row(vec![DataField::new( + let row_type = RowType::new(vec![DataField::new( "id".to_string(), DataTypes::int(), None, @@ -714,7 +714,7 @@ mod tests { #[test] fn default_completed_fetch_reads_records() -> Result<()> { - let row_type = DataTypes::row(vec![ + let row_type = RowType::new(vec![ DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 1d32ebd7..cd23503a 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -22,7 +22,7 @@ use crate::error::{Error, Result}; use crate::metadata::{RowType, TableBucket, TableInfo}; use crate::row::InternalRow; use crate::row::compacted::CompactedRow; -use crate::row::encode::KeyEncoder; +use crate::row::encode::{KeyEncoder, KeyEncoderFactory}; use crate::rpc::ApiError; use crate::rpc::message::LookupRequest; use std::sync::Arc; @@ -130,8 +130,11 @@ impl<'a> TableLookup<'a> { // Create key encoder for the primary key fields let pk_fields = self.table_info.get_physical_primary_keys().to_vec(); - let key_encoder = - ::of(self.table_info.row_type(), pk_fields, data_lake_format)?; + let key_encoder = KeyEncoderFactory::of( + self.table_info.row_type(), + pk_fields.as_slice(), + &data_lake_format, + )?; Ok(Lookuper { conn: self.conn, diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 7356be23..2bfa0541 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -27,13 +27,17 @@ mod append; mod lookup; mod log_fetch_buffer; +mod partition_getter; mod remote_log; mod scanner; +mod upsert; mod writer; +use crate::client::table::upsert::TableUpsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; +pub use writer::{TableWriter, UpsertWriter}; #[allow(dead_code)] pub struct FlussTable<'a> { @@ -119,6 +123,20 @@ impl<'a> FlussTable<'a> { self.metadata.clone(), )) } + + pub fn new_upsert(&self) -> Result { + if !self.has_primary_key { + return Err(Error::UnsupportedOperation { + message: "Upsert is only supported for primary key tables".to_string(), + }); + } + + Ok(TableUpsert::new( + self.table_path.clone(), + self.table_info.clone(), + self.conn.get_or_create_writer_client()?, + )) + } } impl<'a> Drop for FlussTable<'a> { diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs new file mode 100644 index 00000000..4529d868 --- /dev/null +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::field_getter::FieldGetter; + +#[allow(dead_code)] +pub struct PartitionGetter<'a> { + partitions: Vec<(&'a String, &'a DataType, FieldGetter)>, +} + +#[allow(dead_code)] +impl<'a> PartitionGetter<'a> { + pub fn new(row_type: &'a RowType, partition_keys: &'a Vec) -> Result { + let mut partitions = Vec::with_capacity(partition_keys.len()); + + for partition_key in partition_keys { + if let Some(partition_col_index) = row_type.get_field_index(partition_key.as_str()) { + let data_type = &row_type + .fields() + .get(partition_col_index) + .unwrap() + .data_type; + let field_getter = FieldGetter::create(data_type, partition_col_index); + + partitions.push((partition_key, data_type, field_getter)); + } else { + return Err(IllegalArgument { + message: format!( + "The partition column {partition_key} is not in the row {row_type}." + ), + }); + }; + } + + Ok(Self { partitions }) + } + + // TODO Implement get partition +} diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs new file mode 100644 index 00000000..a3909e72 --- /dev/null +++ b/crates/fluss/src/client/table/upsert.rs @@ -0,0 +1,522 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, UpsertWriter}; +use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient}; +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{KvFormat, RowType, TableInfo, TablePath}; +use crate::row::InternalRow; +use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory}; +use crate::row::field_getter::FieldGetter; +use std::sync::Arc; + +use bitvec::prelude::bitvec; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct TableUpsert { + table_path: TablePath, + table_info: TableInfo, + writer_client: Arc, + target_columns: Option>>, +} + +#[allow(dead_code)] +impl TableUpsert { + pub fn new( + table_path: TablePath, + table_info: TableInfo, + writer_client: Arc, + ) -> Self { + Self { + table_path, + table_info, + writer_client, + target_columns: None, + } + } + + pub fn partial_update(&self, target_columns: Option>) -> Result { + if let Some(columns) = &target_columns { + let num_columns = self.table_info.row_type().fields().len(); + + if let Some(&invalid_column) = columns.iter().find(|&&col| col >= num_columns) { + return Err(IllegalArgument { + message: format!( + "Invalid target column index: {invalid_column} for table {}. The table only has {num_columns} columns.", + self.table_path + ), + }); + } + } + + Ok(Self { + table_path: self.table_path.clone(), + table_info: self.table_info.clone(), + writer_client: self.writer_client.clone(), + target_columns: target_columns.map(Arc::new), + }) + } + + pub fn partial_update_with_column_names(&self, target_column_names: &[&str]) -> Result { + let row_type = self.table_info.row_type(); + let col_indices: Vec<(&str, Option)> = target_column_names + .iter() + .map(|col_name| (*col_name, row_type.get_field_index(col_name))) + .collect(); + + if let Some((missing_name, _)) = col_indices.iter().find(|(_, ix)| ix.is_none()) { + return Err(IllegalArgument { + message: format!( + "Cannot find target column `{}` for table {}.", + missing_name, self.table_path + ), + }); + } + + let valid_col_indices: Vec = col_indices + .into_iter() + .map(|(_, index)| index.unwrap()) + .collect(); + + self.partial_update(Some(valid_col_indices)) + } + + pub fn create_writer(&self) -> Result { + UpsertWriterFactory::create( + Arc::new(self.table_path.clone()), + Arc::new(self.table_info.clone()), + self.target_columns.clone(), + Arc::clone(&self.writer_client), + ) + } +} + +#[allow(dead_code)] +struct UpsertWriterImpl +where + RE: RowEncoder, +{ + table_path: Arc, + writer_client: Arc, + // TODO: Partitioning + // partition_field_getter: Option>, + primary_key_encoder: Box, + target_columns: Option>>, + // Use primary key encoder as bucket key encoder when None + bucket_key_encoder: Option>, + kv_format: KvFormat, + write_format: WriteFormat, + row_encoder: RE, + field_getters: Box<[FieldGetter]>, + table_info: Arc, +} + +#[allow(dead_code)] +struct UpsertWriterFactory; + +#[allow(dead_code)] +impl UpsertWriterFactory { + pub fn create( + table_path: Arc, + table_info: Arc, + partial_update_columns: Option>>, + writer_client: Arc, + ) -> Result { + let data_lake_format = &table_info.table_config.get_datalake_format()?; + let row_type = table_info.row_type(); + let physical_pks = table_info.get_physical_primary_keys(); + + let names = table_info.get_schema().auto_increment_col_names(); + + Self::sanity_check( + row_type, + &table_info.primary_keys, + names, + &partial_update_columns, + )?; + + let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?; + let bucket_key_encoder = if !table_info.is_default_bucket_key() { + Some(KeyEncoderFactory::of( + row_type, + table_info.get_bucket_keys(), + data_lake_format, + )?) + } else { + // Defaults to using primary key encoder when None for bucket key + None + }; + + let kv_format = table_info.get_table_config().get_kv_format()?; + let write_format = WriteFormat::from_kv_format(&kv_format)?; + + let field_getters = FieldGetter::create_field_getters(row_type); + + Ok(UpsertWriterImpl { + table_path, + writer_client, + primary_key_encoder, + target_columns: partial_update_columns, + bucket_key_encoder, + kv_format: kv_format.clone(), + write_format, + row_encoder: RowEncoderFactory::create(kv_format, row_type.clone())?, + field_getters, + table_info: table_info.clone(), + }) + } + + #[allow(dead_code)] + fn sanity_check( + row_type: &RowType, + primary_keys: &Vec, + auto_increment_col_names: &Vec, + target_columns: &Option>>, + ) -> Result<()> { + if target_columns.is_none() { + if !auto_increment_col_names.is_empty() { + return Err(IllegalArgument { + message: format!( + "This table has auto increment column {}. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first.", + auto_increment_col_names.join(", ") + ), + }); + } + return Ok(()); + } + + let field_count = row_type.fields().len(); + + let mut target_column_set = bitvec![0; field_count]; + + let columns = target_columns.as_ref().unwrap().as_ref(); + + for &target_index in columns { + target_column_set.set(target_index, true); + } + + let mut pk_column_set = bitvec![0; field_count]; + + // check the target columns contains the primary key + for primary_key in primary_keys { + let pk_index = row_type.get_field_index(primary_key.as_str()); + match pk_index { + Some(pk_index) => { + if !target_column_set[pk_index] { + return Err(IllegalArgument { + message: format!( + "The target write columns {} must contain the primary key columns {}", + row_type.project(columns)?.get_field_names().join(", "), + primary_keys.join(", ") + ), + }); + } + pk_column_set.set(pk_index, true); + } + None => { + return Err(IllegalArgument { + message: format!( + "The specified primary key {} is not in row type {}", + primary_key, row_type + ), + }); + } + } + } + + let mut auto_increment_column_set = bitvec![0; field_count]; + // explicitly specifying values for an auto increment column is not allowed + for auto_increment_col_name in auto_increment_col_names { + let auto_increment_field_index = + row_type.get_field_index(auto_increment_col_name.as_str()); + + if let Some(index) = auto_increment_field_index { + if target_column_set[index] { + return Err(IllegalArgument { + message: format!( + "Explicitly specifying values for the auto increment column {} is not allowed.", + auto_increment_col_name + ), + }); + } + + auto_increment_column_set.set(index, true); + } + } + + // check the columns not in targetColumns should be nullable + for i in 0..field_count { + // column not in primary key and not in auto increment column + if !pk_column_set[i] && !auto_increment_column_set[i] { + // the column should be nullable + if !row_type.fields().get(i).unwrap().data_type.is_nullable() { + return Err(IllegalArgument { + message: format!( + "Partial Update requires all columns except primary key to be nullable, but column {} is NOT NULL.", + row_type.fields().get(i).unwrap().name() + ), + }); + } + } + } + + Ok(()) + } +} + +#[allow(dead_code)] +impl UpsertWriterImpl { + fn check_field_count(&self, row: &R) -> Result<()> { + let expected = self.table_info.get_row_type().fields().len(); + if row.get_field_count() != expected { + return Err(IllegalArgument { + message: format!( + "The field count of the row does not match the table schema. Expected: {}, Actual: {}", + expected, + row.get_field_count() + ), + }); + } + Ok(()) + } + + fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes, Option)> { + let key = self.primary_key_encoder.encode_key(row)?; + let bucket_key = match &mut self.bucket_key_encoder { + Some(bucket_key_encoder) => Some(bucket_key_encoder.encode_key(row)?), + None => Some(key.clone()), + }; + Ok((key, bucket_key)) + } + + fn encode_row(&mut self, row: &R) -> Result { + self.row_encoder.start_new_row()?; + for (pos, field_getter) in self.field_getters.iter().enumerate() { + let datum = field_getter.get_field(row); + self.row_encoder.encode_field(pos, datum)?; + } + self.row_encoder.finish_row() + } +} + +impl TableWriter for UpsertWriterImpl { + /// Flush data written that have not yet been sent to the server, forcing the client to send the + /// requests to server and blocks on the completion of the requests associated with these + /// records. A request is considered completed when it is successfully acknowledged according to + /// the CLIENT_WRITER_ACKS configuration option you have specified or else it + /// results in an error. + async fn flush(&self) -> Result<()> { + self.writer_client.flush().await + } +} + +impl UpsertWriter for UpsertWriterImpl { + /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist. + /// + /// # Arguments + /// * row - the row to upsert. + /// + /// # Returns + /// Ok(UpsertResult) when completed normally + async fn upsert(&mut self, row: &R) -> Result { + self.check_field_count(row)?; + + let (key, bucket_key) = self.get_keys(row)?; + + let row_bytes: RowBytes<'_> = match row.as_encoded_bytes(self.write_format) { + Some(bytes) => RowBytes::Borrowed(bytes), + None => RowBytes::Owned(self.encode_row(row)?), + }; + + let write_record = WriteRecord::for_upsert( + Arc::clone(&self.table_path), + self.table_info.schema_id, + key, + bucket_key, + self.write_format, + self.target_columns.clone(), + Some(row_bytes), + ); + + let result_handle = self.writer_client.send(&write_record).await?; + let result = result_handle.wait().await?; + + result_handle.result(result).map(|_| UpsertResult) + } + + /// Delete certain row by the input row in Fluss table, the input row must contain the primary + /// key. + /// + /// # Arguments + /// * row - the row to delete. + /// + /// # Returns + /// Ok(DeleteResult) when completed normally + async fn delete(&mut self, row: &R) -> Result { + self.check_field_count(row)?; + + let (key, bucket_key) = self.get_keys(row)?; + + let write_record = WriteRecord::for_upsert( + Arc::clone(&self.table_path), + self.table_info.schema_id, + key, + bucket_key, + self.write_format, + self.target_columns.clone(), + None, + ); + + let result_handle = self.writer_client.send(&write_record).await?; + let result = result_handle.wait().await?; + + result_handle.result(result).map(|_| DeleteResult) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{DataField, DataTypes}; + + #[test] + fn sanity_check() { + // No target columns specified but table has auto-increment column + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec!["id".to_string()]; + let target_columns = None; + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "This table has auto increment column id. Explicitly specifying values for an auto increment column is not allowed. Please Specify non-auto-increment columns as target columns using partialUpdate first." + )); + + // Target columns do not contain primary key + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + DataField::new("value".to_string(), DataTypes::int(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![1usize])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!( + result + .unwrap_err() + .to_string() + .contains("The target write columns name must contain the primary key columns id") + ); + + // Primary key column not found in row type + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["nonexistent_pk".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![0usize, 1])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!( + result + .unwrap_err() + .to_string() + .contains("The specified primary key nonexistent_pk is not in row type") + ); + + // Target columns include auto-increment column + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new( + "seq".to_string(), + DataTypes::bigint().as_non_nullable(), + None, + ), + DataField::new("name".to_string(), DataTypes::string(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec!["seq".to_string()]; + let target_columns = Some(Arc::new(vec![0usize, 1, 2])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "Explicitly specifying values for the auto increment column seq is not allowed." + )); + + // Non-nullable column not in target columns (partial update requires nullable) + let fields = vec![ + DataField::new("id".to_string(), DataTypes::int().as_non_nullable(), None), + DataField::new( + "required_field".to_string(), + DataTypes::string().as_non_nullable(), + None, + ), + DataField::new("optional_field".to_string(), DataTypes::int(), None), + ]; + let row_type = RowType::new(fields); + let primary_keys = vec!["id".to_string()]; + let auto_increment_col_names = vec![]; + let target_columns = Some(Arc::new(vec![0usize])); + + let result = UpsertWriterFactory::sanity_check( + &row_type, + &primary_keys, + &auto_increment_col_names, + &target_columns, + ); + + assert!(result.unwrap_err().to_string().contains( + "Partial Update requires all columns except primary key to be nullable, but column required_field is NOT NULL." + )); + } +} diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 8a83b5e3..82765457 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -16,13 +16,13 @@ // under the License. use crate::client::{WriteRecord, WriterClient}; -use crate::row::GenericRow; +use crate::row::{GenericRow, InternalRow}; use std::sync::Arc; use crate::error::Result; use crate::metadata::{TableInfo, TablePath}; -#[allow(dead_code)] +#[allow(dead_code, async_fn_in_trait)] pub trait TableWriter { async fn flush(&self) -> Result<()>; } @@ -32,12 +32,22 @@ pub trait AppendWriter: TableWriter { async fn append(&self, row: GenericRow) -> Result<()>; } -#[allow(dead_code)] +#[allow(dead_code, async_fn_in_trait)] pub trait UpsertWriter: TableWriter { - async fn upsert(&self, row: GenericRow) -> Result<()>; - async fn delete(&self, row: GenericRow) -> Result<()>; + async fn upsert(&mut self, row: &R) -> Result; + async fn delete(&mut self, row: &R) -> Result; } +/// The result of upserting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct UpsertResult; + +/// The result of deleting a record +/// Currently this is an empty struct to allow for compatible evolution in the future +#[derive(Default)] +pub struct DeleteResult; + #[allow(dead_code)] pub struct AbstractTableWriter { table_path: Arc, diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 0afc9d4b..fb7b5447 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::client::write::batch::WriteBatch::ArrowLog; -use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch}; +use crate::client::write::batch::WriteBatch::{ArrowLog, Kv}; +use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch}; use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; use crate::config::Config; @@ -102,16 +102,29 @@ impl RecordAccumulator { let schema_id = table_info.schema_id; - let mut batch = ArrowLog(ArrowLogWriteBatch::new( - self.batch_id.fetch_add(1, Ordering::Relaxed), - table_path.as_ref().clone(), - schema_id, - arrow_compression_info, - row_type, - bucket_id, - current_time_ms(), - matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), - )); + let mut batch: WriteBatch = match record.record() { + Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new( + self.batch_id.fetch_add(1, Ordering::Relaxed), + table_path.as_ref().clone(), + schema_id, + arrow_compression_info, + row_type, + bucket_id, + current_time_ms(), + matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), + )), + Record::Kv(kv_record) => Kv(KvWriteBatch::new( + self.batch_id.fetch_add(1, Ordering::Relaxed), + table_path.as_ref().clone(), + schema_id, + // TODO: Decide how to derive write limit in the absence of java's equivalent of PreAllocatedPagedOutputView + KvWriteBatch::DEFAULT_WRITE_LIMIT, + record.write_format.to_kv_format()?, + bucket_id, + kv_record.target_columns.clone(), + current_time_ms(), + )), + }; let batch_id = batch.batch_id(); @@ -142,6 +155,8 @@ impl RecordAccumulator { ) -> Result { let table_path = &record.table_path; + // TODO: Implement partitioning + let dq = { let mut binding = self .write_batches diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 01597538..2ddf5192 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -20,11 +20,12 @@ use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{Record, ResultHandle, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; -use crate::metadata::{DataType, KvFormat, TablePath}; +use crate::metadata::{KvFormat, RowType, TablePath}; use crate::record::MemoryLogRecordsArrowBuilder; use crate::record::kv::KvRecordBatchBuilder; use bytes::Bytes; use std::cmp::max; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; #[allow(dead_code)] @@ -192,7 +193,7 @@ impl ArrowLogWriteBatch { table_path: TablePath, schema_id: i32, arrow_compression_info: ArrowCompressionInfo, - row_type: &DataType, + row_type: &RowType, bucket_id: BucketId, create_ms: i64, to_append_record_batch: bool, @@ -249,11 +250,12 @@ impl ArrowLogWriteBatch { pub struct KvWriteBatch { write_batch: InnerWriteBatch, kv_batch_builder: KvRecordBatchBuilder, - target_columns: Option>, + target_columns: Option>>, schema_id: i32, } impl KvWriteBatch { + pub const DEFAULT_WRITE_LIMIT: usize = 256; #[allow(clippy::too_many_arguments)] pub fn new( batch_id: i64, @@ -262,7 +264,7 @@ impl KvWriteBatch { write_limit: usize, kv_format: KvFormat, bucket_id: BucketId, - target_columns: Option>, + target_columns: Option>>, create_ms: i64, ) -> Self { let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id); @@ -284,7 +286,7 @@ impl KvWriteBatch { } }; - let key = kv_write_record.key; + let key = kv_write_record.key.as_ref(); if self.schema_id != write_record.schema_id { return Err(Error::UnexpectedError { @@ -296,7 +298,7 @@ impl KvWriteBatch { }); }; - if self.target_columns.as_deref() != kv_write_record.target_columns { + if self.target_columns != kv_write_record.target_columns { return Err(Error::UnexpectedError { message: format!( "target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.", @@ -307,14 +309,14 @@ impl KvWriteBatch { }); } - let row = kv_write_record.compacted_row.as_ref(); + let row_bytes = kv_write_record.row_bytes(); - if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row) { + if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row_bytes) { Ok(None) } else { // append successfully self.kv_batch_builder - .append_row(key, row) + .append_row(key, row_bytes) .map_err(|e| Error::UnexpectedError { message: "Failed to append row to KvWriteBatch".to_string(), source: Some(Box::new(e)), diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index 2370719e..817101a4 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -20,6 +20,7 @@ use crate::cluster::Cluster; use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::TablePath; +use bytes::Bytes; use rand::Rng; use std::sync::atomic::{AtomicI32, Ordering}; @@ -28,7 +29,7 @@ pub trait BucketAssigner: Sync + Send { fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32); - fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result; + fn assign_bucket(&self, bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result; } #[derive(Debug)] @@ -94,7 +95,7 @@ impl BucketAssigner for StickyBucketAssigner { self.next_bucket(cluster, prev_bucket_id); } - fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result { + fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result { let bucket_id = self.current_bucket_id.load(Ordering::Relaxed); if bucket_id < 0 { Ok(self.next_bucket(cluster, bucket_id)) @@ -139,7 +140,7 @@ impl BucketAssigner for HashBucketAssigner { // do nothing } - fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> Result { + fn assign_bucket(&self, bucket_key: Option<&Bytes>, _: &Cluster) -> Result { let key = bucket_key.ok_or_else(|| IllegalArgument { message: "no bucket key provided".to_string(), })?; @@ -181,7 +182,7 @@ mod tests { let assigner = HashBucketAssigner::new(4, ::of(None)); let cluster = Cluster::default(); let bucket = assigner - .assign_bucket(Some(b"key"), &cluster) + .assign_bucket(Some(&Bytes::from_static(b"key")), &cluster) .expect("bucket"); assert!((0..4).contains(&bucket)); } diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index 248218e0..dcc67959 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -21,9 +21,10 @@ mod batch; use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver}; use crate::error::Error; use crate::metadata::TablePath; -use crate::row::{CompactedRow, GenericRow}; +use crate::row::GenericRow; pub use accumulator::*; use arrow::array::RecordBatch; +use bytes::Bytes; use std::sync::Arc; pub(crate) mod broadcast; @@ -40,7 +41,7 @@ pub use writer_client::WriterClient; pub struct WriteRecord<'a> { record: Record<'a>, table_path: Arc, - bucket_key: Option<&'a [u8]>, + bucket_key: Option, schema_id: i32, write_format: WriteFormat, } @@ -61,25 +62,43 @@ pub enum LogWriteRecord<'a> { RecordBatch(Arc), } +#[derive(Clone)] +pub enum RowBytes<'a> { + Borrowed(&'a [u8]), + Owned(Bytes), +} + +impl<'a> RowBytes<'a> { + pub fn as_slice(&self) -> &[u8] { + match self { + RowBytes::Borrowed(slice) => slice, + RowBytes::Owned(bytes) => bytes.as_ref(), + } + } +} + pub struct KvWriteRecord<'a> { - // only valid for primary key table - key: &'a [u8], - target_columns: Option<&'a [usize]>, - compacted_row: Option>, + key: Bytes, + target_columns: Option>>, + row_bytes: Option>, } impl<'a> KvWriteRecord<'a> { fn new( - key: &'a [u8], - target_columns: Option<&'a [usize]>, - compacted_row: Option>, + key: Bytes, + target_columns: Option>>, + row_bytes: Option>, ) -> Self { KvWriteRecord { key, target_columns, - compacted_row, + row_bytes, } } + + pub fn row_bytes(&self) -> Option<&[u8]> { + self.row_bytes.as_ref().map(|rb| rb.as_slice()) + } } impl<'a> WriteRecord<'a> { @@ -110,17 +129,18 @@ impl<'a> WriteRecord<'a> { pub fn for_upsert( table_path: Arc, schema_id: i32, - bucket_key: &'a [u8], - key: &'a [u8], - target_columns: Option<&'a [usize]>, - row: CompactedRow<'a>, + key: Bytes, + bucket_key: Option, + write_format: WriteFormat, + target_columns: Option>>, + row_bytes: Option>, ) -> Self { Self { - record: Record::Kv(KvWriteRecord::new(key, target_columns, Some(row))), + record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)), table_path, - bucket_key: Some(bucket_key), + bucket_key, schema_id, - write_format: WriteFormat::CompactedKv, + write_format, } } } diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs index 4a0c0d8a..147152ca 100644 --- a/crates/fluss/src/client/write/write_format.rs +++ b/crates/fluss/src/client/write/write_format.rs @@ -20,6 +20,7 @@ use crate::error::Result; use crate::metadata::KvFormat; use std::fmt::Display; +#[derive(Copy, Clone)] pub enum WriteFormat { ArrowLog, CompactedLog, diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index 22e03972..65b04f56 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -21,6 +21,7 @@ use crate::client::write::sender::Sender; use crate::client::{RecordAccumulator, ResultHandle, WriteRecord}; use crate::config::Config; use crate::metadata::TablePath; +use bytes::Bytes; use dashmap::DashMap; use std::sync::Arc; use tokio::sync::mpsc; @@ -90,8 +91,9 @@ impl WriterClient { pub async fn send(&self, record: &WriteRecord<'_>) -> Result { let table_path = &record.table_path; let cluster = self.metadata.get_cluster(); + let bucket_key = record.bucket_key.as_ref(); - let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?; + let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key, table_path)?; let mut result = self .accumulate @@ -101,7 +103,7 @@ impl WriterClient { if result.abort_record_for_new_batch { let prev_bucket_id = bucket_id; bucket_assigner.on_new_batch(&cluster, prev_bucket_id); - let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; + let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?; result = self .accumulate .append(record, bucket_id, &cluster, false) @@ -116,6 +118,7 @@ impl WriterClient { } fn assign_bucket( &self, + bucket_key: Option<&Bytes>, table_path: &Arc, ) -> Result<(Arc>, i32)> { let cluster = self.metadata.get_cluster(); @@ -129,7 +132,7 @@ impl WriterClient { assigner } }; - let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; + let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?; Ok((bucket_assigner, bucket_id)) } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index da85b0c2..8204e7c4 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -16,7 +16,7 @@ // under the License. use crate::compression::ArrowCompressionInfo; -use crate::error::Error::InvalidTableError; +use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; use crate::metadata::datatype::{DataField, DataType, RowType}; @@ -97,8 +97,8 @@ impl PrimaryKey { pub struct Schema { columns: Vec, primary_key: Option, - // must be Row data type kind - row_type: DataType, + row_type: RowType, + auto_increment_col_names: Vec, } impl Schema { @@ -118,7 +118,7 @@ impl Schema { self.primary_key.as_ref() } - pub fn row_type(&self) -> &DataType { + pub fn row_type(&self) -> &RowType { &self.row_type } @@ -144,12 +144,17 @@ impl Schema { pub fn column_names(&self) -> Vec<&str> { self.columns.iter().map(|c| c.name.as_str()).collect() } + + pub fn auto_increment_col_names(&self) -> &Vec { + &self.auto_increment_col_names + } } #[derive(Debug, Default)] pub struct SchemaBuilder { columns: Vec, primary_key: Option, + auto_increment_col_names: Vec, } impl SchemaBuilder { @@ -198,9 +203,36 @@ impl SchemaBuilder { self } + /// Declares a column to be auto-incremented. With an auto-increment column in the table, + /// whenever a new row is inserted into the table, the new row will be assigned with the next + /// available value from the auto-increment sequence. A table can have at most one auto + /// increment column. + pub fn enable_auto_increment(mut self, column_name: &str) -> Result { + if !self.auto_increment_col_names.is_empty() { + return Err(IllegalArgument { + message: "Multiple auto increment columns are not supported yet.".to_string(), + }); + } + + self.auto_increment_col_names.push(column_name.to_string()); + Ok(self) + } + pub fn build(&mut self) -> Result { let columns = Self::normalize_columns(&mut self.columns, self.primary_key.as_ref())?; + let column_names: HashSet<_> = columns.iter().map(|c| &c.name).collect(); + for auto_inc_col in &self.auto_increment_col_names { + if !column_names.contains(auto_inc_col) { + return Err(IllegalArgument { + message: format!( + "Auto increment column '{}' is not found in the schema columns.", + auto_inc_col + ), + }); + } + } + let data_fields = columns .iter() .map(|c| DataField { @@ -213,7 +245,8 @@ impl SchemaBuilder { Ok(Schema { columns, primary_key: self.primary_key.clone(), - row_type: DataType::Row(RowType::new(data_fields)), + row_type: RowType::new(data_fields), + auto_increment_col_names: self.auto_increment_col_names.clone(), }) } @@ -500,7 +533,7 @@ impl TableDescriptor { bucket_keys.retain(|k| !partition_keys.contains(k)); if bucket_keys.is_empty() { - return Err(Error::InvalidTableError { + return Err(InvalidTableError { message: format!( "Primary Key constraint {:?} should not be same with partition fields {:?}.", schema.primary_key().unwrap().column_names(), @@ -580,7 +613,7 @@ pub enum LogFormat { } impl Display for LogFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { LogFormat::ARROW => { write!(f, "ARROW")?; @@ -612,7 +645,7 @@ pub enum KvFormat { } impl Display for KvFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { KvFormat::COMPACTED => write!(f, "COMPACTED")?, KvFormat::INDEXED => write!(f, "INDEXED")?, @@ -626,7 +659,7 @@ impl KvFormat { match s.to_uppercase().as_str() { "INDEXED" => Ok(KvFormat::INDEXED), "COMPACTED" => Ok(KvFormat::COMPACTED), - _ => Err(Error::InvalidTableError { + _ => Err(InvalidTableError { message: format!("Unknown kv format: {s}"), }), } @@ -692,7 +725,7 @@ pub struct TableInfo { pub table_id: i64, pub schema_id: i32, pub schema: Schema, - pub row_type: DataType, + pub row_type: RowType, pub primary_keys: Vec, pub physical_primary_keys: Vec, pub bucket_keys: Vec, @@ -708,10 +741,7 @@ pub struct TableInfo { impl TableInfo { pub fn row_type(&self) -> &RowType { - match &self.row_type { - DataType::Row(row_type) => row_type, - _ => panic!("should be a row type"), - } + &self.row_type } } @@ -847,7 +877,7 @@ impl TableInfo { &self.schema } - pub fn get_row_type(&self) -> &DataType { + pub fn get_row_type(&self) -> &RowType { &self.row_type } @@ -946,8 +976,8 @@ impl TableInfo { } } -impl fmt::Display for TableInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Display for TableInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, "TableInfo{{ table_path={:?}, table_id={}, schema_id={}, schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?}, num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?}, created_time={}, modified_time={} }}", @@ -998,7 +1028,7 @@ impl TableBucket { } impl Display for TableBucket { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { if let Some(partition_id) = self.partition_id { write!( f, diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 3c46f9b5..3c94b720 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -18,7 +18,7 @@ use crate::client::{LogWriteRecord, Record, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; -use crate::metadata::DataType; +use crate::metadata::{DataType, RowType}; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; use arrow::array::{ @@ -48,6 +48,7 @@ use std::{ sync::Arc, }; +use crate::error::Error::IllegalArgument; use arrow::ipc::writer::IpcWriteOptions; /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; @@ -171,7 +172,7 @@ pub struct RowAppendRecordBatchBuilder { } impl RowAppendRecordBatchBuilder { - pub fn new(row_type: &DataType) -> Self { + pub fn new(row_type: &RowType) -> Self { let schema_ref = to_arrow_schema(row_type); let builders = Mutex::new( schema_ref @@ -251,7 +252,7 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder { impl MemoryLogRecordsArrowBuilder { pub fn new( schema_id: i32, - row_type: &DataType, + row_type: &RowType, to_append_record_batch: bool, arrow_compression_info: ArrowCompressionInfo, ) -> Self { @@ -329,7 +330,7 @@ impl MemoryLogRecordsArrowBuilder { // write arrow batch bytes let mut cursor = Cursor::new(&mut batch_bytes[..]); cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64); - cursor.write_all(real_arrow_batch_bytes).unwrap(); + cursor.write_all(real_arrow_batch_bytes)?; let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..]; // then update crc @@ -562,16 +563,17 @@ impl LogRecordBatch { return Ok(RecordBatch::new_empty(read_context.target_schema.clone())); } - let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| { - crate::error::Error::UnexpectedError { + let data = self + .data + .get(RECORDS_OFFSET..) + .ok_or_else(|| Error::UnexpectedError { message: format!( "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}", self.data.len(), RECORDS_OFFSET ), source: None, - } - })?; + })?; read_context.record_batch(data) } } @@ -639,27 +641,20 @@ fn parse_ipc_message( Ok((batch_metadata, body_buffer, message.version())) } -pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { - match &fluss_schema { - DataType::Row(row_type) => { - let fields: Vec = row_type - .fields() - .iter() - .map(|f| { - Field::new( - f.name(), - to_arrow_type(f.data_type()), - f.data_type().is_nullable(), - ) - }) - .collect(); +pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef { + let fields: Vec = fluss_schema + .fields() + .iter() + .map(|f| { + Field::new( + f.name(), + to_arrow_type(f.data_type()), + f.data_type().is_nullable(), + ) + }) + .collect(); - SchemaRef::new(arrow_schema::Schema::new(fields)) - } - _ => { - panic!("must be row data type.") - } - } + SchemaRef::new(arrow_schema::Schema::new(fields)) } pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { @@ -813,7 +808,7 @@ impl ReadContext { let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); for &original_idx in &projected_fields { let pos = sorted_fields.binary_search(&original_idx).map_err(|_| { - Error::IllegalArgument { + IllegalArgument { message: format!( "Projection index {original_idx} is invalid for the current schema." ), @@ -857,7 +852,7 @@ impl ReadContext { let field_count = schema.fields().len(); for &index in projected_fields { if index >= field_count { - return Err(Error::IllegalArgument { + return Err(IllegalArgument { message: format!( "Projection index {index} is out of bounds for schema with {field_count} fields." ), @@ -869,7 +864,7 @@ impl ReadContext { pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result { Ok(SchemaRef::new(schema.project(projected_fields).map_err( - |e| Error::IllegalArgument { + |e| IllegalArgument { message: format!("Invalid projection: {e}"), }, )?)) @@ -1060,7 +1055,6 @@ pub struct MyVec(pub StreamReader); #[cfg(test)] mod tests { use super::*; - use crate::error::Error; use crate::metadata::{DataField, DataTypes}; #[test] @@ -1217,14 +1211,14 @@ mod tests { #[test] fn projection_rejects_out_of_bounds_index() { - let row_type = DataTypes::row(vec![ + let row_type = RowType::new(vec![ DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); let schema = to_arrow_schema(&row_type); let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false); - assert!(matches!(result, Err(Error::IllegalArgument { .. }))); + assert!(matches!(result, Err(IllegalArgument { .. }))); } #[test] diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index 32f712f8..eb89d69c 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -370,12 +370,12 @@ impl Iterator for KvRecordIterator { #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataTypes, KvFormat, RowType}; + use crate::metadata::{DataTypes, KvFormat}; use crate::record::kv::test_util::TestReadContext; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; use crate::row::InternalRow; use crate::row::binary::BinaryWriter; - use crate::row::compacted::CompactedRow; + use bytes::{BufMut, BytesMut}; #[test] @@ -417,12 +417,11 @@ mod tests { let mut value1_writer = CompactedRowWriter::new(1); value1_writer.write_bytes(&[1, 2, 3, 4, 5]); - let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec()); - let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer()); - builder.append_row(key1, Some(row)).unwrap(); + let row_bytes = value1_writer.buffer(); + builder.append_row(key1, Some(row_bytes)).unwrap(); let key2 = b"key2"; - builder.append_row::(key2, None).unwrap(); + builder.append_row(key2, None).unwrap(); let bytes = builder.build().unwrap(); diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index e3da8640..0b65500f 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -28,7 +28,6 @@ use crate::record::kv::kv_record_batch::{ WRITE_CLIENT_ID_OFFSET, }; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, NO_WRITER_ID}; -use crate::row::BinaryRow; use bytes::{Bytes, BytesMut}; use std::io; @@ -88,14 +87,13 @@ impl KvRecordBatchBuilder { } } - /// Check if there is room for a new record containing the given key and row. + /// Check if there is room for a new record containing the given key and row bytes. /// If no records have been appended, this always returns true. - pub fn has_room_for_row(&self, key: &[u8], row: Option<&R>) -> bool { - let value = row.map(|r| r.as_bytes()); - self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit + pub fn has_room_for_row(&self, key: &[u8], row_bytes: Option<&[u8]>) -> bool { + self.size_in_bytes + KvRecord::size_of(key, row_bytes) <= self.write_limit } - /// Append a KV record with a row value to the batch. + /// Append a KV record with row bytes to the batch. /// /// Returns an error if: /// - The builder has been aborted @@ -103,7 +101,7 @@ impl KvRecordBatchBuilder { /// - Adding this record would exceed the write limit /// - The maximum number of records is exceeded /// - The KV format is not COMPACTED - pub fn append_row(&mut self, key: &[u8], row: Option<&R>) -> io::Result<()> { + pub fn append_row(&mut self, key: &[u8], row_bytes: Option<&[u8]>) -> io::Result<()> { if self.kv_format != KvFormat::COMPACTED { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -134,8 +132,7 @@ impl KvRecordBatchBuilder { )); } - let value = row.map(|r| r.as_bytes()); - let record_size = KvRecord::size_of(key, value); + let record_size = KvRecord::size_of(key, row_bytes); if self.size_in_bytes + record_size > self.write_limit { return Err(io::Error::new( io::ErrorKind::WriteZero, @@ -146,7 +143,7 @@ impl KvRecordBatchBuilder { )); } - let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, value)?; + let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, row_bytes)?; debug_assert_eq!(record_byte_size, record_size, "Record size mismatch"); self.current_record_number += 1; @@ -349,12 +346,12 @@ mod tests { let key1 = b"key1"; let value1 = create_test_row(b"value1"); - assert!(builder.has_room_for_row(key1, Some(&value1))); - builder.append_row(key1, Some(&value1)).unwrap(); + assert!(builder.has_room_for_row(key1, Some(value1.as_bytes()))); + builder.append_row(key1, Some(value1.as_bytes())).unwrap(); let key2 = b"key2"; - assert!(builder.has_room_for_row::(key2, None)); - builder.append_row::(key2, None).unwrap(); + assert!(builder.has_room_for_row(key2, None)); + builder.append_row(key2, None).unwrap(); builder.close().unwrap(); assert!(builder.is_closed()); @@ -369,35 +366,34 @@ mod tests { // Test lifecycle: abort behavior let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); builder.abort(); - assert!(builder.append_row::(b"key2", None).is_err()); + assert!(builder.append_row(b"key2", None).is_err()); assert!(builder.build().is_err()); assert!(builder.close().is_err()); // Test lifecycle: close behavior let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); builder.close().unwrap(); - assert!(builder.append_row::(b"key2", None).is_err()); + assert!(builder.append_row(b"key2", None).is_err()); assert!(builder.build().is_ok()); // Test KvFormat validation let mut row_writer = CompactedRowWriter::new(1); row_writer.write_int(42); - let row_type = RowType::with_data_types(vec![DataTypes::int()]); - let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer()); + let row_bytes = row_writer.buffer(); // INDEXED format should reject append_row let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED); - let result = indexed_builder.append_row(b"key", Some(row)); + let result = indexed_builder.append_row(b"key", Some(row_bytes)); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); // COMPACTED format should accept append_row let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); - let result = compacted_builder.append_row(b"key", Some(row)); + let result = compacted_builder.append_row(b"key", Some(row_bytes)); assert!(result.is_ok()); } @@ -410,15 +406,17 @@ mod tests { let large_key = vec![0u8; 1000]; let large_value = vec![1u8; 1000]; let large_row = create_test_row(&large_value); - assert!(!builder.has_room_for_row(&large_key, Some(&large_row))); + assert!(!builder.has_room_for_row(&large_key, Some(large_row.as_bytes()))); let small_value = create_test_row(b"value"); - assert!(builder.has_room_for_row(b"key", Some(&small_value))); + assert!(builder.has_room_for_row(b"key", Some(small_value.as_bytes()))); // Test append enforcement - add small record first - builder.append_row(b"key", Some(&small_value)).unwrap(); + builder + .append_row(b"key", Some(small_value.as_bytes())) + .unwrap(); // Try to add large record that exceeds limit (reuse large_row from above) - let result = builder.append_row(b"key2", Some(&large_row)); + let result = builder.append_row(b"key2", Some(large_row.as_bytes())); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero); } @@ -429,10 +427,12 @@ mod tests { builder.current_record_number = i32::MAX - 1; let value1 = create_test_row(b"value1"); - builder.append_row(b"key1", Some(&value1)).unwrap(); + builder + .append_row(b"key1", Some(value1.as_bytes())) + .unwrap(); let value2 = create_test_row(b"value2"); - let result = builder.append_row(b"key2", Some(&value2)); + let result = builder.append_row(b"key2", Some(value2.as_bytes())); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); } @@ -452,13 +452,17 @@ mod tests { builder.set_writer_state(100, 5); let value1 = create_test_row(b"value1"); - builder.append_row(b"key1", Some(&value1)).unwrap(); + builder + .append_row(b"key1", Some(value1.as_bytes())) + .unwrap(); let bytes1 = builder.build().unwrap(); let len1 = bytes1.len(); // Append another record - this should invalidate the cache let value2 = create_test_row(b"value2"); - builder.append_row(b"key2", Some(&value2)).unwrap(); + builder + .append_row(b"key2", Some(value2.as_bytes())) + .unwrap(); let bytes2 = builder.build().unwrap(); let len2 = bytes2.len(); @@ -472,7 +476,7 @@ mod tests { let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); builder.set_writer_state(100, 5); let value = create_test_row(b"value"); - builder.append_row(b"key", Some(&value)).unwrap(); + builder.append_row(b"key", Some(value.as_bytes())).unwrap(); let bytes1 = builder.build().unwrap(); // Change writer state - this should invalidate the cache @@ -494,7 +498,6 @@ mod tests { fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> { use crate::record::kv::KvRecordBatch; use crate::row::InternalRow; - use crate::row::compacted::CompactedRow; let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED); builder.set_writer_state(100, 5); @@ -504,26 +507,25 @@ mod tests { row_writer1.write_int(42); row_writer1.write_string("hello"); - let row_type = RowType::with_data_types([DataTypes::int(), DataTypes::string()].to_vec()); - let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer()); + let row_bytes1 = row_writer1.buffer(); let key1 = b"key1"; - assert!(builder.has_room_for_row(key1, Some(row1))); - builder.append_row(key1, Some(row1))?; + assert!(builder.has_room_for_row(key1, Some(row_bytes1))); + builder.append_row(key1, Some(row_bytes1))?; // Create and append second record let mut row_writer2 = CompactedRowWriter::new(2); row_writer2.write_int(100); row_writer2.write_string("world"); - let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer()); + let row_bytes2 = row_writer2.buffer(); let key2 = b"key2"; - builder.append_row(key2, Some(row2))?; + builder.append_row(key2, Some(row_bytes2))?; // Append a deletion record let key3 = b"key3"; - builder.append_row::(key3, None)?; + builder.append_row(key3, None)?; // Build and verify builder.close()?; diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs b/crates/fluss/src/record/kv/kv_record_read_context.rs index fe6c6f05..92363216 100644 --- a/crates/fluss/src/record/kv/kv_record_read_context.rs +++ b/crates/fluss/src/record/kv/kv_record_read_context.rs @@ -18,7 +18,7 @@ //! Default implementation of ReadContext with decoder caching. use super::ReadContext; -use crate::error::{Error, Result}; +use crate::error::Result; use crate::metadata::{KvFormat, Schema}; use crate::row::{RowDecoder, RowDecoderFactory}; use std::collections::HashMap; @@ -85,20 +85,7 @@ impl ReadContext for KvRecordReadContext { // Build decoder outside the lock to avoid blocking other threads let schema = self.schema_getter.get_schema(schema_id)?; - let row_type = match schema.row_type() { - crate::metadata::DataType::Row(row_type) => row_type.clone(), - other => { - return Err(Error::IoUnexpectedError { - message: format!( - "Schema {schema_id} has invalid row type: expected Row, got {other:?}" - ), - source: std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid row type", - ), - }); - } - }; + let row_type = schema.row_type().clone(); // Create decoder outside lock let decoder = RowDecoderFactory::create(self.kv_format.clone(), row_type)?; diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index bc68ea10..35d684db 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::client::WriteFormat; use crate::metadata::RowType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; -use crate::row::{BinaryRow, GenericRow, InternalRow}; +use crate::row::{GenericRow, InternalRow}; use std::sync::{Arc, OnceLock}; // Reference implementation: @@ -69,10 +70,8 @@ impl<'a> CompactedRow<'a> { self.decoded_row .get_or_init(|| self.deserializer.deserialize(&self.reader)) } -} -impl BinaryRow for CompactedRow<'_> { - fn as_bytes(&self) -> &[u8] { + pub fn as_bytes(&self) -> &[u8] { self.data } } @@ -153,6 +152,14 @@ impl<'a> InternalRow for CompactedRow<'a> { fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> crate::row::datum::TimestampLtz { self.decoded_row().get_timestamp_ltz(pos, precision) } + + fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> { + match write_format { + WriteFormat::CompactedKv => Some(self.as_bytes()), + WriteFormat::ArrowLog => None, + WriteFormat::CompactedLog => None, + } + } } #[cfg(test)] diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs index d1ad047a..ac0100ee 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -63,6 +63,18 @@ impl CompactedRowWriter { Bytes::copy_from_slice(&self.buffer[..self.position]) } + /// Flushes writer's ByteMut, resetting writer's inner state and returns Byte of flushed state + pub fn flush_bytes(&mut self) -> Bytes { + let used = self.buffer.split_to(self.position); + self.position = self.header_size_in_bytes; + if self.buffer.len() < self.header_size_in_bytes { + self.buffer.resize(self.header_size_in_bytes.max(64), 0); + } else { + self.buffer[..self.header_size_in_bytes].fill(0); + } + used.freeze() + } + fn ensure_capacity(&mut self, need_len: usize) { if (self.buffer.len() - self.position) < need_len { let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len); diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs b/crates/fluss/src/row/encode/compacted_row_encoder.rs index 48b9f3ff..20f28820 100644 --- a/crates/fluss/src/row/encode/compacted_row_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs @@ -20,8 +20,9 @@ use crate::error::Result; use crate::metadata::RowType; use crate::row::Datum; use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; -use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, CompactedRowWriter}; -use crate::row::encode::{BinaryRow, RowEncoder}; +use crate::row::compacted::{CompactedRowDeserializer, CompactedRowWriter}; +use crate::row::encode::RowEncoder; +use bytes::Bytes; use std::sync::Arc; #[allow(dead_code)] @@ -65,12 +66,8 @@ impl RowEncoder for CompactedRowEncoder<'_> { .write_value(&mut self.writer, pos, &value) } - fn finish_row(&mut self) -> Result { - Ok(CompactedRow::deserialize( - Arc::clone(&self.compacted_row_deserializer), - self.arity, - self.writer.buffer(), - )) + fn finish_row(&mut self) -> Result { + Ok(self.writer.flush_bytes()) } fn close(&mut self) -> Result<()> { diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index c294ecf1..468d4d18 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -22,7 +22,7 @@ use crate::error::Result; use crate::metadata::{DataLakeFormat, KvFormat, RowType}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; -use crate::row::{BinaryRow, Datum, InternalRow}; +use crate::row::{Datum, InternalRow}; use bytes::Bytes; /// An interface for encoding key of row into bytes. @@ -31,8 +31,9 @@ pub trait KeyEncoder { fn encode_key(&mut self, row: &dyn InternalRow) -> Result; } -#[allow(dead_code)] -impl dyn KeyEncoder { +pub struct KeyEncoderFactory; + +impl KeyEncoderFactory { /// Create a key encoder to encode the key bytes of the input row. /// # Arguments /// * `row_type` - the row type of the input row @@ -43,23 +44,21 @@ impl dyn KeyEncoder { /// key encoder pub fn of( row_type: &RowType, - key_fields: Vec, - data_lake_format: Option, + key_fields: &[String], + data_lake_format: &Option, ) -> Result> { match data_lake_format { Some(DataLakeFormat::Paimon) => { unimplemented!("KeyEncoder for Paimon format is currently unimplemented") } Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( - row_type, - key_fields.as_slice(), + row_type, key_fields, )?)), Some(DataLakeFormat::Iceberg) => { unimplemented!("KeyEncoder for Iceberg format is currently unimplemented") } None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( - row_type, - key_fields.as_slice(), + row_type, key_fields, )?)), } } @@ -96,7 +95,7 @@ pub trait RowEncoder { /// /// # Returns /// * the written row - fn finish_row(&mut self) -> Result; + fn finish_row(&mut self) -> Result; /// Closes the row encoder /// @@ -110,8 +109,8 @@ pub struct RowEncoderFactory {} #[allow(dead_code)] impl RowEncoderFactory { - pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result { - Self::create_for_field_types(kv_format, row_type.clone()) + pub fn create(kv_format: KvFormat, row_type: RowType) -> Result { + Self::create_for_field_types(kv_format, row_type) } pub fn create_for_field_types( diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index d2f640e4..bc8134dd 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -23,9 +23,11 @@ mod decimal; pub mod binary; pub mod compacted; pub mod encode; -mod field_getter; +pub mod field_getter; mod row_decoder; +use crate::client::WriteFormat; +use bytes::Bytes; pub use column::*; pub use compacted::CompactedRow; pub use datum::*; @@ -33,9 +35,23 @@ pub use decimal::{Decimal, MAX_COMPACT_PRECISION}; pub use encode::KeyEncoder; pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory}; -pub trait BinaryRow: InternalRow { +pub struct BinaryRow<'a> { + data: BinaryDataWrapper<'a>, +} + +pub enum BinaryDataWrapper<'a> { + Bytes(Bytes), + Ref(&'a [u8]), +} + +impl<'a> BinaryRow<'a> { /// Returns the binary representation of this row as a byte slice. - fn as_bytes(&self) -> &[u8]; + pub fn as_bytes(&'a self) -> &'a [u8] { + match &self.data { + BinaryDataWrapper::Bytes(bytes) => bytes.as_ref(), + BinaryDataWrapper::Ref(r) => r, + } + } } // TODO make functions return Result for better error handling @@ -99,6 +115,11 @@ pub trait InternalRow { /// Returns the binary value at the given position fn get_bytes(&self, pos: usize) -> &[u8]; + + /// Returns encoded bytes if already encoded + fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> { + None + } } pub struct GenericRow<'a> {