diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs index 235df56b..f4226961 100644 --- a/bindings/python/src/metadata.rs +++ b/bindings/python/src/metadata.rs @@ -530,7 +530,11 @@ impl TableBucket { /// Convert to core TableBucket (internal use) pub fn to_core(&self) -> fcore::metadata::TableBucket { - fcore::metadata::TableBucket::new(self.table_id, self.bucket) + fcore::metadata::TableBucket::new_with_partition( + self.table_id, + self.partition_id, + self.bucket, + ) } } diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 9061169d..737ead3d 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -273,7 +273,11 @@ impl FlussAdmin { // Convert proto response to LakeSnapshot let mut table_buckets_offset = HashMap::new(); for bucket_snapshot in response.bucket_snapshots { - let table_bucket = TableBucket::new(response.table_id, bucket_snapshot.bucket_id); + let table_bucket = TableBucket::new_with_partition( + response.table_id, + bucket_snapshot.partition_id, + bucket_snapshot.bucket_id, + ); if let Some(log_offset) = bucket_snapshot.log_offset { table_buckets_offset.insert(table_bucket, log_offset); } diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index c6244cd7..ce00ced2 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -259,8 +259,8 @@ mod tests { let leader = metadata .leader_for(&table_path, &TableBucket::new(1, 0)) .await - .expect("leader request should be Ok") - .expect("leader should exist"); + .unwrap() + .expect("leader"); assert_eq!(leader.id(), 1); } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index d30c5d55..a88964ea 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -26,7 +26,6 @@ use std::{ }; use tempfile::TempDir; -use crate::TableId; use crate::client::connection::FlussConnection; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; @@ -43,6 +42,7 @@ use crate::record::{ }; use crate::rpc::{RpcClient, RpcError, message}; use crate::util::FairBucketStatusMap; +use crate::{PartitionId, TableId}; const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024; #[allow(dead_code)] @@ -88,7 +88,7 @@ impl<'a> TableScan<'a> { /// # pub async fn example() -> Result<()> { /// let mut config = Config::default(); /// config.bootstrap_server = "127.0.0.1:9123".to_string(); - /// let conn = FlussConnection::new(config).await; + /// let conn = FlussConnection::new(config).await?; /// /// let table_descriptor = TableDescriptor::builder() /// .schema( @@ -164,7 +164,7 @@ impl<'a> TableScan<'a> { /// # pub async fn example() -> Result<()> { /// let mut config = Config::default(); /// config.bootstrap_server = "127.0.0.1:9123".to_string(); - /// let conn = FlussConnection::new(config).await; + /// let conn = FlussConnection::new(config).await?; /// /// let table_descriptor = TableDescriptor::builder() /// .schema( @@ -270,6 +270,7 @@ struct LogScannerInner { metadata: Arc, log_scanner_status: Arc, log_fetcher: LogFetcher, + is_partitioned_table: bool, } impl LogScannerInner { @@ -284,6 +285,7 @@ impl LogScannerInner { Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, + is_partitioned_table: table_info.is_partitioned(), metadata: metadata.clone(), log_scanner_status: log_scanner_status.clone(), log_fetcher: LogFetcher::new( @@ -337,6 +339,13 @@ impl LogScannerInner { } async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> { + if self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: "The table is a partitioned table, please use \"subscribe_partition\" to \ + subscribe a partitioned bucket instead." + .to_string(), + }); + } let table_bucket = TableBucket::new(self.table_id, bucket); self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) @@ -347,6 +356,13 @@ impl LogScannerInner { } async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { + if self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: + "The table is a partitioned table, subscribe_batch is not supported currently." + .to_string(), + }); + } self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -368,6 +384,29 @@ impl LogScannerInner { Ok(()) } + async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + if !self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: "The table is not a partitioned table, please use \"subscribe\" to \ + subscribe a non-partitioned bucket instead." + .to_string(), + }); + } + let table_bucket = + TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket); + self.metadata + .check_and_update_table_metadata(from_ref(&self.table_path)) + .await?; + self.log_scanner_status + .assign_scan_bucket(table_bucket, offset); + Ok(()) + } + async fn poll_for_fetches(&self) -> Result>> { let result = self.log_fetcher.collect_fetches()?; if !result.is_empty() { @@ -435,6 +474,17 @@ impl LogScanner { pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.inner.subscribe_batch(bucket_offsets).await } + + pub async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + self.inner + .subscribe_partition(partition_id, bucket, offset) + .await + } } // Implementation for RecordBatchLogScanner (batches mode) @@ -451,6 +501,17 @@ impl RecordBatchLogScanner { pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.inner.subscribe_batch(bucket_offsets).await } + + pub async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + self.inner + .subscribe_partition(partition_id, bucket, offset) + .await + } } struct LogFetcher { @@ -617,55 +678,55 @@ impl LogFetcher { ) } - async fn check_and_update_metadata(&self) -> Result<()> { - let need_update = self - .fetchable_buckets() - .iter() - .any(|bucket| self.get_table_bucket_leader(bucket).is_none()); + async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> { + let mut partition_ids = Vec::new(); + let mut need_update = false; - if !need_update { - return Ok(()); + for tb in table_buckets { + if self.get_table_bucket_leader(tb).is_some() { + continue; + } + + if self.is_partitioned { + partition_ids.push(tb.partition_id().unwrap()); + } else { + need_update = true; + break; + } } - if self.is_partitioned { - // Fallback to full table metadata refresh until partition-aware updates are available. + let update_result = if self.is_partitioned && !partition_ids.is_empty() { self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) + .update_tables_metadata( + &HashSet::from([&self.table_path]), + &HashSet::new(), + partition_ids, + ) .await - .or_else(|e| { - if let Error::RpcError { source, .. } = &e - && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) - { - warn!( - "Retrying after encountering error while updating table metadata: {e}" - ); - Ok(()) - } else { - Err(e) - } - })?; - return Ok(()); - } + } else if need_update { + self.metadata.update_table_metadata(&self.table_path).await + } else { + Ok(()) + }; - // TODO: Handle PartitionNotExist error - self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) - .await - .or_else(|e| { - if let Error::RpcError { source, .. } = &e - && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) - { - warn!("Retrying after encountering error while updating table metadata: {e}"); - Ok(()) - } else { - Err(e) - } - }) + // TODO: Handle PartitionNotExist error like java side + update_result.or_else(|e| { + if let Error::RpcError { source, .. } = &e + && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) + { + warn!("Retrying after encountering error while updating table metadata: {e}"); + Ok(()) + } else { + Err(e) + } + })?; + Ok(()) } /// Send fetch requests asynchronously without waiting for responses async fn send_fetches(&self) -> Result<()> { - self.check_and_update_metadata().await?; + self.check_and_update_metadata(self.fetchable_buckets().as_slice()) + .await?; let fetch_request = self.prepare_fetch_log_requests().await; for (leader, fetch_request) in fetch_request { @@ -774,7 +835,11 @@ impl LogFetcher { for fetch_log_for_bucket in fetch_log_for_buckets { let bucket: i32 = fetch_log_for_bucket.bucket_id; - let table_bucket = TableBucket::new(table_id, bucket); + let table_bucket = TableBucket::new_with_partition( + table_id, + fetch_log_for_bucket.partition_id, + bucket, + ); // todo: check fetch result code for per-bucket let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else { @@ -1302,7 +1367,7 @@ impl LogFetcher { ) } else { let fetch_log_req_for_bucket = PbFetchLogReqForBucket { - partition_id: None, + partition_id: bucket.partition_id(), bucket_id: bucket.bucket_id(), fetch_offset: offset, // 1M diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 908f4469..4e0a5256 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1206,7 +1206,7 @@ pub struct TableBucket { impl TableBucket { pub fn new(table_id: TableId, bucket: BucketId) -> Self { - TableBucket { + Self { table_id, partition_id: None, bucket, diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 514df828..cbfcbe58 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -974,7 +974,7 @@ mod table_test { } #[tokio::test] - async fn partitioned_table_append() { + async fn partitioned_table_append_scan() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; @@ -1098,11 +1098,71 @@ mod table_test { "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist." )); + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + let partition_info = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partition infos"); + for partition_info in partition_info { + log_scanner + .subscribe_partition(partition_info.get_partition_id(), 0, 0) + .await + .expect("Failed to subscribe to partition"); + } + + let expected_records = vec![ + (1, "US", 100i64), + (2, "US", 200i64), + (3, "EU", 300i64), + (4, "EU", 400), + (5, "US", 500i64), + (6, "US", 600i64), + (7, "EU", 700i64), + (8, "EU", 800i64), + ]; + let expected_records: Vec<(i32, String, i64)> = expected_records + .into_iter() + .map(|(id, region, val)| (id, region.to_string(), val)) + .collect(); + + let mut collected_records: Vec<(i32, String, i64)> = Vec::new(); + let start_time = std::time::Instant::now(); + while collected_records.len() < expected_records.len() + && start_time.elapsed() < Duration::from_secs(10) + { + let records = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll log scanner"); + for rec in records { + let row = rec.row(); + collected_records.push(( + row.get_int(0), + row.get_string(1).to_string(), + row.get_long(2), + )); + } + } + + assert_eq!( + collected_records.len(), + expected_records.len(), + "Did not receive all records in time, expect receive {} records, but got {} records", + expected_records.len(), + collected_records.len() + ); + collected_records.sort_by_key(|r| r.0); + assert_eq!( + collected_records, expected_records, + "Data mismatch between sent and received" + ); + admin .drop_table(&table_path, false) .await .expect("Failed to drop table"); - - // todo: add scan test in 203 } }