diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 117ceb27..16629bed 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -33,4 +33,8 @@ path = "src/example_table.rs" [[example]] name = "example-upsert-lookup" -path = "src/example_kv_table.rs" \ No newline at end of file +path = "src/example_kv_table.rs" + +[[example]] +name = "example-partitioned-upsert-lookup" +path = "src/example_partitioned_kv_table.rs" \ No newline at end of file diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs new file mode 100644 index 00000000..a5e76fa3 --- /dev/null +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; +use std::collections::HashMap; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("zone", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec![ + "id".to_string(), + "region".to_string(), + "zone".to_string(), + ]) + .build()?, + ) + .partitioned_by(vec!["region".to_string(), "zone".to_string()]) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned()); + + let mut admin = conn.get_admin().await?; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + println!( + "Created KV Table:\n {}\n", + admin.get_table(&table_path).await? + ); + + create_partition(&table_path, &mut admin, "APAC", 1).await; + create_partition(&table_path, &mut admin, "EMEA", 2).await; + create_partition(&table_path, &mut admin, "US", 3).await; + + let table = conn.get_table(&table_path).await?; + let table_upsert = table.new_upsert()?; + let mut upsert_writer = table_upsert.create_writer()?; + + println!("\n=== Upserting ==="); + for (id, region, zone, score) in [ + (1001, "APAC", 1i64, 1234i64), + (1002, "EMEA", 2, 2234), + (1003, "US", 3, 3234), + ] { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region); + row.set_field(2, zone); + row.set_field(3, score); + upsert_writer.upsert(&row).await?; + println!("Upserted: {row:?}"); + } + + println!("\n=== Looking up ==="); + let mut lookuper = table.new_lookup()?.create_lookuper()?; + + for (id, region, zone) in [(1001, "APAC", 1i64), (1002, "EMEA", 2), (1003, "US", 3)] { + let result = lookuper + .lookup(&make_key(id, region, zone)) + .await + .expect("lookup"); + let row = result.get_single_row()?.unwrap(); + println!( + "Found id={id}: region={}, zone={}, score={}", + row.get_string(1), + row.get_long(2), + row.get_long(3) + ); + } + + println!("\n=== Updating ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 1001); + row.set_field(1, "APAC"); + row.set_field(2, 1i64); + row.set_field(3, 4321i64); + upsert_writer.upsert(&row).await?; + println!("Updated: {row:?}"); + + let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?; + let row = result.get_single_row()?.unwrap(); + println!( + "Verified update: region={}, zone={}", + row.get_string(1), + row.get_long(2) + ); + + println!("\n=== Deleting ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 1002); + row.set_field(1, "EMEA"); + row.set_field(2, 2i64); + upsert_writer.delete(&row).await?; + println!("Deleted: {row:?}"); + + let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?; + if result.get_single_row()?.is_none() { + println!("Verified deletion"); + } + + Ok(()) +} + +async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) { + let mut partition_values = HashMap::new(); + partition_values.insert("region".to_string(), region.to_string()); + partition_values.insert("zone".to_string(), zone.to_string()); + let partition_spec = PartitionSpec::new(partition_values); + + admin + .create_partition(table_path, &partition_spec, true) + .await + .unwrap(); +} + +fn make_key(id: i32, region: &str, zone: i64) -> GenericRow<'static> { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region.to_string()); + row.set_field(2, zone); + row +} diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index ca6b9428..92055a70 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -16,6 +16,7 @@ // under the License. mod example_kv_table; +mod example_partitioned_kv_table; use clap::Parser; use fluss::client::FlussConnection; diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index 2611ac7e..1b43d12a 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -24,7 +24,6 @@ pub trait BucketingFunction: Sync + Send { fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result; } -#[allow(dead_code)] impl dyn BucketingFunction { /// Provides the bucketing function for a given [DataLakeFormat] /// diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 286c46c2..ea1efc38 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -32,7 +32,7 @@ use crate::rpc::{RpcClient, ServerConnection}; use crate::error::{Error, Result}; use crate::proto::GetTableInfoResponse; use crate::{BucketId, PartitionId, TableId}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::slice::from_ref; use std::sync::Arc; use tokio::task::JoinHandle; @@ -111,6 +111,12 @@ impl FlussAdmin { .admin_gateway .request(GetTableRequest::new(table_path)) .await?; + + // force update to avoid stale data in cache + self.metadata + .update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![]) + .await?; + let GetTableInfoResponse { table_id, schema_id, diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index a19dbd2f..e0210118 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -93,11 +93,6 @@ impl FlussConnection { pub async fn get_table(&self, table_path: &TablePath) -> Result> { self.metadata.update_table_metadata(table_path).await?; let table_info = self.metadata.get_cluster().get_table(table_path)?.clone(); - if table_info.is_partitioned() { - return Err(crate::error::Error::UnsupportedOperation { - message: "Partitioned tables are not supported".to_string(), - }); - } Ok(FlussTable::new(self, self.metadata.clone(), table_info)) } } diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 614f6e78..52ccd62e 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -58,7 +58,14 @@ impl Metadata { ServerType::CoordinatorServer, ); let con = connections.get_connection(&server_node).await?; - let response = con.request(UpdateMetadataRequest::new(&[])).await?; + + let response = con + .request(UpdateMetadataRequest::new( + &HashSet::default(), + &HashSet::new(), + vec![], + )) + .await?; Cluster::from_metadata_response(response, None) } @@ -95,7 +102,12 @@ impl Metadata { Ok(()) } - pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> { + pub async fn update_tables_metadata( + &self, + table_paths: &HashSet<&TablePath>, + physical_table_paths: &HashSet<&Arc>, + partition_ids: Vec, + ) -> Result<()> { let maybe_server = { let guard = self.cluster.read(); guard.get_one_available_server().cloned() @@ -114,16 +126,19 @@ impl Metadata { let conn = self.connections.get_connection(&server).await?; - let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect(); let response = conn - .request(UpdateMetadataRequest::new(update_table_paths.as_slice())) + .request(UpdateMetadataRequest::new( + table_paths, + physical_table_paths, + partition_ids, + )) .await?; self.update(response).await?; Ok(()) } pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> { - self.update_tables_metadata(&HashSet::from([table_path])) + self.update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![]) .await } @@ -133,8 +148,9 @@ impl Metadata { .iter() .filter(|table_path| cluster_binding.opt_get_table(table_path).is_none()) .collect(); + if !need_update_table_paths.is_empty() { - self.update_tables_metadata(&need_update_table_paths) + self.update_tables_metadata(&need_update_table_paths, &HashSet::new(), vec![]) .await?; } Ok(()) @@ -150,7 +166,48 @@ impl Metadata { guard.clone() } - pub fn leader_for(&self, table_bucket: &TableBucket) -> Option { + const MAX_RETRY_TIMES: u8 = 3; + + pub async fn leader_for( + &self, + table_path: &TablePath, + table_bucket: &TableBucket, + ) -> Result> { + let leader = self.get_leader_for(table_bucket); + + if leader.is_some() { + Ok(leader) + } else { + for _ in 0..Self::MAX_RETRY_TIMES { + if let Some(partition_id) = table_bucket.partition_id() { + self.update_tables_metadata( + &HashSet::from([table_path]), + &HashSet::new(), + vec![partition_id], + ) + .await?; + } else { + self.update_tables_metadata( + &HashSet::from([table_path]), + &HashSet::new(), + vec![], + ) + .await?; + } + + let cluster = self.cluster.read(); + let leader = cluster.leader_for(table_bucket); + + if leader.is_some() { + return Ok(leader.cloned()); + } + } + + Ok(None) + } + } + + fn get_leader_for(&self, table_bucket: &TableBucket) -> Option { let cluster = self.cluster.read(); cluster.leader_for(table_bucket).cloned() } @@ -173,14 +230,16 @@ mod tests { use crate::metadata::{TableBucket, TablePath}; use crate::test_utils::build_cluster_arc; - #[test] - fn leader_for_returns_server() { + #[tokio::test] + async fn leader_for_returns_server() { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Metadata::new_for_test(cluster); let leader = metadata - .leader_for(&TableBucket::new(1, 0)) - .expect("leader"); + .leader_for(&table_path, &TableBucket::new(1, 0)) + .await + .expect("leader request should be Ok") + .expect("leader should exist"); assert_eq!(leader.id(), 1); } diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 6d76f28b..7fe2023b 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -17,7 +17,7 @@ use crate::client::{WriteRecord, WriterClient}; use crate::error::Result; -use crate::metadata::{TableInfo, TablePath}; +use crate::metadata::{PhysicalTablePath, TableInfo, TablePath}; use crate::row::GenericRow; use arrow::array::RecordBatch; use std::sync::Arc; @@ -25,14 +25,14 @@ use std::sync::Arc; #[allow(dead_code)] pub struct TableAppend { table_path: TablePath, - table_info: TableInfo, + table_info: Arc, writer_client: Arc, } impl TableAppend { pub(super) fn new( table_path: TablePath, - table_info: TableInfo, + table_info: Arc, writer_client: Arc, ) -> Self { Self { @@ -44,23 +44,27 @@ impl TableAppend { pub fn create_writer(&self) -> AppendWriter { AppendWriter { - table_path: Arc::new(self.table_path.clone()), + physical_table_path: Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))), writer_client: self.writer_client.clone(), - table_info: Arc::new(self.table_info.clone()), + table_info: Arc::clone(&self.table_info), } } } pub struct AppendWriter { - table_path: Arc, + physical_table_path: Arc, writer_client: Arc, table_info: Arc, } impl AppendWriter { pub async fn append(&self, row: GenericRow<'_>) -> Result<()> { - let record = - WriteRecord::for_append(self.table_path.clone(), self.table_info.schema_id, row); + let record = WriteRecord::for_append( + Arc::clone(&self.table_info), + Arc::clone(&self.physical_table_path), + self.table_info.schema_id, + row, + ); let result_handle = self.writer_client.send(&record).await?; let result = result_handle.wait().await?; result_handle.result(result) @@ -68,7 +72,8 @@ impl AppendWriter { pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> { let record = WriteRecord::for_append_record_batch( - self.table_path.clone(), + Arc::clone(&self.table_info), + Arc::clone(&self.physical_table_path), self.table_info.schema_id, batch, ); diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 7ece34b4..78ee065a 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -833,9 +833,10 @@ mod tests { use crate::compression::{ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; - use crate::metadata::{DataField, DataTypes, RowType, TablePath}; + use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath}; use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; use crate::row::GenericRow; + use crate::test_utils::build_table_info; use std::sync::Arc; fn test_read_context() -> Result { @@ -899,7 +900,9 @@ mod tests { DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); - let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); let mut builder = MemoryLogRecordsArrowBuilder::new( 1, @@ -914,7 +917,7 @@ mod tests { let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); - let record = WriteRecord::for_append(table_path, 1, row); + let record = WriteRecord::for_append(table_info, physical_table_path, 1, row); builder.append(&record)?; let data = builder.build()?; diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 4e89176a..69cb91ef 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -18,8 +18,9 @@ use crate::bucketing::BucketingFunction; use crate::client::connection::FlussConnection; use crate::client::metadata::Metadata; +use crate::client::table::partition_getter::PartitionGetter; use crate::error::{Error, Result}; -use crate::metadata::{RowType, TableBucket, TableInfo}; +use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; use crate::record::kv::SCHEMA_ID_LENGTH; use crate::row::InternalRow; use crate::row::compacted::CompactedRow; @@ -133,20 +134,43 @@ impl<'a> TableLookup<'a> { let data_lake_format = self.table_info.get_table_config().get_datalake_format()?; let bucketing_function = ::of(data_lake_format.as_ref()); - // Create key encoder for the primary key fields - let pk_fields = self.table_info.get_physical_primary_keys().to_vec(); - let key_encoder = KeyEncoderFactory::of( - self.table_info.row_type(), - pk_fields.as_slice(), - &data_lake_format, - )?; + let row_type = self.table_info.row_type(); + let primary_keys = self.table_info.get_primary_keys(); + let lookup_row_type = row_type.project_with_field_names(primary_keys)?; + + let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec(); + let primary_key_encoder = + KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?; + + let bucket_key_encoder = if self.table_info.is_default_bucket_key() { + None + } else { + let bucket_keys = self.table_info.get_bucket_keys().to_vec(); + Some(KeyEncoderFactory::of( + &lookup_row_type, + &bucket_keys, + &data_lake_format, + )?) + }; + + let partition_getter = if self.table_info.is_partitioned() { + Some(PartitionGetter::new( + &lookup_row_type, + Arc::clone(self.table_info.get_partition_keys()), + )?) + } else { + None + }; Ok(Lookuper { conn: self.conn, + table_path: Arc::new(self.table_info.table_path.clone()), table_info: self.table_info, metadata: self.metadata, bucketing_function, - key_encoder, + primary_key_encoder, + bucket_key_encoder, + partition_getter, num_buckets, }) } @@ -163,13 +187,15 @@ impl<'a> TableLookup<'a> { /// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key /// let result = lookuper.lookup(&row).await?; /// ``` -// TODO: Support partitioned tables (extract partition from key) pub struct Lookuper<'a> { conn: &'a FlussConnection, table_info: TableInfo, + table_path: Arc, metadata: Arc, bucketing_function: Box, - key_encoder: Box, + primary_key_encoder: Box, + bucket_key_encoder: Option>, + partition_getter: Option, num_buckets: i32, } @@ -187,26 +213,47 @@ impl<'a> Lookuper<'a> { /// * `Err(Error)` - If the lookup fails pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result> { // todo: support batch lookup - // Encode the key from the row - let encoded_key = self.key_encoder.encode_key(row)?; - let key_bytes = encoded_key.to_vec(); + let pk_bytes = self.primary_key_encoder.encode_key(row)?; + let pk_bytes_vec = pk_bytes.to_vec(); + let bk_bytes = match &mut self.bucket_key_encoder { + Some(encoder) => &encoder.encode_key(row)?, + None => &pk_bytes, + }; + + let partition_id = if let Some(ref partition_getter) = self.partition_getter { + let partition_name = partition_getter.get_partition(row)?; + let physical_table_path = PhysicalTablePath::of_partitioned( + Arc::clone(&self.table_path), + Some(partition_name), + ); + let cluster = self.metadata.get_cluster(); + match cluster.get_partition_id(&physical_table_path) { + Some(id) => Some(id), + None => { + // Partition doesn't exist, return empty result (like Java) + return Ok(LookupResult::empty(self.table_info.row_type())); + } + } + } else { + None + }; - // Compute bucket from encoded key let bucket_id = self .bucketing_function - .bucketing(&key_bytes, self.num_buckets)?; + .bucketing(bk_bytes, self.num_buckets)?; let table_id = self.table_info.get_table_id(); - let table_bucket = TableBucket::new(table_id, bucket_id); + let table_bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id); // Find the leader for this bucket let cluster = self.metadata.get_cluster(); - let leader = - cluster - .leader_for(&table_bucket) - .ok_or_else(|| Error::LeaderNotAvailable { - message: format!("No leader found for table bucket: {table_bucket}"), - })?; + let leader = self + .metadata + .leader_for(self.table_path.as_ref(), &table_bucket) + .await? + .ok_or_else(|| Error::LeaderNotAvailable { + message: format!("No leader found for table bucket: {table_bucket}"), + })?; // Get connection to the tablet server let tablet_server = @@ -223,7 +270,7 @@ impl<'a> Lookuper<'a> { let connection = connections.get_connection(tablet_server).await?; // Send lookup request - let request = LookupRequest::new(table_id, None, bucket_id, vec![key_bytes]); + let request = LookupRequest::new(table_id, partition_id, bucket_id, vec![pk_bytes_vec]); let response = connection.request(request).await?; // Extract the values from response diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 2dc56d52..2fbbbc91 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -69,7 +69,7 @@ impl<'a> FlussTable<'a> { pub fn new_append(&self) -> Result { Ok(TableAppend::new( self.table_path.clone(), - self.table_info.clone(), + Arc::new(self.table_info.clone()), self.conn.get_or_create_writer_client()?, )) } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 356ba1cd..14d28411 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -630,7 +630,7 @@ impl LogFetcher { if self.is_partitioned { // Fallback to full table metadata refresh until partition-aware updates are available. self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path])) + .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) .await .or_else(|e| { if let Error::RpcError { source, .. } = &e @@ -649,7 +649,7 @@ impl LogFetcher { // TODO: Handle PartitionNotExist error self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path])) + .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) .await .or_else(|e| { if let Error::RpcError { source, .. } = &e @@ -799,8 +799,9 @@ impl LogFetcher { let table_id = table_bucket.table_id(); let cluster = metadata.get_cluster(); if let Some(table_path) = cluster.get_table_path_by_id(table_id) { - let physical_tables = - HashSet::from([PhysicalTablePath::of(table_path.clone())]); + let physical_tables = HashSet::from([PhysicalTablePath::of(Arc::new( + table_path.clone(), + ))]); metadata.invalidate_physical_table_meta(&physical_tables); } else { warn!( @@ -1498,7 +1499,7 @@ mod tests { use crate::compression::{ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; - use crate::metadata::{TableInfo, TablePath}; + use crate::metadata::{PhysicalTablePath, TableInfo, TablePath}; use crate::record::MemoryLogRecordsArrowBuilder; use crate::row::{Datum, GenericRow}; use crate::rpc::FlussError; @@ -1514,8 +1515,10 @@ mod tests { compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }, )?; + let physical_table_path = Arc::new(PhysicalTablePath::of(table_path)); let record = WriteRecord::for_append( - table_path, + Arc::new(table_info.clone()), + physical_table_path, 1, GenericRow { values: vec![Datum::Int32(1)], @@ -1684,7 +1687,7 @@ mod tests { )?; let bucket = TableBucket::new(1, 0); - assert!(metadata.leader_for(&bucket).is_some()); + assert!(metadata.leader_for(&table_path, &bucket).await?.is_some()); let response = crate::proto::FetchLogResponse { tables_resp: vec![crate::proto::PbFetchLogRespForTable { @@ -1713,7 +1716,7 @@ mod tests { LogFetcher::handle_fetch_response(response, response_context).await; - assert!(metadata.leader_for(&bucket).is_none()); + assert!(metadata.get_cluster().leader_for(&bucket).is_none()); Ok(()) } } diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 984592d0..269d5258 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -19,12 +19,13 @@ use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, Upse use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient}; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{KvFormat, RowType, TableInfo, TablePath}; +use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath}; use crate::row::InternalRow; use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory}; use crate::row::field_getter::FieldGetter; use std::sync::Arc; +use crate::client::table::partition_getter::PartitionGetter; use bitvec::prelude::bitvec; use bytes::Bytes; @@ -107,30 +108,25 @@ impl TableUpsert { } } -#[allow(dead_code)] struct UpsertWriterImpl where RE: RowEncoder, { table_path: Arc, writer_client: Arc, - // TODO: Partitioning - // partition_field_getter: Option>, + partition_field_getter: Option, primary_key_encoder: Box, target_columns: Option>>, // Use primary key encoder as bucket key encoder when None bucket_key_encoder: Option>, - kv_format: KvFormat, write_format: WriteFormat, row_encoder: RE, field_getters: Box<[FieldGetter]>, table_info: Arc, } -#[allow(dead_code)] struct UpsertWriterFactory; -#[allow(dead_code)] impl UpsertWriterFactory { pub fn create( table_path: Arc, @@ -168,13 +164,22 @@ impl UpsertWriterFactory { let field_getters = FieldGetter::create_field_getters(row_type); + let partition_field_getter = if table_info.is_partitioned() { + Some(PartitionGetter::new( + row_type, + Arc::clone(table_info.get_partition_keys()), + )?) + } else { + None + }; + Ok(UpsertWriterImpl { table_path, + partition_field_getter, writer_client, primary_key_encoder, target_columns: partial_update_columns, bucket_key_encoder, - kv_format: kv_format.clone(), write_format, row_encoder: RowEncoderFactory::create(kv_format, row_type.clone())?, field_getters, @@ -311,6 +316,18 @@ impl UpsertWriterImpl { } self.row_encoder.finish_row() } + + fn get_physical_path(&self, row: &R) -> Result { + if let Some(partition_getter) = &self.partition_field_getter { + let partition = partition_getter.get_partition(row); + Ok(PhysicalTablePath::of_partitioned( + Arc::clone(&self.table_path), + Some(partition?), + )) + } else { + Ok(PhysicalTablePath::of(Arc::clone(&self.table_path))) + } + } } impl TableWriter for UpsertWriterImpl { @@ -343,7 +360,8 @@ impl UpsertWriter for UpsertWriterImpl { }; let write_record = WriteRecord::for_upsert( - Arc::clone(&self.table_path), + Arc::clone(&self.table_info), + Arc::new(self.get_physical_path(row)?), self.table_info.schema_id, key, bucket_key, @@ -372,7 +390,8 @@ impl UpsertWriter for UpsertWriterImpl { let (key, bucket_key) = self.get_keys(row)?; let write_record = WriteRecord::for_upsert( - Arc::clone(&self.table_path), + Arc::clone(&self.table_info), + Arc::new(self.get_physical_path(row)?), self.table_info.schema_id, key, bucket_key, diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs index 82765457..ec26ec63 100644 --- a/crates/fluss/src/client/table/writer.rs +++ b/crates/fluss/src/client/table/writer.rs @@ -15,12 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::client::{WriteRecord, WriterClient}; use crate::row::{GenericRow, InternalRow}; -use std::sync::Arc; use crate::error::Result; -use crate::metadata::{TableInfo, TablePath}; #[allow(dead_code, async_fn_in_trait)] pub trait TableWriter { @@ -47,55 +44,3 @@ pub struct UpsertResult; /// Currently this is an empty struct to allow for compatible evolution in the future #[derive(Default)] pub struct DeleteResult; - -#[allow(dead_code)] -pub struct AbstractTableWriter { - table_path: Arc, - writer_client: Arc, - field_count: i32, - schema_id: i32, -} - -#[allow(dead_code)] -impl AbstractTableWriter { - pub fn new( - table_path: TablePath, - table_info: &TableInfo, - writer_client: Arc, - ) -> Self { - // todo: partition - Self { - table_path: Arc::new(table_path), - writer_client, - field_count: table_info.row_type().fields().len() as i32, - schema_id: table_info.schema_id, - } - } - - pub async fn send(&self, write_record: &WriteRecord<'_>) -> Result<()> { - let result_handle = self.writer_client.send(write_record).await?; - let result = result_handle.wait().await?; - result_handle.result(result) - } -} - -impl TableWriter for AbstractTableWriter { - async fn flush(&self) -> Result<()> { - todo!() - } -} - -// Append writer implementation -#[allow(dead_code)] -pub struct AppendWriterImpl { - base: AbstractTableWriter, -} - -#[allow(dead_code)] -impl AppendWriterImpl { - pub async fn append(&self, row: GenericRow<'_>) -> Result<()> { - let record = - WriteRecord::for_append(self.base.table_path.clone(), self.base.schema_id, row); - self.base.send(&record).await - } -} diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index a5b98327..2a455173 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -21,7 +21,7 @@ use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; use crate::config::Config; use crate::error::Result; -use crate::metadata::{TableBucket, TablePath}; +use crate::metadata::{PhysicalTablePath, TableBucket}; use crate::util::current_time_ms; use crate::{BucketId, PartitionId, TableId}; use dashmap::DashMap; @@ -37,7 +37,7 @@ type BucketBatches = Vec<(BucketId, Arc>>)>; #[allow(dead_code)] pub struct RecordAccumulator { config: Config, - write_batches: DashMap, + write_batches: DashMap, BucketAndWriteBatches>, // batch_id -> complete callback incomplete_batches: RwLock>, batch_timeout_ms: i64, @@ -88,14 +88,14 @@ impl RecordAccumulator { &self, cluster: &Cluster, record: &WriteRecord, - bucket_id: BucketId, dq: &mut VecDeque, ) -> Result { if let Some(append_result) = self.try_append(record, dq)? { return Ok(append_result); } - let table_path = &record.table_path; + let physical_table_path = &record.physical_table_path; + let table_path = physical_table_path.get_table_path(); let table_info = cluster.get_table(table_path)?; let arrow_compression_info = table_info.get_table_config().get_arrow_compression_info()?; let row_type = &table_info.row_type; @@ -105,22 +105,20 @@ impl RecordAccumulator { let mut batch: WriteBatch = match record.record() { Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new( self.batch_id.fetch_add(1, Ordering::Relaxed), - table_path.as_ref().clone(), + Arc::clone(physical_table_path), schema_id, arrow_compression_info, row_type, - bucket_id, current_time_ms(), matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))), )?), Record::Kv(kv_record) => Kv(KvWriteBatch::new( self.batch_id.fetch_add(1, Ordering::Relaxed), - table_path.as_ref().clone(), + Arc::clone(physical_table_path), schema_id, // TODO: Decide how to derive write limit in the absence of java's equivalent of PreAllocatedPagedOutputView KvWriteBatch::DEFAULT_WRITE_LIMIT, record.write_format.to_kv_format()?, - bucket_id, kv_record.target_columns.clone(), current_time_ms(), )), @@ -153,18 +151,25 @@ impl RecordAccumulator { cluster: &Cluster, abort_if_batch_full: bool, ) -> Result { - let table_path = &record.table_path; + let physical_table_path = &record.physical_table_path; + let table_path = physical_table_path.get_table_path(); + let table_info = cluster.get_table(table_path)?; + let is_partitioned_table = table_info.is_partitioned(); - // TODO: Implement partitioning + let partition_id = if is_partitioned_table { + cluster.get_partition_id(physical_table_path) + } else { + None + }; let dq = { let mut binding = self .write_batches - .entry(table_path.as_ref().clone()) + .entry(Arc::clone(physical_table_path)) .or_insert_with(|| BucketAndWriteBatches { - table_id: 0, - is_partitioned_table: false, - partition_id: None, + table_id: table_info.table_id, + is_partitioned_table, + partition_id, batches: Default::default(), }); let bucket_and_batches = binding.value_mut(); @@ -185,23 +190,24 @@ impl RecordAccumulator { true, false, true, )); } - self.append_new_batch(cluster, record, bucket_id, &mut dq_guard) + self.append_new_batch(cluster, record, &mut dq_guard) } pub async fn ready(&self, cluster: &Arc) -> Result { // Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct - let entries: Vec<(TablePath, BucketBatches)> = self + let entries: Vec<(Arc, Option, BucketBatches)> = self .write_batches .iter() .map(|entry| { - let table_path = entry.key().clone(); + let physical_table_path = Arc::clone(entry.key()); + let partition_id = entry.value().partition_id; let bucket_batches: Vec<_> = entry .value() .batches .iter() .map(|(bucket_id, batch_arc)| (*bucket_id, batch_arc.clone())) .collect(); - (table_path, bucket_batches) + (physical_table_path, partition_id, bucket_batches) }) .collect(); @@ -209,10 +215,12 @@ impl RecordAccumulator { let mut next_ready_check_delay_ms = self.batch_timeout_ms; let mut unknown_leader_tables = HashSet::new(); - for (table_path, bucket_batches) in entries { + for (physical_table_path, mut partition_id, bucket_batches) in entries { next_ready_check_delay_ms = self .bucket_ready( - &table_path, + &physical_table_path, + physical_table_path.get_partition_name().is_some(), + &mut partition_id, bucket_batches, &mut ready_nodes, &mut unknown_leader_tables, @@ -229,17 +237,42 @@ impl RecordAccumulator { }) } + #[allow(clippy::too_many_arguments)] async fn bucket_ready( &self, - table_path: &TablePath, + physical_table_path: &Arc, + is_partitioned_table: bool, + partition_id: &mut Option, bucket_batches: BucketBatches, ready_nodes: &mut HashSet, - unknown_leader_tables: &mut HashSet, + unknown_leader_tables: &mut HashSet>, cluster: &Cluster, next_ready_check_delay_ms: i64, ) -> Result { let mut next_delay = next_ready_check_delay_ms; + // First check this table has partitionId. + if is_partitioned_table && partition_id.is_none() { + let partition_id = cluster.get_partition_id(physical_table_path); + + if partition_id.is_some() { + // Update the cached partition_id + if let Some(mut entry) = self.write_batches.get_mut(physical_table_path) { + entry.partition_id = partition_id; + } + } else { + log::debug!( + "Partition does not exist for {}, bucket will not be set to ready", + physical_table_path.as_ref() + ); + + // TODO: we shouldn't add unready partitions to unknownLeaderTables, + // because it cases PartitionNotExistException later + unknown_leader_tables.insert(Arc::clone(physical_table_path)); + return Ok(next_delay); + } + } + for (bucket_id, batch) in bucket_batches { let batch_guard = batch.lock().await; if batch_guard.is_empty() { @@ -250,12 +283,12 @@ impl RecordAccumulator { let waited_time_ms = batch.waited_time_ms(current_time_ms()); let deque_size = batch_guard.len(); let full = deque_size > 1 || batch.is_closed(); - let table_bucket = cluster.get_table_bucket(table_path, bucket_id)?; + let table_bucket = cluster.get_table_bucket(physical_table_path, bucket_id)?; if let Some(leader) = cluster.leader_for(&table_bucket) { next_delay = self.batch_ready(leader, waited_time_ms, full, ready_nodes, next_delay); } else { - unknown_leader_tables.insert(table_path.clone()); + unknown_leader_tables.insert(Arc::clone(physical_table_path)); } } Ok(next_delay) @@ -332,14 +365,14 @@ impl RecordAccumulator { loop { let bucket = &buckets[current_index]; - let table_path = bucket.table_path.clone(); + let table_path = bucket.physical_table_path(); let table_bucket = bucket.table_bucket.clone(); last_processed_index = current_index; current_index = (current_index + 1) % buckets.len(); let deque = self .write_batches - .get(&table_path) + .get(table_path) .and_then(|bucket_and_write_batches| { bucket_and_write_batches .batches @@ -399,20 +432,22 @@ impl RecordAccumulator { pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) { ready_write_batch.write_batch.re_enqueued(); - let table_path = ready_write_batch.write_batch.table_path().clone(); + let physical_table_path = ready_write_batch.write_batch.physical_table_path(); let bucket_id = ready_write_batch.table_bucket.bucket_id(); let table_id = ready_write_batch.table_bucket.table_id(); + let partition_id = ready_write_batch.table_bucket.partition_id(); + let is_partitioned_table = partition_id.is_some(); let dq = { - let mut binding = - self.write_batches - .entry(table_path) - .or_insert_with(|| BucketAndWriteBatches { - table_id, - is_partitioned_table: false, - partition_id: None, - batches: Default::default(), - }); + let mut binding = self + .write_batches + .entry(Arc::clone(physical_table_path)) + .or_insert_with(|| BucketAndWriteBatches { + table_id, + is_partitioned_table, + partition_id, + batches: Default::default(), + }); let bucket_and_batches = binding.value_mut(); bucket_and_batches .batches @@ -478,6 +513,12 @@ pub struct ReadyWriteBatch { pub write_batch: WriteBatch, } +impl ReadyWriteBatch { + pub fn write_batch(&self) -> &WriteBatch { + &self.write_batch + } +} + #[allow(dead_code)] struct BucketAndWriteBatches { table_id: TableId, @@ -525,14 +566,14 @@ impl RecordAppendResult { pub struct ReadyCheckResult { pub ready_nodes: HashSet, pub next_ready_check_delay_ms: i64, - pub unknown_leader_tables: HashSet, + pub unknown_leader_tables: HashSet>, } impl ReadyCheckResult { pub fn new( ready_nodes: HashSet, next_ready_check_delay_ms: i64, - unknown_leader_tables: HashSet, + unknown_leader_tables: HashSet>, ) -> Self { ReadyCheckResult { ready_nodes, @@ -547,17 +588,20 @@ mod tests { use super::*; use crate::metadata::TablePath; use crate::row::{Datum, GenericRow}; - use crate::test_utils::build_cluster; + use crate::test_utils::{build_cluster, build_table_info}; use std::sync::Arc; #[tokio::test] async fn re_enqueue_increments_attempts() -> Result<()> { let config = Config::default(); let accumulator = RecordAccumulator::new(config); - let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); - let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1)); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); let record = WriteRecord::for_append( - table_path.clone(), + table_info, + physical_table_path, 1, GenericRow { values: vec![Datum::Int32(1)], diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs index 41561d4a..da30c8ac 100644 --- a/crates/fluss/src/client/write/batch.rs +++ b/crates/fluss/src/client/write/batch.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::BucketId; use crate::client::broadcast::{BatchWriteResult, BroadcastOnce}; use crate::client::{Record, ResultHandle, WriteRecord}; use crate::compression::ArrowCompressionInfo; use crate::error::{Error, Result}; -use crate::metadata::{KvFormat, RowType, TablePath}; +use crate::metadata::{KvFormat, PhysicalTablePath, RowType}; use crate::record::MemoryLogRecordsArrowBuilder; use crate::record::kv::KvRecordBatchBuilder; use bytes::Bytes; @@ -28,12 +27,10 @@ use std::cmp::max; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; -#[allow(dead_code)] pub struct InnerWriteBatch { batch_id: i64, - table_path: TablePath, + physical_table_path: Arc, create_ms: i64, - bucket_id: BucketId, results: BroadcastOnce, completed: AtomicBool, attempts: AtomicI32, @@ -41,12 +38,11 @@ pub struct InnerWriteBatch { } impl InnerWriteBatch { - fn new(batch_id: i64, table_path: TablePath, create_ms: i64, bucket_id: BucketId) -> Self { + fn new(batch_id: i64, physical_table_path: Arc, create_ms: i64) -> Self { InnerWriteBatch { batch_id, - table_path, + physical_table_path, create_ms, - bucket_id, results: Default::default(), completed: AtomicBool::new(false), attempts: AtomicI32::new(0), @@ -74,8 +70,8 @@ impl InnerWriteBatch { self.drained_ms = max(self.drained_ms, now_ms); } - fn table_path(&self) -> &TablePath { - &self.table_path + fn physical_table_path(&self) -> &Arc { + &self.physical_table_path } fn attempts(&self) -> i32 { @@ -165,8 +161,8 @@ impl WriteBatch { self.inner_batch().batch_id } - pub fn table_path(&self) -> &TablePath { - self.inner_batch().table_path() + pub fn physical_table_path(&self) -> &Arc { + self.inner_batch().physical_table_path() } pub fn attempts(&self) -> i32 { @@ -192,15 +188,14 @@ impl ArrowLogWriteBatch { #[allow(clippy::too_many_arguments)] pub fn new( batch_id: i64, - table_path: TablePath, + physical_table_path: Arc, schema_id: i32, arrow_compression_info: ArrowCompressionInfo, row_type: &RowType, - bucket_id: BucketId, create_ms: i64, to_append_record_batch: bool, ) -> Result { - let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id); + let base = InnerWriteBatch::new(batch_id, physical_table_path, create_ms); Ok(Self { write_batch: base, arrow_builder: MemoryLogRecordsArrowBuilder::new( @@ -273,15 +268,14 @@ impl KvWriteBatch { #[allow(clippy::too_many_arguments)] pub fn new( batch_id: i64, - table_path: TablePath, + physical_table_path: Arc, schema_id: i32, write_limit: usize, kv_format: KvFormat, - bucket_id: BucketId, target_columns: Option>>, create_ms: i64, ) -> Self { - let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id); + let base = InnerWriteBatch::new(batch_id, physical_table_path, create_ms); Self { write_batch: base, kv_batch_builder: KvRecordBatchBuilder::new(schema_id, write_limit, kv_format), @@ -367,19 +361,22 @@ mod tests { use super::*; use crate::client::{RowBytes, WriteFormat}; use crate::metadata::TablePath; + use crate::test_utils::build_table_info; #[test] fn complete_only_once() { - let batch = - InnerWriteBatch::new(1, TablePath::new("db".to_string(), "tbl".to_string()), 0, 0); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let physical_path = PhysicalTablePath::of(Arc::new(table_path)); + let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0); assert!(batch.complete(Ok(()))); assert!(!batch.complete(Err(crate::client::broadcast::Error::Dropped))); } #[test] fn attempts_increment_on_reenqueue() { - let batch = - InnerWriteBatch::new(1, TablePath::new("db".to_string(), "tbl".to_string()), 0, 0); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let physical_path = PhysicalTablePath::of(Arc::new(table_path)); + let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0); assert_eq!(batch.attempts(), 0); batch.re_enqueued(); assert_eq!(batch.attempts(), 1); @@ -401,12 +398,14 @@ mod tests { DataField::new("name".to_string(), DataTypes::string(), None), ]); let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); // Test 1: RowAppendRecordBatchBuilder (to_append_record_batch=false) { let mut batch = ArrowLogWriteBatch::new( 1, - table_path.clone(), + Arc::clone(&physical_table_path), 1, ArrowCompressionInfo { compression_type: ArrowCompressionType::None, @@ -414,7 +413,6 @@ mod tests { }, &row_type, 0, - 0, false, ) .unwrap(); @@ -424,7 +422,12 @@ mod tests { let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "hello"); - let record = WriteRecord::for_append(Arc::new(table_path.clone()), 1, row); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + row, + ); batch.try_append(&record).unwrap(); } @@ -446,7 +449,7 @@ mod tests { { let mut batch = ArrowLogWriteBatch::new( 1, - table_path.clone(), + physical_table_path.clone(), 1, ArrowCompressionInfo { compression_type: ArrowCompressionType::None, @@ -454,7 +457,6 @@ mod tests { }, &row_type, 0, - 0, true, ) .unwrap(); @@ -472,8 +474,12 @@ mod tests { ) .unwrap(); - let record = - WriteRecord::for_append_record_batch(Arc::new(table_path.clone()), 1, record_batch); + let record = WriteRecord::for_append_record_batch( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + record_batch, + ); batch.try_append(&record).unwrap(); let estimated_size = batch.estimated_size_in_bytes(); @@ -496,21 +502,23 @@ mod tests { use crate::metadata::KvFormat; let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); let mut batch = KvWriteBatch::new( 1, - table_path.clone(), + Arc::clone(&physical_path), 1, KvWriteBatch::DEFAULT_WRITE_LIMIT, KvFormat::COMPACTED, - 0, None, 0, ); for _ in 0..200 { let record = WriteRecord::for_upsert( - Arc::new(table_path.clone()), + Arc::clone(&table_info), + Arc::clone(&physical_path), 1, Bytes::from(vec![1_u8, 2_u8, 3_u8]), None, diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index 817101a4..7fcd20ba 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -19,9 +19,10 @@ use crate::bucketing::BucketingFunction; use crate::cluster::Cluster; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::TablePath; +use crate::metadata::PhysicalTablePath; use bytes::Bytes; use rand::Rng; +use std::sync::Arc; use std::sync::atomic::{AtomicI32, Ordering}; pub trait BucketAssigner: Sync + Send { @@ -34,12 +35,12 @@ pub trait BucketAssigner: Sync + Send { #[derive(Debug)] pub struct StickyBucketAssigner { - table_path: TablePath, + table_path: Arc, current_bucket_id: AtomicI32, } impl StickyBucketAssigner { - pub fn new(table_path: TablePath) -> Self { + pub fn new(table_path: Arc) -> Self { Self { table_path, current_bucket_id: AtomicI32::new(-1), @@ -55,7 +56,7 @@ impl StickyBucketAssigner { let mut rng = rand::rng(); let mut random: i32 = rng.random(); random &= i32::MAX; - new_bucket = random % cluster.get_bucket_count(&self.table_path); + new_bucket = random % cluster.get_bucket_count(self.table_path.get_table_path()); } else if available_buckets.len() == 1 { new_bucket = available_buckets[0].table_bucket.bucket_id(); } else { @@ -155,12 +156,15 @@ mod tests { use crate::cluster::Cluster; use crate::metadata::TablePath; use crate::test_utils::build_cluster; + use std::sync::Arc; #[test] fn sticky_bucket_assigner_picks_available_bucket() { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); let cluster = build_cluster(&table_path, 1, 2); - let assigner = StickyBucketAssigner::new(table_path); + let assigner = StickyBucketAssigner::new(Arc::new(PhysicalTablePath::of(Arc::new( + table_path.clone(), + )))); let bucket = assigner.assign_bucket(None, &cluster).expect("bucket"); assert!((0..2).contains(&bucket)); @@ -174,7 +178,7 @@ mod tests { let assigner = HashBucketAssigner::new(3, ::of(None)); let cluster = Cluster::default(); let err = assigner.assign_bucket(None, &cluster).unwrap_err(); - assert!(matches!(err, crate::error::Error::IllegalArgument { .. })); + assert!(matches!(err, IllegalArgument { .. })); } #[test] diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index dcc67959..868b582c 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -20,7 +20,8 @@ mod batch; use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver}; use crate::error::Error; -use crate::metadata::TablePath; +use crate::metadata::{PhysicalTablePath, TableInfo}; + use crate::row::GenericRow; pub use accumulator::*; use arrow::array::RecordBatch; @@ -40,16 +41,21 @@ pub use writer_client::WriterClient; #[allow(dead_code)] pub struct WriteRecord<'a> { record: Record<'a>, - table_path: Arc, + physical_table_path: Arc, bucket_key: Option, schema_id: i32, write_format: WriteFormat, + table_info: Arc, } impl<'a> WriteRecord<'a> { pub fn record(&self) -> &Record<'a> { &self.record } + + pub fn physical_table_path(&self) -> &Arc { + &self.physical_table_path + } } pub enum Record<'a> { @@ -102,10 +108,16 @@ impl<'a> KvWriteRecord<'a> { } impl<'a> WriteRecord<'a> { - pub fn for_append(table_path: Arc, schema_id: i32, row: GenericRow<'a>) -> Self { + pub fn for_append( + table_info: Arc, + physical_table_path: Arc, + schema_id: i32, + row: GenericRow<'a>, + ) -> Self { Self { + table_info, record: Record::Log(LogWriteRecord::Generic(row)), - table_path, + physical_table_path, bucket_key: None, schema_id, write_format: WriteFormat::ArrowLog, @@ -113,21 +125,25 @@ impl<'a> WriteRecord<'a> { } pub fn for_append_record_batch( - table_path: Arc, + table_info: Arc, + physical_table_path: Arc, schema_id: i32, row: RecordBatch, ) -> Self { Self { + table_info, record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))), - table_path, + physical_table_path, bucket_key: None, schema_id, write_format: WriteFormat::ArrowLog, } } + #[allow(clippy::too_many_arguments)] pub fn for_upsert( - table_path: Arc, + table_info: Arc, + physical_table_path: Arc, schema_id: i32, key: Bytes, bucket_key: Option, @@ -136,8 +152,9 @@ impl<'a> WriteRecord<'a> { row_bytes: Option>, ) -> Self { Self { + table_info, record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)), - table_path, + physical_table_path, bucket_key, schema_id, write_format, diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index 905ef805..6a7dad00 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -15,20 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::TableId; use crate::client::broadcast; use crate::client::metadata::Metadata; use crate::client::write::batch::WriteBatch; use crate::client::{ReadyWriteBatch, RecordAccumulator}; use crate::error::Error::UnexpectedError; use crate::error::{FlussError, Result}; -use crate::metadata::{TableBucket, TablePath}; +use crate::metadata::{PhysicalTablePath, TableBucket, TablePath}; use crate::proto::{ PbProduceLogRespForBucket, PbPutKvRespForBucket, ProduceLogResponse, PutKvResponse, }; use crate::rpc::ServerConnection; use crate::rpc::message::{ProduceLogRequest, PutKvRequest}; -use log::warn; +use crate::{PartitionId, TableId}; +use log::{debug, warn}; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -82,9 +82,39 @@ impl Sender { // Update metadata if needed if !ready_check_result.unknown_leader_tables.is_empty() { - self.metadata - .update_tables_metadata(&ready_check_result.unknown_leader_tables.iter().collect()) - .await?; + let mut table_paths: HashSet<&TablePath> = HashSet::new(); + let mut physical_table_paths: HashSet<&Arc> = HashSet::new(); + + for unknown_paths in ready_check_result.unknown_leader_tables.iter() { + if unknown_paths.get_partition_name().is_some() { + physical_table_paths.insert(unknown_paths); + } else { + table_paths.insert(unknown_paths.get_table_path()); + } + } + + if let Err(e) = self + .metadata + .update_tables_metadata(&table_paths, &physical_table_paths, vec![]) + .await + { + match &e { + crate::error::Error::FlussAPIError { api_error } + if api_error.code == FlussError::PartitionNotExists.code() => + { + warn!( + "Partition does not exist during metadata update, continuing: {}", + api_error + ); + } + _ => return Err(e), + } + } + + debug!( + "Client update metadata due to unknown leader tables from the batched records: {:?}", + ready_check_result.unknown_leader_tables + ); } if ready_check_result.ready_nodes.is_empty() { @@ -327,10 +357,15 @@ impl Sender { response: R, ) -> Result<()> { let mut invalid_metadata_tables: HashSet = HashSet::new(); + let mut invalid_physical_table_paths: HashSet> = HashSet::new(); let mut pending_buckets: HashSet = request_buckets.iter().cloned().collect(); for bucket_resp in response.buckets_resp() { - let tb = TableBucket::new(table_id, bucket_resp.bucket_id()); + let tb = TableBucket::new_with_partition( + table_id, + bucket_resp.partition_id(), + bucket_resp.bucket_id(), + ); let Some(ready_batch) = records_by_bucket.remove(&tb) else { panic!("Missing ready batch for table bucket {tb}"); }; @@ -343,11 +378,13 @@ impl Sender { .error_message() .cloned() .unwrap_or_else(|| error.message().to_string()); - if let Some(table_path) = self + if let Some(physical_table_path) = self .handle_write_batch_error(ready_batch, error, message) .await? { - invalid_metadata_tables.insert(table_path); + invalid_metadata_tables + .insert(physical_table_path.get_table_path().clone()); + invalid_physical_table_paths.insert(physical_table_path); } } _ => self.complete_batch(ready_batch), @@ -356,7 +393,7 @@ impl Sender { for bucket in pending_buckets { if let Some(ready_batch) = records_by_bucket.remove(&bucket) { - if let Some(table_path) = self + if let Some(physical_table_path) = self .handle_write_batch_error( ready_batch, FlussError::UnknownServerError, @@ -364,12 +401,13 @@ impl Sender { ) .await? { - invalid_metadata_tables.insert(table_path); + invalid_metadata_tables.insert(physical_table_path.get_table_path().clone()); + invalid_physical_table_paths.insert(physical_table_path); } } } - self.update_metadata_if_needed(invalid_metadata_tables) + self.update_metadata_if_needed(invalid_metadata_tables, invalid_physical_table_paths) .await; Ok(()) } @@ -398,15 +436,18 @@ impl Sender { message: String, ) -> Result<()> { let mut invalid_metadata_tables: HashSet = HashSet::new(); + let mut invalid_physical_table_paths: HashSet> = HashSet::new(); + for batch in batches { - if let Some(table_path) = self + if let Some(physical_table_path) = self .handle_write_batch_error(batch, error, message.clone()) .await? { - invalid_metadata_tables.insert(table_path); + invalid_metadata_tables.insert(physical_table_path.get_table_path().clone()); + invalid_physical_table_paths.insert(physical_table_path); } } - self.update_metadata_if_needed(invalid_metadata_tables) + self.update_metadata_if_needed(invalid_metadata_tables, invalid_physical_table_paths) .await; Ok(()) } @@ -432,20 +473,22 @@ impl Sender { ready_write_batch: ReadyWriteBatch, error: FlussError, message: String, - ) -> Result> { - let table_path = ready_write_batch.write_batch.table_path().clone(); + ) -> Result>> { + let physical_table_path = Arc::clone(ready_write_batch.write_batch.physical_table_path()); if self.can_retry(&ready_write_batch, error) { warn!( - "Retrying write batch for {table_path} on bucket {} after error {error:?}: {message}", + "Retrying write batch for {} on bucket {} after error {error:?}: {message}", + physical_table_path.as_ref(), ready_write_batch.table_bucket.bucket_id() ); self.re_enqueue_batch(ready_write_batch).await; - return Ok(Self::is_invalid_metadata_error(error).then_some(table_path)); + return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path)); } if error == FlussError::DuplicateSequenceException { warn!( - "Duplicate sequence for {table_path} on bucket {}: {message}", + "Duplicate sequence for {} on bucket {}: {message}", + physical_table_path.as_ref(), ready_write_batch.table_bucket.bucket_id() ); self.complete_batch(ready_write_batch); @@ -459,7 +502,7 @@ impl Sender { message, }, ); - Ok(Self::is_invalid_metadata_error(error).then_some(table_path)) + Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path)) } async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) { @@ -484,12 +527,22 @@ impl Sender { && Self::is_retriable_error(error) } - async fn update_metadata_if_needed(&self, table_paths: HashSet) { + async fn update_metadata_if_needed( + &self, + table_paths: HashSet, + physical_table_path: HashSet>, + ) { if table_paths.is_empty() { return; } let table_path_refs: HashSet<&TablePath> = table_paths.iter().collect(); - if let Err(e) = self.metadata.update_tables_metadata(&table_path_refs).await { + let physical_table_path_refs: HashSet<&Arc> = + physical_table_path.iter().collect(); + if let Err(e) = self + .metadata + .update_tables_metadata(&table_path_refs, &physical_table_path_refs, vec![]) + .await + { warn!("Failed to update metadata after write error: {e:?}"); } } @@ -536,6 +589,8 @@ trait BucketResponse { fn bucket_id(&self) -> i32; fn error_code(&self) -> Option; fn error_message(&self) -> Option<&String>; + + fn partition_id(&self) -> Option; } impl BucketResponse for PbProduceLogRespForBucket { @@ -548,6 +603,10 @@ impl BucketResponse for PbProduceLogRespForBucket { fn error_message(&self) -> Option<&String> { self.error_message.as_ref() } + + fn partition_id(&self) -> Option { + self.partition_id + } } impl BucketResponse for PbPutKvRespForBucket { @@ -560,6 +619,10 @@ impl BucketResponse for PbPutKvRespForBucket { fn error_message(&self) -> Option<&String> { self.error_message.as_ref() } + + fn partition_id(&self) -> Option { + self.partition_id + } } trait WriteResponse { @@ -587,11 +650,11 @@ mod tests { use crate::client::WriteRecord; use crate::cluster::Cluster; use crate::config::Config; - use crate::metadata::TablePath; + use crate::metadata::{PhysicalTablePath, TablePath}; use crate::proto::{PbProduceLogRespForBucket, ProduceLogResponse}; use crate::row::{Datum, GenericRow}; use crate::rpc::FlussError; - use crate::test_utils::build_cluster_arc; + use crate::test_utils::{build_cluster_arc, build_table_info}; use std::collections::{HashMap, HashSet}; async fn build_ready_batch( @@ -599,8 +662,11 @@ mod tests { cluster: Arc, table_path: Arc, ) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> { + let table_info = Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(table_path)); let record = WriteRecord::for_append( - table_path, + table_info, + physical_table_path, 1, GenericRow { values: vec![Datum::Int32(1)], diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index 65b04f56..c386adf7 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -15,20 +15,23 @@ // specific language governing permissions and limitations // under the License. +use crate::BucketId; +use crate::bucketing::BucketingFunction; use crate::client::metadata::Metadata; -use crate::client::write::bucket_assigner::{BucketAssigner, StickyBucketAssigner}; +use crate::client::write::bucket_assigner::{ + BucketAssigner, HashBucketAssigner, StickyBucketAssigner, +}; use crate::client::write::sender::Sender; use crate::client::{RecordAccumulator, ResultHandle, WriteRecord}; use crate::config::Config; -use crate::metadata::TablePath; +use crate::error::{Error, Result}; +use crate::metadata::{PhysicalTablePath, TableInfo}; use bytes::Bytes; use dashmap::DashMap; use std::sync::Arc; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use crate::error::{Error, Result}; - #[allow(dead_code)] pub struct WriterClient { config: Config, @@ -37,7 +40,7 @@ pub struct WriterClient { shutdown_tx: mpsc::Sender<()>, sender_join_handle: JoinHandle<()>, metadata: Arc, - bucket_assigners: DashMap>>, + bucket_assigners: DashMap, Arc>, } impl WriterClient { @@ -89,11 +92,12 @@ impl WriterClient { } pub async fn send(&self, record: &WriteRecord<'_>) -> Result { - let table_path = &record.table_path; + let physical_table_path = &record.physical_table_path; let cluster = self.metadata.get_cluster(); let bucket_key = record.bucket_key.as_ref(); - let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key, table_path)?; + let (bucket_assigner, bucket_id) = + self.assign_bucket(&record.table_info, bucket_key, physical_table_path)?; let mut result = self .accumulate @@ -118,17 +122,19 @@ impl WriterClient { } fn assign_bucket( &self, + table_info: &Arc, bucket_key: Option<&Bytes>, - table_path: &Arc, - ) -> Result<(Arc>, i32)> { + table_path: &Arc, + ) -> Result<(Arc, BucketId)> { let cluster = self.metadata.get_cluster(); let bucket_assigner = { if let Some(assigner) = self.bucket_assigners.get(table_path) { assigner.clone() } else { - let assigner = Arc::new(Self::create_bucket_assigner(table_path.as_ref())); + let assigner = + Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?; self.bucket_assigners - .insert(table_path.as_ref().clone(), assigner.clone()); + .insert(Arc::clone(table_path), Arc::clone(&assigner.clone())); assigner } }; @@ -160,8 +166,21 @@ impl WriterClient { Ok(()) } - pub fn create_bucket_assigner(table_path: &TablePath) -> Box { - // always sticky - Box::new(StickyBucketAssigner::new(table_path.clone())) + pub fn create_bucket_assigner( + table_info: &Arc, + table_path: Arc, + bucket_key: Option<&Bytes>, + ) -> Result> { + if bucket_key.is_some() { + let datalake_format = table_info.get_table_config().get_datalake_format()?; + let function = ::of(datalake_format.as_ref()); + Ok(Arc::new(HashBucketAssigner::new( + table_info.num_buckets, + function, + ))) + } else { + // TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER + Ok(Arc::new(StickyBucketAssigner::new(table_path))) + } } } diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs index d6fe0ae5..1f950ad6 100644 --- a/crates/fluss/src/cluster/cluster.rs +++ b/crates/fluss/src/cluster/cluster.rs @@ -15,16 +15,18 @@ // specific language governing permissions and limitations // under the License. -use crate::BucketId; use crate::cluster::{BucketLocation, ServerNode, ServerType}; +use crate::error::Error::PartitionNotExist; use crate::error::{Error, Result}; use crate::metadata::{ JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath, }; -use crate::proto::MetadataResponse; +use crate::proto::{MetadataResponse, PbBucketMetadata}; use crate::rpc::{from_pb_server_node, from_pb_table_path}; +use crate::{BucketId, PartitionId, TableId}; use rand::random_range; use std::collections::{HashMap, HashSet}; +use std::sync::Arc; static EMPTY: Vec = Vec::new(); @@ -33,27 +35,35 @@ pub struct Cluster { coordinator_server: Option, alive_tablet_servers_by_id: HashMap, alive_tablet_servers: Vec, - available_locations_by_path: HashMap>, + available_locations_by_path: HashMap, Vec>, available_locations_by_bucket: HashMap, - table_id_by_path: HashMap, - table_path_by_id: HashMap, + table_id_by_path: HashMap, + table_path_by_id: HashMap, table_info_by_path: HashMap, + partitions_id_by_path: HashMap, PartitionId>, + partition_name_by_id: HashMap, } impl Cluster { + #[allow(clippy::too_many_arguments)] pub fn new( coordinator_server: Option, alive_tablet_servers_by_id: HashMap, - available_locations_by_path: HashMap>, + available_locations_by_path: HashMap, Vec>, available_locations_by_bucket: HashMap, - table_id_by_path: HashMap, + table_id_by_path: HashMap, table_info_by_path: HashMap, + partitions_id_by_path: HashMap, PartitionId>, ) -> Self { let alive_tablet_servers = alive_tablet_servers_by_id.values().cloned().collect(); let table_path_by_id = table_id_by_path .iter() .map(|(path, table_id)| (*table_id, path.clone())) .collect(); + let partition_name_by_id = partitions_id_by_path + .iter() + .filter_map(|(path, id)| path.get_partition_name().map(|name| (*id, name.clone()))) + .collect(); Cluster { coordinator_server, alive_tablet_servers_by_id, @@ -63,10 +73,12 @@ impl Cluster { table_id_by_path, table_path_by_id, table_info_by_path, + partitions_id_by_path, + partition_name_by_id, } } - pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec) -> Self { + pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec) -> Self { let alive_tablet_servers_by_id = self .alive_tablet_servers_by_id .iter() @@ -89,6 +101,7 @@ impl Cluster { available_locations_by_bucket, self.table_id_by_path.clone(), self.table_info_by_path.clone(), + self.partitions_id_by_path.clone(), ) } @@ -110,6 +123,7 @@ impl Cluster { available_locations_by_bucket, self.table_id_by_path.clone(), self.table_info_by_path.clone(), + self.partitions_id_by_path.clone(), ) } @@ -123,6 +137,8 @@ impl Cluster { table_id_by_path, table_path_by_id, table_info_by_path, + partitions_id_by_path, + partition_name_by_id, } = cluster; self.coordinator_server = coordinator_server; self.alive_tablet_servers_by_id = alive_tablet_servers_by_id; @@ -132,26 +148,30 @@ impl Cluster { self.table_id_by_path = table_id_by_path; self.table_path_by_id = table_path_by_id; self.table_info_by_path = table_info_by_path; + self.partitions_id_by_path = partitions_id_by_path; + self.partition_name_by_id = partition_name_by_id; } fn filter_bucket_locations_by_path( &self, table_paths: &HashSet<&TablePath>, ) -> ( - HashMap>, + HashMap, Vec>, HashMap, ) { let available_locations_by_path = self .available_locations_by_path .iter() - .filter(|&(path, _)| !table_paths.contains(path)) + .filter(|&(path, _)| !table_paths.contains(path.get_table_path())) .map(|(path, locations)| (path.clone(), locations.clone())) .collect(); let available_locations_by_bucket = self .available_locations_by_bucket .iter() - .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path)) + .filter(|&(_bucket, location)| { + !table_paths.contains(&location.physical_table_path.get_table_path()) + }) .map(|(bucket, location)| (bucket.clone(), location.clone())) .collect(); @@ -175,15 +195,19 @@ impl Cluster { let mut table_id_by_path = HashMap::new(); let mut table_info_by_path = HashMap::new(); + let mut partitions_id_by_path = HashMap::new(); + let mut tmp_available_locations_by_path = HashMap::new(); + let mut tmp_available_location_by_bucket = HashMap::new(); + if let Some(origin) = origin_cluster { table_info_by_path.extend(origin.get_table_info_by_path().clone()); table_id_by_path.extend(origin.get_table_id_by_path().clone()); + partitions_id_by_path.extend(origin.partitions_id_by_path.clone()); + tmp_available_locations_by_path.extend(origin.available_locations_by_path.clone()); + tmp_available_location_by_bucket.extend(origin.available_locations_by_bucket.clone()); } - // Index the bucket locations by table path, and index bucket location by bucket - let mut tmp_available_location_by_bucket = HashMap::new(); - let mut tmp_available_locations_by_path = HashMap::new(); - + // iterate all table metadata for table_metadata in metadata_response.table_metadata { let table_id = table_metadata.table_id; let table_path = from_pb_table_path(&table_metadata.table_path); @@ -207,39 +231,56 @@ impl Cluster { table_info_by_path.insert(table_path.clone(), table_info); table_id_by_path.insert(table_path.clone(), table_id); - // now, get bucket matadata - let mut found_unavailable_bucket = false; - let mut available_bucket_for_table = vec![]; - let mut bucket_for_table = vec![]; - for bucket_metadata in table_metadata.bucket_metadata { - let bucket_id = bucket_metadata.bucket_id; - let bucket = TableBucket::new(table_id, bucket_id); - let bucket_location; - if let Some(leader_id) = bucket_metadata.leader_id - && let Some(server_node) = servers.get(&leader_id) - { - bucket_location = BucketLocation::new( - bucket.clone(), - Some(server_node.clone()), - table_path.clone(), - ); - available_bucket_for_table.push(bucket_location.clone()); - tmp_available_location_by_bucket - .insert(bucket.clone(), bucket_location.clone()); - } else { - found_unavailable_bucket = true; - bucket_location = BucketLocation::new(bucket.clone(), None, table_path.clone()); - } - bucket_for_table.push(bucket_location.clone()); + let bucket_metadata = table_metadata.bucket_metadata; + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + + let bucket_locations = get_bucket_locations( + &mut servers, + bucket_metadata.as_slice(), + table_id, + None, + &physical_table_path, + ); + tmp_available_locations_by_path.insert(physical_table_path, bucket_locations); + } + + // iterate all partition metadata + for partition_metadata in metadata_response.partition_metadata { + let table_id = partition_metadata.table_id; + + if let Some(cluster) = origin_cluster { + let partition_name = partition_metadata.partition_name; + let table_path = cluster.get_table_path_by_id(table_id).unwrap(); + let partition_id = partition_metadata.partition_id; + + let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned( + Arc::new(table_path.clone()), + Some(partition_name), + )); + + partitions_id_by_path.insert(Arc::clone(&physical_table_path), partition_id); + + let bucket_locations = get_bucket_locations( + &mut servers, + partition_metadata.bucket_metadata.as_slice(), + table_id, + Some(partition_id), + &physical_table_path, + ); + + tmp_available_locations_by_path.insert(physical_table_path, bucket_locations); } + } - if found_unavailable_bucket { - tmp_available_locations_by_path - .insert(table_path.clone(), available_bucket_for_table.clone()); - } else { - tmp_available_locations_by_path.insert(table_path.clone(), bucket_for_table); + for bucket_locations in &mut tmp_available_locations_by_path.values() { + for location in bucket_locations { + if location.leader().is_some() { + tmp_available_location_by_bucket + .insert(location.table_bucket.clone(), location.clone()); + } } } + Ok(Cluster::new( coordinator_server, servers, @@ -247,6 +288,7 @@ impl Cluster { tmp_available_location_by_bucket, table_id_by_path, table_info_by_path, + partitions_id_by_path, )) } @@ -269,14 +311,43 @@ impl Cluster { pub fn get_table_bucket( &self, - table_path: &TablePath, + physical_table_path: &PhysicalTablePath, bucket_id: BucketId, ) -> Result { - let table_info = self.get_table(table_path)?; - Ok(TableBucket::new(table_info.table_id, bucket_id)) + let table_info = self.get_table(physical_table_path.get_table_path())?; + let partition_id = self.get_partition_id(physical_table_path); + + if physical_table_path.get_partition_name().is_some() && partition_id.is_none() { + return Err(PartitionNotExist { + message: format!( + "The partition {} is not found in cluster", + physical_table_path.get_partition_name().unwrap() + ), + }); + } + + Ok(TableBucket::new_with_partition( + table_info.table_id, + partition_id, + bucket_id, + )) } - pub fn get_bucket_locations_by_path(&self) -> &HashMap> { + pub fn get_partition_id(&self, physical_table_path: &PhysicalTablePath) -> Option { + self.partitions_id_by_path.get(physical_table_path).copied() + } + + pub fn get_partition_name(&self, partition_id: PartitionId) -> Option<&String> { + self.partition_name_by_id.get(&partition_id) + } + + pub fn get_table_id(&self, table_path: &TablePath) -> Option { + self.table_id_by_path.get(table_path).copied() + } + + pub fn get_bucket_locations_by_path( + &self, + ) -> &HashMap, Vec> { &self.available_locations_by_path } @@ -288,13 +359,13 @@ impl Cluster { &self.table_id_by_path } - pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> { + pub fn get_table_path_by_id(&self, table_id: TableId) -> Option<&TablePath> { self.table_path_by_id.get(&table_id) } pub fn get_available_buckets_for_table_path( &self, - table_path: &TablePath, + table_path: &PhysicalTablePath, ) -> &Vec { self.available_locations_by_path .get(table_path) @@ -327,4 +398,37 @@ impl Cluster { pub fn opt_get_table(&self, table_path: &TablePath) -> Option<&TableInfo> { self.table_info_by_path.get(table_path) } + + pub fn get_partition_id_by_path(&self) -> &HashMap, PartitionId> { + &self.partitions_id_by_path + } +} + +fn get_bucket_locations( + servers: &mut HashMap, + bucket_metadata: &[PbBucketMetadata], + table_id: i64, + partition_id: Option, + physical_table_path: &Arc, +) -> Vec { + let mut bucket_locations = Vec::new(); + for metadata in bucket_metadata { + let bucket_id = metadata.bucket_id; + let bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id); + + let server = if let Some(leader_id) = metadata.leader_id + && let Some(server_node) = servers.get(&leader_id) + { + Some(server_node.clone()) + } else { + None + }; + + bucket_locations.push(BucketLocation::new( + bucket.clone(), + server, + Arc::clone(physical_table_path), + )); + } + bucket_locations } diff --git a/crates/fluss/src/cluster/mod.rs b/crates/fluss/src/cluster/mod.rs index f9d42e44..58e80c00 100644 --- a/crates/fluss/src/cluster/mod.rs +++ b/crates/fluss/src/cluster/mod.rs @@ -16,7 +16,8 @@ // under the License. use crate::BucketId; -use crate::metadata::{TableBucket, TablePath}; +use crate::metadata::{PhysicalTablePath, TableBucket}; +use std::sync::Arc; #[allow(clippy::module_inception)] mod cluster; @@ -69,19 +70,19 @@ pub enum ServerType { pub struct BucketLocation { pub table_bucket: TableBucket, leader: Option, - pub table_path: TablePath, + physical_table_path: Arc, } impl BucketLocation { pub fn new( table_bucket: TableBucket, leader: Option, - table_path: TablePath, + physical_table_path: Arc, ) -> BucketLocation { BucketLocation { table_bucket, leader, - table_path, + physical_table_path, } } @@ -96,4 +97,8 @@ impl BucketLocation { pub fn bucket_id(&self) -> BucketId { self.table_bucket.bucket_id() } + + pub fn physical_table_path(&self) -> &Arc { + &self.physical_table_path + } } diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index 68426d7c..ef865308 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -99,6 +99,12 @@ pub enum Error { )] InvalidPartition { message: String }, + #[snafu( + visibility(pub(crate)), + display("Fluss hitting partition not exist error {}.", message) + )] + PartitionNotExist { message: String }, + #[snafu( visibility(pub(crate)), display("Fluss hitting IO not supported error {}.", message) diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index e3652370..6431d3a9 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -930,6 +930,15 @@ impl RowType { self.fields.iter().map(|f| f.name.as_str()).collect() } + pub fn project_with_field_names(&self, field_names: &[String]) -> Result { + let indices: Vec = field_names + .iter() + .filter_map(|pk| self.get_field_index(pk)) + .collect(); + + self.project(indices.as_slice()) + } + pub fn project(&self, project_field_positions: &[usize]) -> Result { Ok(RowType::with_nullable( self.nullable, diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 7b93acae..ce362c43 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -25,6 +25,7 @@ use core::fmt; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; +use std::sync::Arc; use strum_macros::EnumString; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -172,7 +173,7 @@ impl SchemaBuilder { self } _ => { - panic!("data type msut be row type") + panic!("data type must be row type") } } } @@ -325,7 +326,7 @@ pub struct TableDescriptorBuilder { schema: Option, properties: HashMap, custom_properties: HashMap, - partition_keys: Vec, + partition_keys: Arc<[String]>, comment: Option, table_distribution: Option, } @@ -374,7 +375,7 @@ impl TableDescriptorBuilder { } pub fn partitioned_by(mut self, partition_keys: Vec) -> Self { - self.partition_keys = partition_keys; + self.partition_keys = Arc::from(partition_keys); self } @@ -413,7 +414,7 @@ impl TableDescriptorBuilder { pub struct TableDescriptor { schema: Schema, comment: Option, - partition_keys: Vec, + partition_keys: Arc<[String]>, table_distribution: Option, properties: HashMap, custom_properties: HashMap, @@ -749,19 +750,19 @@ impl TablePath { /// `partition_name` will be `Some(...)`; otherwise, it will be `None`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PhysicalTablePath { - table_path: TablePath, + table_path: Arc, partition_name: Option, } impl PhysicalTablePath { - pub fn of(table_path: TablePath) -> Self { + pub fn of(table_path: Arc) -> Self { Self { table_path, partition_name: None, } } - pub fn of_partitioned(table_path: TablePath, partition_name: Option) -> Self { + pub fn of_partitioned(table_path: Arc, partition_name: Option) -> Self { Self { table_path, partition_name, @@ -774,7 +775,7 @@ impl PhysicalTablePath { partition_name: Option, ) -> Self { Self { - table_path: TablePath::new(database_name, table_name), + table_path: Arc::new(TablePath::new(database_name, table_name)), partition_name, } } @@ -815,7 +816,7 @@ pub struct TableInfo { pub primary_keys: Vec, pub physical_primary_keys: Vec, pub bucket_keys: Vec, - pub partition_keys: Vec, + pub partition_keys: Arc<[String]>, pub num_buckets: i32, pub properties: HashMap, pub table_config: TableConfig, @@ -982,7 +983,7 @@ impl TableInfo { schema_id: i32, schema: Schema, bucket_keys: Vec, - partition_keys: Vec, + partition_keys: Arc<[String]>, num_buckets: i32, properties: HashMap, custom_properties: HashMap, @@ -1080,7 +1081,7 @@ impl TableInfo { .is_auto_partition_enabled() } - pub fn get_partition_keys(&self) -> &[String] { + pub fn get_partition_keys(&self) -> &Arc<[String]> { &self.partition_keys } @@ -1115,7 +1116,7 @@ impl TableInfo { pub fn to_table_descriptor(&self) -> Result { let mut builder = TableDescriptor::builder() .schema(self.schema.clone()) - .partitioned_by(self.partition_keys.clone()) + .partitioned_by(self.partition_keys.to_vec()) .distributed_by(Some(self.num_buckets), self.bucket_keys.clone()) .properties(self.properties.clone()) .custom_properties(self.custom_properties.clone()); @@ -1177,6 +1178,18 @@ impl TableBucket { } } + pub fn new_with_partition( + table_id: TableId, + partition_id: Option, + bucket: BucketId, + ) -> Self { + TableBucket { + table_id, + partition_id, + bucket, + } + } + pub fn table_id(&self) -> TableId { self.table_id } @@ -1308,7 +1321,7 @@ mod tests { 1, schema.clone(), vec!["id".to_string()], - vec![], // No partition keys + Arc::from(vec![]), // No partition keys 1, properties.clone(), HashMap::new(), @@ -1329,7 +1342,7 @@ mod tests { 1, schema.clone(), vec!["id".to_string()], - vec![], // No partition keys + Arc::from(vec![]), // No partition keys 1, properties.clone(), HashMap::new(), @@ -1350,7 +1363,7 @@ mod tests { 1, schema.clone(), vec!["id".to_string()], - vec!["name".to_string()], // Partition keys + Arc::from(vec!["name".to_string()]), // Partition keys 1, properties.clone(), HashMap::new(), @@ -1371,7 +1384,7 @@ mod tests { 1, schema.clone(), vec!["id".to_string()], - vec!["name".to_string()], // Partition keys + Arc::from(vec!["name".to_string()]), // Partition keys 1, properties.clone(), HashMap::new(), diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 6340dc80..726106b7 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -1450,6 +1450,7 @@ pub struct MyVec(pub StreamReader); mod tests { use super::*; use crate::metadata::{DataField, DataTypes, RowType}; + use crate::test_utils::build_table_info; #[test] fn test_to_array_type() { @@ -1932,7 +1933,7 @@ mod tests { use crate::compression::{ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; - use crate::metadata::TablePath; + use crate::metadata::{PhysicalTablePath, TablePath}; use crate::row::GenericRow; use tempfile::NamedTempFile; @@ -1941,7 +1942,9 @@ mod tests { DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); - let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); let mut builder = MemoryLogRecordsArrowBuilder::new( 1, @@ -1956,13 +1959,15 @@ mod tests { let mut row = GenericRow::new(2); row.set_field(0, 1_i32); row.set_field(1, "alice"); - let record = WriteRecord::for_append(table_path.clone(), 1, row); + let record = + WriteRecord::for_append(Arc::clone(&table_info), physical_table_path.clone(), 1, row); builder.append(&record)?; let mut row2 = GenericRow::new(2); row2.set_field(0, 2_i32); row2.set_field(1, "bob"); - let record2 = WriteRecord::for_append(table_path, 2, row2); + let record2 = + WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, row2); builder.append(&record2)?; let data = builder.build()?; diff --git a/crates/fluss/src/rpc/message/update_metadata.rs b/crates/fluss/src/rpc/message/update_metadata.rs index a6e62886..1f0d88c2 100644 --- a/crates/fluss/src/rpc/message/update_metadata.rs +++ b/crates/fluss/src/rpc/message/update_metadata.rs @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::{MetadataResponse, PbTablePath}; +use crate::metadata::{PhysicalTablePath, TablePath}; +use crate::proto::{MetadataResponse, PbPhysicalTablePath, PbTablePath}; use crate::rpc::api_key::ApiKey; use crate::rpc::api_version::ApiVersion; +use crate::rpc::frame::ReadError; use crate::rpc::frame::WriteError; use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; - -use crate::metadata::TablePath; -use crate::rpc::frame::ReadError; +use std::collections::HashSet; +use std::sync::Arc; use crate::{impl_read_version_type, impl_write_version_type, proto}; use bytes::{Buf, BufMut}; @@ -33,7 +34,11 @@ pub struct UpdateMetadataRequest { } impl UpdateMetadataRequest { - pub fn new(table_paths: &[&TablePath]) -> Self { + pub fn new( + table_paths: &HashSet<&TablePath>, + physical_table_paths: &HashSet<&Arc>, + partition_ids: Vec, + ) -> Self { UpdateMetadataRequest { inner_request: proto::MetadataRequest { table_path: table_paths @@ -43,8 +48,15 @@ impl UpdateMetadataRequest { table_name: path.table().to_string(), }) .collect(), - partitions_path: vec![], - partitions_id: vec![], + partitions_path: physical_table_paths + .iter() + .map(|path| PbPhysicalTablePath { + database_name: path.get_database_name().to_string(), + table_name: path.get_table_name().to_string(), + partition_name: path.get_partition_name().map(|pn| pn.to_string()), + }) + .collect(), + partitions_id: partition_ids, }, } } diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs index d1cd3ec7..8e8fbe48 100644 --- a/crates/fluss/src/test_utils.rs +++ b/crates/fluss/src/test_utils.rs @@ -17,7 +17,8 @@ use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType}; use crate::metadata::{ - DataField, DataTypes, Schema, TableBucket, TableDescriptor, TableInfo, TablePath, + DataField, DataTypes, PhysicalTablePath, Schema, TableBucket, TableDescriptor, TableInfo, + TablePath, }; use std::collections::HashMap; use std::sync::Arc; @@ -53,12 +54,15 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32) let bucket_location = BucketLocation::new( table_bucket.clone(), Some(server.clone()), - table_path.clone(), + Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))), ); bucket_locations.push(bucket_location.clone()); locations_by_bucket.insert(table_bucket, bucket_location); } - locations_by_path.insert(table_path.clone(), bucket_locations); + locations_by_path.insert( + Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))), + bucket_locations, + ); let mut table_id_by_path = HashMap::new(); table_id_by_path.insert(table_path.clone(), table_id); @@ -76,6 +80,7 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32) locations_by_bucket, table_id_by_path, table_info_by_path, + HashMap::new(), ) } diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index a4f29617..b2263c29 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -36,7 +36,7 @@ mod kv_table_test { use crate::integration::fluss_cluster::FlussTestingCluster; use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster}; use fluss::client::UpsertWriter; - use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::sync::Arc; @@ -438,6 +438,179 @@ mod kv_table_test { .expect("Failed to drop table"); } + #[tokio::test] + async fn partitioned_table_upsert_and_lookup() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_partitioned_kv_table".to_string()); + + // Create a partitioned KV table with region as partition key + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("region", DataTypes::string()) + .column("user_id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["region".to_string(), "user_id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .partitioned_by(vec!["region".to_string()]) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + // Create partitions for each region before inserting data + for region in &["US", "EU", "APAC"] { + let mut partition_map = std::collections::HashMap::new(); + partition_map.insert("region".to_string(), region.to_string()); + let partition_spec = PartitionSpec::new(partition_map); + admin + .create_partition(&table_path, &partition_spec, false) + .await + .expect("Failed to create partition"); + } + + let connection = cluster.get_fluss_connection().await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert records with different partitions + let test_data = [ + ("US", 1, "Gustave", 100i64), + ("US", 2, "Lune", 200i64), + ("EU", 1, "Sciel", 150i64), + ("EU", 2, "Maelle", 250i64), + ("APAC", 1, "Noco", 300i64), + ]; + + for (region, user_id, name, score) in &test_data { + let mut row = GenericRow::new(4); + row.set_field(0, *region); + row.set_field(1, *user_id); + row.set_field(2, *name); + row.set_field(3, *score); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + } + + // Create lookuper + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Lookup records - the lookup key includes partition key columns + for (region, user_id, expected_name, expected_score) in &test_data { + let mut key = GenericRow::new(4); + key.set_field(0, *region); + key.set_field(1, *user_id); + + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + assert_eq!(row.get_string(0), *region, "region mismatch"); + assert_eq!(row.get_int(1), *user_id, "user_id mismatch"); + assert_eq!(row.get_string(2), *expected_name, "name mismatch"); + assert_eq!(row.get_long(3), *expected_score, "score mismatch"); + } + + // Test update within a partition + let mut updated_row = GenericRow::new(4); + updated_row.set_field(0, "US"); + updated_row.set_field(1, 1); + updated_row.set_field(2, "Gustave Updated"); + updated_row.set_field(3, 999i64); + upsert_writer + .upsert(&updated_row) + .await + .expect("Failed to upsert updated row"); + + // Verify the update + let mut key = GenericRow::new(4); + key.set_field(0, "US"); + key.set_field(1, 1); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!(row.get_string(2), "Gustave Updated"); + assert_eq!(row.get_long(3), 999); + + // Lookup in non-existent partition should return empty result + let mut non_existent_key = GenericRow::new(4); + non_existent_key.set_field(0, "UNKNOWN_REGION"); + non_existent_key.set_field(1, 1); + let result = lookuper + .lookup(&non_existent_key) + .await + .expect("Failed to lookup non-existent partition"); + assert!( + result + .get_single_row() + .expect("Failed to get row") + .is_none(), + "Lookup in non-existent partition should return None" + ); + + // Delete a record within a partition + let mut delete_key = GenericRow::new(4); + delete_key.set_field(0, "EU"); + delete_key.set_field(1, 1); + upsert_writer + .delete(&delete_key) + .await + .expect("Failed to delete"); + + // Verify deletion + let mut key = GenericRow::new(4); + key.set_field(0, "EU"); + key.set_field(1, 1); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + assert!( + result + .get_single_row() + .expect("Failed to get row") + .is_none(), + "Deleted record should not exist" + ); + + // Verify other records in the same partition still exist + let mut key = GenericRow::new(4); + key.set_field(0, "EU"); + key.set_field(1, 2); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!(row.get_string(2), "Maelle"); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + /// Integration test covering put and get operations for all supported datatypes. #[tokio::test] async fn all_supported_datatypes() {