From 8296c32f39fc9e7c52db56e1190ef29549957349 Mon Sep 17 00:00:00 2001 From: Glenn Miao Date: Wed, 28 Jan 2026 22:23:49 +0800 Subject: [PATCH 1/4] feat: support partition scanning --- bindings/python/src/metadata.rs | 2 +- crates/fluss/src/client/admin.rs | 4 +- crates/fluss/src/client/metadata.rs | 6 +- .../src/client/table/log_fetch_buffer.rs | 4 +- crates/fluss/src/client/table/remote_log.rs | 2 +- crates/fluss/src/client/table/scanner.rs | 87 +++++++++++++++---- crates/fluss/src/metadata/table.rs | 6 +- crates/fluss/src/record/mod.rs | 6 +- crates/fluss/src/test_utils.rs | 2 +- crates/fluss/src/util/mod.rs | 8 +- crates/fluss/tests/integration/log_table.rs | 2 +- 11 files changed, 90 insertions(+), 39 deletions(-) diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs index 235df56b..cfd43f45 100644 --- a/bindings/python/src/metadata.rs +++ b/bindings/python/src/metadata.rs @@ -530,7 +530,7 @@ impl TableBucket { /// Convert to core TableBucket (internal use) pub fn to_core(&self) -> fcore::metadata::TableBucket { - fcore::metadata::TableBucket::new(self.table_id, self.bucket) + fcore::metadata::TableBucket::new(self.table_id, None, self.bucket) } } diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 9061169d..89ffeda9 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -273,7 +273,9 @@ impl FlussAdmin { // Convert proto response to LakeSnapshot let mut table_buckets_offset = HashMap::new(); for bucket_snapshot in response.bucket_snapshots { - let table_bucket = TableBucket::new(response.table_id, bucket_snapshot.bucket_id); + let table_bucket = TableBucket::new(response.table_id, + bucket_snapshot.partition_id, + bucket_snapshot.bucket_id); if let Some(log_offset) = bucket_snapshot.log_offset { table_buckets_offset.insert(table_bucket, log_offset); } diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index c6244cd7..beb61dc6 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -257,10 +257,8 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Metadata::new_for_test(cluster); let leader = metadata - .leader_for(&table_path, &TableBucket::new(1, 0)) - .await - .expect("leader request should be Ok") - .expect("leader should exist"); + .leader_for(&TableBucket::new(1, None, 0)) + .expect("leader"); assert_eq!(leader.id(), 1); } diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index edab91d5..12ba43a8 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -877,7 +877,7 @@ mod tests { #[tokio::test] async fn await_not_empty_returns_pending_error() { let buffer = LogFetchBuffer::new(test_read_context().unwrap()); - let table_bucket = TableBucket::new(1, 0); + let table_bucket = TableBucket::new(1, None, 0); buffer.pend(Box::new(ErrorPendingFetch { table_bucket: table_bucket.clone(), })); @@ -920,7 +920,7 @@ mod tests { let log_records = LogRecordsBatches::new(data.clone()); let read_context = ReadContext::new(to_arrow_schema(&row_type)?, false); let mut fetch = DefaultCompletedFetch::new( - TableBucket::new(1, 0), + TableBucket::new(1, None, 0), log_records, data.len(), read_context, diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 5583f89d..dc561535 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -946,7 +946,7 @@ mod tests { /// Helper function to create a TableBucket for testing fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket { - TableBucket::new(table_id, bucket_id) + TableBucket::new(table_id, None, bucket_id) } /// Simplified fake fetcher for testing diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index d30c5d55..cbdff14e 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -26,7 +26,6 @@ use std::{ }; use tempfile::TempDir; -use crate::TableId; use crate::client::connection::FlussConnection; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; @@ -43,6 +42,7 @@ use crate::record::{ }; use crate::rpc::{RpcClient, RpcError, message}; use crate::util::FairBucketStatusMap; +use crate::{PartitionId, TableId}; const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024; #[allow(dead_code)] @@ -54,6 +54,7 @@ pub struct TableScan<'a> { conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc, + partition_id: Option, /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, } @@ -64,6 +65,7 @@ impl<'a> TableScan<'a> { conn, table_info, metadata, + partition_id: None, projected_fields: None, } } @@ -88,7 +90,7 @@ impl<'a> TableScan<'a> { /// # pub async fn example() -> Result<()> { /// let mut config = Config::default(); /// config.bootstrap_server = "127.0.0.1:9123".to_string(); - /// let conn = FlussConnection::new(config).await; + /// let conn = FlussConnection::new(config).await?; /// /// let table_descriptor = TableDescriptor::builder() /// .schema( @@ -164,7 +166,7 @@ impl<'a> TableScan<'a> { /// # pub async fn example() -> Result<()> { /// let mut config = Config::default(); /// config.bootstrap_server = "127.0.0.1:9123".to_string(); - /// let conn = FlussConnection::new(config).await; + /// let conn = FlussConnection::new(config).await?; /// /// let table_descriptor = TableDescriptor::builder() /// .schema( @@ -227,6 +229,7 @@ impl<'a> TableScan<'a> { self.conn.get_connections(), self.conn.config(), self.projected_fields, + self.partition_id, )?; Ok(LogScanner { inner: Arc::new(inner), @@ -240,11 +243,17 @@ impl<'a> TableScan<'a> { self.conn.get_connections(), self.conn.config(), self.projected_fields, + self.partition_id, )?; Ok(RecordBatchLogScanner { inner: Arc::new(inner), }) } + + pub fn filter_partition(mut self, partition_id: PartitionId) -> Self { + self.partition_id = Some(partition_id); + self + } } /// Scanner for reading log records one at a time with per-record metadata. @@ -270,6 +279,7 @@ struct LogScannerInner { metadata: Arc, log_scanner_status: Arc, log_fetcher: LogFetcher, + partition_id: Option, } impl LogScannerInner { @@ -279,6 +289,7 @@ impl LogScannerInner { connections: Arc, config: &crate::config::Config, projected_fields: Option>, + partition_id: Option, ) -> Result { let log_scanner_status = Arc::new(LogScannerStatus::new()); Ok(Self { @@ -286,6 +297,7 @@ impl LogScannerInner { table_id: table_info.table_id, metadata: metadata.clone(), log_scanner_status: log_scanner_status.clone(), + partition_id, log_fetcher: LogFetcher::new( table_info.clone(), connections.clone(), @@ -337,7 +349,7 @@ impl LogScannerInner { } async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> { - let table_bucket = TableBucket::new(self.table_id, bucket); + let table_bucket = TableBucket::new(self.table_id, self.partition_id, bucket); self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -359,7 +371,7 @@ impl LogScannerInner { let mut scan_bucket_offsets = HashMap::new(); for (bucket_id, offset) in bucket_offsets { - let table_bucket = TableBucket::new(self.table_id, *bucket_id); + let table_bucket = TableBucket::new(self.table_id, self.partition_id, *bucket_id); scan_bucket_offsets.insert(table_bucket, *offset); } @@ -368,6 +380,21 @@ impl LogScannerInner { Ok(()) } + async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + let table_bucket = TableBucket::new(self.table_id, Some(partition_id), bucket); + self.metadata + .check_and_update_table_metadata(from_ref(&self.table_path)) + .await?; + self.log_scanner_status + .assign_scan_bucket(table_bucket, offset); + Ok(()) + } + async fn poll_for_fetches(&self) -> Result>> { let result = self.log_fetcher.collect_fetches()?; if !result.is_empty() { @@ -435,6 +462,17 @@ impl LogScanner { pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.inner.subscribe_batch(bucket_offsets).await } + + pub async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + self.inner + .subscribe_partition(partition_id, bucket, offset) + .await + } } // Implementation for RecordBatchLogScanner (batches mode) @@ -451,6 +489,17 @@ impl RecordBatchLogScanner { pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.inner.subscribe_batch(bucket_offsets).await } + + pub async fn subscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + offset: i64, + ) -> Result<()> { + self.inner + .subscribe_partition(partition_id, bucket, offset) + .await + } } struct LogFetcher { @@ -618,12 +667,14 @@ impl LogFetcher { } async fn check_and_update_metadata(&self) -> Result<()> { - let need_update = self + // Collect buckets that are missing leader information + let buckets_needing_leader: Vec = self .fetchable_buckets() - .iter() - .any(|bucket| self.get_table_bucket_leader(bucket).is_none()); + .into_iter() + .filter(|bucket| self.get_table_bucket_leader(bucket).is_none()) + .collect(); - if !need_update { + if buckets_needing_leader.is_empty() { return Ok(()); } @@ -647,7 +698,7 @@ impl LogFetcher { return Ok(()); } - // TODO: Handle PartitionNotExist error + // Non-partitioned table: standard metadata refresh self.metadata .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) .await @@ -774,7 +825,7 @@ impl LogFetcher { for fetch_log_for_bucket in fetch_log_for_buckets { let bucket: i32 = fetch_log_for_bucket.bucket_id; - let table_bucket = TableBucket::new(table_id, bucket); + let table_bucket = TableBucket::new(table_id, fetch_log_for_bucket.partition_id, bucket); // todo: check fetch result code for per-bucket let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else { @@ -1302,7 +1353,7 @@ impl LogFetcher { ) } else { let fetch_log_req_for_bucket = PbFetchLogReqForBucket { - partition_id: None, + partition_id: bucket.partition_id(), bucket_id: bucket.bucket_id(), fetch_offset: offset, // 1M @@ -1541,7 +1592,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, 0); + let bucket = TableBucket::new(1, None, 0); status.assign_scan_bucket(bucket.clone(), 0); let data = build_records(&table_info, Arc::new(table_path))?; @@ -1573,7 +1624,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, 0); + let bucket = TableBucket::new(1, None, 0); let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false); @@ -1599,7 +1650,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, 0), 0); + status.assign_scan_bucket(TableBucket::new(1, None, 0), 0); let fetcher = LogFetcher::new( table_info, Arc::new(RpcClient::new()), @@ -1623,7 +1674,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, 0), 5); + status.assign_scan_bucket(TableBucket::new(1, None, 0), 5); let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), @@ -1673,7 +1724,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster.clone())); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, 0), 5); + status.assign_scan_bucket(TableBucket::new(1, None, 0), 5); let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), @@ -1683,7 +1734,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, 0); + let bucket = TableBucket::new(1,None, 0); assert!(metadata.leader_for(&table_path, &bucket).await?.is_some()); let response = crate::proto::FetchLogResponse { diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 908f4469..cf6f6fd5 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1205,10 +1205,10 @@ pub struct TableBucket { } impl TableBucket { - pub fn new(table_id: TableId, bucket: BucketId) -> Self { - TableBucket { + pub fn new(table_id: TableId, partition_id: Option, bucket: BucketId) -> Self { + Self { table_id, - partition_id: None, + partition_id, bucket, } } diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 8438b16d..fa6051af 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -274,8 +274,8 @@ mod tests { #[test] fn scan_records_counts_and_iterates() { - let bucket0 = TableBucket::new(1, 0); - let bucket1 = TableBucket::new(1, 1); + let bucket0 = TableBucket::new(1, None, 0); + let bucket1 = TableBucket::new(1, None, 1); let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert); let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete); @@ -302,7 +302,7 @@ mod tests { #[test] fn scan_batch_last_offset() { let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])); - let bucket = TableBucket::new(1, 0); + let bucket = TableBucket::new(1, None, 0); // Batch with 3 records starting at offset 100 -> last_offset = 102 let batch = RecordBatch::try_new( diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs index 752d4224..852d95e3 100644 --- a/crates/fluss/src/test_utils.rs +++ b/crates/fluss/src/test_utils.rs @@ -46,7 +46,7 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32) let mut bucket_locations = Vec::new(); for bucket_id in 0..buckets { - let table_bucket = TableBucket::new(table_id, bucket_id); + let table_bucket = TableBucket::new(table_id, None, bucket_id); let bucket_location = BucketLocation::new( table_bucket.clone(), Some(server.clone()), diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index ee8dde4a..938e1ed7 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -191,8 +191,8 @@ mod tests { #[test] fn fair_bucket_status_map_tracks_order_and_size() { - let bucket0 = TableBucket::new(1, 0); - let bucket1 = TableBucket::new(1, 1); + let bucket0 = TableBucket::new(1, None, 0); + let bucket1 = TableBucket::new(1, None, 1); let mut map = FairBucketStatusMap::new(); map.update_and_move_to_end(bucket0.clone(), 10); @@ -217,8 +217,8 @@ mod tests { #[test] fn fair_bucket_status_map_mutations() { - let bucket0 = TableBucket::new(1, 0); - let bucket1 = TableBucket::new(2, 1); + let bucket0 = TableBucket::new(1, None, 0); + let bucket1 = TableBucket::new(2, None, 1); let mut map = FairBucketStatusMap::new(); let mut input = HashMap::new(); diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 514df828..eae127ff 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -135,7 +135,7 @@ mod table_test { .expect("Failed to poll records"); // Verify the scanned records - let table_bucket = TableBucket::new(table.table_info().table_id, 0); + let table_bucket = TableBucket::new(table.table_info().table_id, None, 0); let records = scan_records.records(&table_bucket); assert_eq!(records.len(), 6, "Expected 6 records"); From 0c3bfb67be9638bac70283225d28ab35f410cd37 Mon Sep 17 00:00:00 2001 From: Glenn Miao Date: Thu, 29 Jan 2026 10:36:14 +0800 Subject: [PATCH 2/4] test: add unit tests for partition scanning --- crates/fluss/src/client/table/scanner.rs | 81 ++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index cbdff14e..3a0951c2 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -1767,4 +1767,85 @@ mod tests { assert!(metadata.get_cluster().leader_for(&bucket).is_none()); Ok(()) } + + #[tokio::test] + async fn subscribe_with_partition_creates_correct_table_bucket() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + + let scanner = LogScannerInner::new( + &table_info, + metadata.clone(), + Arc::new(RpcClient::new()), + &crate::config::Config::default(), + None, + Some(27), + )?; + + scanner.subscribe(0, 0).await?; + + let bucket = TableBucket::new(1, Some(27), 0); + assert_eq!( + scanner.log_scanner_status.get_bucket_offset(&bucket), + Some(0) + ); + + Ok(()) + } + + #[tokio::test] + async fn subscribe_partition_overrides_stored_partition() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + + let scanner = LogScannerInner::new( + &table_info, + metadata.clone(), + Arc::new(RpcClient::new()), + &crate::config::Config::default(), + None, + Some(27), + )?; + + scanner.subscribe_partition(99, 0, 0).await?; + + let bucket = TableBucket::new(1, Some(99), 0); + assert_eq!( + scanner.log_scanner_status.get_bucket_offset(&bucket), + Some(0) + ); + + Ok(()) + } + + #[tokio::test] + async fn subscribe_without_partition_uses_none() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + + let scanner = LogScannerInner::new( + &table_info, + metadata.clone(), + Arc::new(RpcClient::new()), + &crate::config::Config::default(), + None, + None, + )?; + + scanner.subscribe(0, 0).await?; + + let bucket = TableBucket::new(1, None, 0); + assert_eq!( + scanner.log_scanner_status.get_bucket_offset(&bucket), + Some(0) + ); + + Ok(()) + } } From 82f56ede99de66a8071047ca7a98add28840dc37 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 4 Feb 2026 10:44:46 +0800 Subject: [PATCH 3/4] rebase main branch --- crates/fluss/src/client/admin.rs | 8 +++++--- crates/fluss/src/client/metadata.rs | 4 +++- crates/fluss/src/client/table/scanner.rs | 5 +++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 89ffeda9..7f0d730d 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -273,9 +273,11 @@ impl FlussAdmin { // Convert proto response to LakeSnapshot let mut table_buckets_offset = HashMap::new(); for bucket_snapshot in response.bucket_snapshots { - let table_bucket = TableBucket::new(response.table_id, - bucket_snapshot.partition_id, - bucket_snapshot.bucket_id); + let table_bucket = TableBucket::new( + response.table_id, + bucket_snapshot.partition_id, + bucket_snapshot.bucket_id, + ); if let Some(log_offset) = bucket_snapshot.log_offset { table_buckets_offset.insert(table_bucket, log_offset); } diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index beb61dc6..8287f912 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -257,7 +257,9 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Metadata::new_for_test(cluster); let leader = metadata - .leader_for(&TableBucket::new(1, None, 0)) + .leader_for(&table_path, &TableBucket::new(1, None, 0)) + .await + .unwrap() .expect("leader"); assert_eq!(leader.id(), 1); } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 3a0951c2..a30b879f 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -825,7 +825,8 @@ impl LogFetcher { for fetch_log_for_bucket in fetch_log_for_buckets { let bucket: i32 = fetch_log_for_bucket.bucket_id; - let table_bucket = TableBucket::new(table_id, fetch_log_for_bucket.partition_id, bucket); + let table_bucket = + TableBucket::new(table_id, fetch_log_for_bucket.partition_id, bucket); // todo: check fetch result code for per-bucket let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else { @@ -1734,7 +1735,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1,None, 0); + let bucket = TableBucket::new(1, None, 0); assert!(metadata.leader_for(&table_path, &bucket).await?.is_some()); let response = crate::proto::FetchLogResponse { From 0a7f9a02d84dfa71f6d129412e367aff0ee21804 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 4 Feb 2026 11:15:37 +0800 Subject: [PATCH 4/4] add yuxia modification --- bindings/python/src/metadata.rs | 6 +- crates/fluss/src/client/admin.rs | 2 +- crates/fluss/src/client/metadata.rs | 2 +- .../src/client/table/log_fetch_buffer.rs | 4 +- crates/fluss/src/client/table/remote_log.rs | 2 +- crates/fluss/src/client/table/scanner.rs | 222 ++++++------------ crates/fluss/src/metadata/table.rs | 4 +- crates/fluss/src/record/mod.rs | 6 +- crates/fluss/src/test_utils.rs | 2 +- crates/fluss/src/util/mod.rs | 8 +- crates/fluss/tests/integration/log_table.rs | 68 +++++- 11 files changed, 161 insertions(+), 165 deletions(-) diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs index cfd43f45..f4226961 100644 --- a/bindings/python/src/metadata.rs +++ b/bindings/python/src/metadata.rs @@ -530,7 +530,11 @@ impl TableBucket { /// Convert to core TableBucket (internal use) pub fn to_core(&self) -> fcore::metadata::TableBucket { - fcore::metadata::TableBucket::new(self.table_id, None, self.bucket) + fcore::metadata::TableBucket::new_with_partition( + self.table_id, + self.partition_id, + self.bucket, + ) } } diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 7f0d730d..737ead3d 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -273,7 +273,7 @@ impl FlussAdmin { // Convert proto response to LakeSnapshot let mut table_buckets_offset = HashMap::new(); for bucket_snapshot in response.bucket_snapshots { - let table_bucket = TableBucket::new( + let table_bucket = TableBucket::new_with_partition( response.table_id, bucket_snapshot.partition_id, bucket_snapshot.bucket_id, diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 8287f912..ce00ced2 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -257,7 +257,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Metadata::new_for_test(cluster); let leader = metadata - .leader_for(&table_path, &TableBucket::new(1, None, 0)) + .leader_for(&table_path, &TableBucket::new(1, 0)) .await .unwrap() .expect("leader"); diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 12ba43a8..edab91d5 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -877,7 +877,7 @@ mod tests { #[tokio::test] async fn await_not_empty_returns_pending_error() { let buffer = LogFetchBuffer::new(test_read_context().unwrap()); - let table_bucket = TableBucket::new(1, None, 0); + let table_bucket = TableBucket::new(1, 0); buffer.pend(Box::new(ErrorPendingFetch { table_bucket: table_bucket.clone(), })); @@ -920,7 +920,7 @@ mod tests { let log_records = LogRecordsBatches::new(data.clone()); let read_context = ReadContext::new(to_arrow_schema(&row_type)?, false); let mut fetch = DefaultCompletedFetch::new( - TableBucket::new(1, None, 0), + TableBucket::new(1, 0), log_records, data.len(), read_context, diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index dc561535..5583f89d 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -946,7 +946,7 @@ mod tests { /// Helper function to create a TableBucket for testing fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket { - TableBucket::new(table_id, None, bucket_id) + TableBucket::new(table_id, bucket_id) } /// Simplified fake fetcher for testing diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index a30b879f..a88964ea 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -54,7 +54,6 @@ pub struct TableScan<'a> { conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc, - partition_id: Option, /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, } @@ -65,7 +64,6 @@ impl<'a> TableScan<'a> { conn, table_info, metadata, - partition_id: None, projected_fields: None, } } @@ -229,7 +227,6 @@ impl<'a> TableScan<'a> { self.conn.get_connections(), self.conn.config(), self.projected_fields, - self.partition_id, )?; Ok(LogScanner { inner: Arc::new(inner), @@ -243,17 +240,11 @@ impl<'a> TableScan<'a> { self.conn.get_connections(), self.conn.config(), self.projected_fields, - self.partition_id, )?; Ok(RecordBatchLogScanner { inner: Arc::new(inner), }) } - - pub fn filter_partition(mut self, partition_id: PartitionId) -> Self { - self.partition_id = Some(partition_id); - self - } } /// Scanner for reading log records one at a time with per-record metadata. @@ -279,7 +270,7 @@ struct LogScannerInner { metadata: Arc, log_scanner_status: Arc, log_fetcher: LogFetcher, - partition_id: Option, + is_partitioned_table: bool, } impl LogScannerInner { @@ -289,15 +280,14 @@ impl LogScannerInner { connections: Arc, config: &crate::config::Config, projected_fields: Option>, - partition_id: Option, ) -> Result { let log_scanner_status = Arc::new(LogScannerStatus::new()); Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, + is_partitioned_table: table_info.is_partitioned(), metadata: metadata.clone(), log_scanner_status: log_scanner_status.clone(), - partition_id, log_fetcher: LogFetcher::new( table_info.clone(), connections.clone(), @@ -349,7 +339,14 @@ impl LogScannerInner { } async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> { - let table_bucket = TableBucket::new(self.table_id, self.partition_id, bucket); + if self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: "The table is a partitioned table, please use \"subscribe_partition\" to \ + subscribe a partitioned bucket instead." + .to_string(), + }); + } + let table_bucket = TableBucket::new(self.table_id, bucket); self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -359,6 +356,13 @@ impl LogScannerInner { } async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { + if self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: + "The table is a partitioned table, subscribe_batch is not supported currently." + .to_string(), + }); + } self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -371,7 +375,7 @@ impl LogScannerInner { let mut scan_bucket_offsets = HashMap::new(); for (bucket_id, offset) in bucket_offsets { - let table_bucket = TableBucket::new(self.table_id, self.partition_id, *bucket_id); + let table_bucket = TableBucket::new(self.table_id, *bucket_id); scan_bucket_offsets.insert(table_bucket, *offset); } @@ -386,7 +390,15 @@ impl LogScannerInner { bucket: i32, offset: i64, ) -> Result<()> { - let table_bucket = TableBucket::new(self.table_id, Some(partition_id), bucket); + if !self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: "The table is not a partitioned table, please use \"subscribe\" to \ + subscribe a non-partitioned bucket instead." + .to_string(), + }); + } + let table_bucket = + TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket); self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -666,57 +678,55 @@ impl LogFetcher { ) } - async fn check_and_update_metadata(&self) -> Result<()> { - // Collect buckets that are missing leader information - let buckets_needing_leader: Vec = self - .fetchable_buckets() - .into_iter() - .filter(|bucket| self.get_table_bucket_leader(bucket).is_none()) - .collect(); + async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> { + let mut partition_ids = Vec::new(); + let mut need_update = false; - if buckets_needing_leader.is_empty() { - return Ok(()); + for tb in table_buckets { + if self.get_table_bucket_leader(tb).is_some() { + continue; + } + + if self.is_partitioned { + partition_ids.push(tb.partition_id().unwrap()); + } else { + need_update = true; + break; + } } - if self.is_partitioned { - // Fallback to full table metadata refresh until partition-aware updates are available. + let update_result = if self.is_partitioned && !partition_ids.is_empty() { self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) + .update_tables_metadata( + &HashSet::from([&self.table_path]), + &HashSet::new(), + partition_ids, + ) .await - .or_else(|e| { - if let Error::RpcError { source, .. } = &e - && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) - { - warn!( - "Retrying after encountering error while updating table metadata: {e}" - ); - Ok(()) - } else { - Err(e) - } - })?; - return Ok(()); - } + } else if need_update { + self.metadata.update_table_metadata(&self.table_path).await + } else { + Ok(()) + }; - // Non-partitioned table: standard metadata refresh - self.metadata - .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![]) - .await - .or_else(|e| { - if let Error::RpcError { source, .. } = &e - && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) - { - warn!("Retrying after encountering error while updating table metadata: {e}"); - Ok(()) - } else { - Err(e) - } - }) + // TODO: Handle PartitionNotExist error like java side + update_result.or_else(|e| { + if let Error::RpcError { source, .. } = &e + && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) + { + warn!("Retrying after encountering error while updating table metadata: {e}"); + Ok(()) + } else { + Err(e) + } + })?; + Ok(()) } /// Send fetch requests asynchronously without waiting for responses async fn send_fetches(&self) -> Result<()> { - self.check_and_update_metadata().await?; + self.check_and_update_metadata(self.fetchable_buckets().as_slice()) + .await?; let fetch_request = self.prepare_fetch_log_requests().await; for (leader, fetch_request) in fetch_request { @@ -825,8 +835,11 @@ impl LogFetcher { for fetch_log_for_bucket in fetch_log_for_buckets { let bucket: i32 = fetch_log_for_bucket.bucket_id; - let table_bucket = - TableBucket::new(table_id, fetch_log_for_bucket.partition_id, bucket); + let table_bucket = TableBucket::new_with_partition( + table_id, + fetch_log_for_bucket.partition_id, + bucket, + ); // todo: check fetch result code for per-bucket let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else { @@ -1593,7 +1606,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, None, 0); + let bucket = TableBucket::new(1, 0); status.assign_scan_bucket(bucket.clone(), 0); let data = build_records(&table_info, Arc::new(table_path))?; @@ -1625,7 +1638,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, None, 0); + let bucket = TableBucket::new(1, 0); let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false); @@ -1651,7 +1664,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, None, 0), 0); + status.assign_scan_bucket(TableBucket::new(1, 0), 0); let fetcher = LogFetcher::new( table_info, Arc::new(RpcClient::new()), @@ -1675,7 +1688,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, None, 0), 5); + status.assign_scan_bucket(TableBucket::new(1, 0), 5); let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), @@ -1725,7 +1738,7 @@ mod tests { let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster.clone())); let status = Arc::new(LogScannerStatus::new()); - status.assign_scan_bucket(TableBucket::new(1, None, 0), 5); + status.assign_scan_bucket(TableBucket::new(1, 0), 5); let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), @@ -1735,7 +1748,7 @@ mod tests { None, )?; - let bucket = TableBucket::new(1, None, 0); + let bucket = TableBucket::new(1, 0); assert!(metadata.leader_for(&table_path, &bucket).await?.is_some()); let response = crate::proto::FetchLogResponse { @@ -1768,85 +1781,4 @@ mod tests { assert!(metadata.get_cluster().leader_for(&bucket).is_none()); Ok(()) } - - #[tokio::test] - async fn subscribe_with_partition_creates_correct_table_bucket() -> Result<()> { - let table_path = TablePath::new("db".to_string(), "tbl".to_string()); - let table_info = build_table_info(table_path.clone(), 1, 1); - let cluster = build_cluster_arc(&table_path, 1, 1); - let metadata = Arc::new(Metadata::new_for_test(cluster)); - - let scanner = LogScannerInner::new( - &table_info, - metadata.clone(), - Arc::new(RpcClient::new()), - &crate::config::Config::default(), - None, - Some(27), - )?; - - scanner.subscribe(0, 0).await?; - - let bucket = TableBucket::new(1, Some(27), 0); - assert_eq!( - scanner.log_scanner_status.get_bucket_offset(&bucket), - Some(0) - ); - - Ok(()) - } - - #[tokio::test] - async fn subscribe_partition_overrides_stored_partition() -> Result<()> { - let table_path = TablePath::new("db".to_string(), "tbl".to_string()); - let table_info = build_table_info(table_path.clone(), 1, 1); - let cluster = build_cluster_arc(&table_path, 1, 1); - let metadata = Arc::new(Metadata::new_for_test(cluster)); - - let scanner = LogScannerInner::new( - &table_info, - metadata.clone(), - Arc::new(RpcClient::new()), - &crate::config::Config::default(), - None, - Some(27), - )?; - - scanner.subscribe_partition(99, 0, 0).await?; - - let bucket = TableBucket::new(1, Some(99), 0); - assert_eq!( - scanner.log_scanner_status.get_bucket_offset(&bucket), - Some(0) - ); - - Ok(()) - } - - #[tokio::test] - async fn subscribe_without_partition_uses_none() -> Result<()> { - let table_path = TablePath::new("db".to_string(), "tbl".to_string()); - let table_info = build_table_info(table_path.clone(), 1, 1); - let cluster = build_cluster_arc(&table_path, 1, 1); - let metadata = Arc::new(Metadata::new_for_test(cluster)); - - let scanner = LogScannerInner::new( - &table_info, - metadata.clone(), - Arc::new(RpcClient::new()), - &crate::config::Config::default(), - None, - None, - )?; - - scanner.subscribe(0, 0).await?; - - let bucket = TableBucket::new(1, None, 0); - assert_eq!( - scanner.log_scanner_status.get_bucket_offset(&bucket), - Some(0) - ); - - Ok(()) - } } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index cf6f6fd5..4e0a5256 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1205,10 +1205,10 @@ pub struct TableBucket { } impl TableBucket { - pub fn new(table_id: TableId, partition_id: Option, bucket: BucketId) -> Self { + pub fn new(table_id: TableId, bucket: BucketId) -> Self { Self { table_id, - partition_id, + partition_id: None, bucket, } } diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index fa6051af..8438b16d 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -274,8 +274,8 @@ mod tests { #[test] fn scan_records_counts_and_iterates() { - let bucket0 = TableBucket::new(1, None, 0); - let bucket1 = TableBucket::new(1, None, 1); + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(1, 1); let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert); let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete); @@ -302,7 +302,7 @@ mod tests { #[test] fn scan_batch_last_offset() { let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])); - let bucket = TableBucket::new(1, None, 0); + let bucket = TableBucket::new(1, 0); // Batch with 3 records starting at offset 100 -> last_offset = 102 let batch = RecordBatch::try_new( diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs index 852d95e3..752d4224 100644 --- a/crates/fluss/src/test_utils.rs +++ b/crates/fluss/src/test_utils.rs @@ -46,7 +46,7 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32) let mut bucket_locations = Vec::new(); for bucket_id in 0..buckets { - let table_bucket = TableBucket::new(table_id, None, bucket_id); + let table_bucket = TableBucket::new(table_id, bucket_id); let bucket_location = BucketLocation::new( table_bucket.clone(), Some(server.clone()), diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 938e1ed7..ee8dde4a 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -191,8 +191,8 @@ mod tests { #[test] fn fair_bucket_status_map_tracks_order_and_size() { - let bucket0 = TableBucket::new(1, None, 0); - let bucket1 = TableBucket::new(1, None, 1); + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(1, 1); let mut map = FairBucketStatusMap::new(); map.update_and_move_to_end(bucket0.clone(), 10); @@ -217,8 +217,8 @@ mod tests { #[test] fn fair_bucket_status_map_mutations() { - let bucket0 = TableBucket::new(1, None, 0); - let bucket1 = TableBucket::new(2, None, 1); + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(2, 1); let mut map = FairBucketStatusMap::new(); let mut input = HashMap::new(); diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index eae127ff..cbfcbe58 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -135,7 +135,7 @@ mod table_test { .expect("Failed to poll records"); // Verify the scanned records - let table_bucket = TableBucket::new(table.table_info().table_id, None, 0); + let table_bucket = TableBucket::new(table.table_info().table_id, 0); let records = scan_records.records(&table_bucket); assert_eq!(records.len(), 6, "Expected 6 records"); @@ -974,7 +974,7 @@ mod table_test { } #[tokio::test] - async fn partitioned_table_append() { + async fn partitioned_table_append_scan() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; @@ -1098,11 +1098,71 @@ mod table_test { "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist." )); + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + let partition_info = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partition infos"); + for partition_info in partition_info { + log_scanner + .subscribe_partition(partition_info.get_partition_id(), 0, 0) + .await + .expect("Failed to subscribe to partition"); + } + + let expected_records = vec![ + (1, "US", 100i64), + (2, "US", 200i64), + (3, "EU", 300i64), + (4, "EU", 400), + (5, "US", 500i64), + (6, "US", 600i64), + (7, "EU", 700i64), + (8, "EU", 800i64), + ]; + let expected_records: Vec<(i32, String, i64)> = expected_records + .into_iter() + .map(|(id, region, val)| (id, region.to_string(), val)) + .collect(); + + let mut collected_records: Vec<(i32, String, i64)> = Vec::new(); + let start_time = std::time::Instant::now(); + while collected_records.len() < expected_records.len() + && start_time.elapsed() < Duration::from_secs(10) + { + let records = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll log scanner"); + for rec in records { + let row = rec.row(); + collected_records.push(( + row.get_int(0), + row.get_string(1).to_string(), + row.get_long(2), + )); + } + } + + assert_eq!( + collected_records.len(), + expected_records.len(), + "Did not receive all records in time, expect receive {} records, but got {} records", + expected_records.len(), + collected_records.len() + ); + collected_records.sort_by_key(|r| r.0); + assert_eq!( + collected_records, expected_records, + "Data mismatch between sent and received" + ); + admin .drop_table(&table_path, false) .await .expect("Failed to drop table"); - - // todo: add scan test in 203 } }