feat: add table partition scanning #222
Pull request overview
Implements initial support for scanning partitioned tables in the Fluss Rust client by threading partition_id through scan/bucket identification and adding partition-specific subscription APIs.
Changes:
- Extend `TableBucket` to carry an optional `partition_id` and update call sites accordingly.
- Add `TableScan::filter_partition(...)` and `subscribe_partition(...)` APIs on scanners to target a partition.
- Update fetch request construction to include `partition_id` per bucket and add tests around partition subscription behavior.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| crates/fluss/src/metadata/table.rs | Extends TableBucket constructor to accept partition_id. |
| crates/fluss/src/client/table/scanner.rs | Adds partition filtering/subscription APIs and includes partition_id in fetch requests. |
| crates/fluss/src/cluster/cluster.rs | Updates TableBucket::new call sites (currently still uses partition_id=None). |
| crates/fluss/src/client/write/sender.rs | Updates TableBucket::new call sites for write response handling. |
| crates/fluss/src/client/table/lookup.rs | Updates TableBucket::new call site for lookup leader resolution. |
| crates/fluss/src/client/table/remote_log.rs | Updates test helper to construct TableBucket with partition_id=None. |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Updates tests to use new TableBucket::new signature. |
| crates/fluss/src/client/metadata.rs | Updates tests to use new TableBucket::new signature. |
| crates/fluss/src/client/admin.rs | Updates TableBucket::new call sites when materializing snapshot/leader mappings. |
| crates/fluss/src/util/mod.rs | Updates unit tests to the new TableBucket::new signature. |
| crates/fluss/src/record/mod.rs | Updates unit tests to the new TableBucket::new signature. |
| crates/fluss/src/test_utils.rs | Updates test cluster builder to use partition_id=None. |
| crates/fluss/tests/integration/table.rs | Updates integration test to use new TableBucket::new signature. |
| bindings/python/src/metadata.rs | Updates Python binding conversion to core TableBucket with partition_id=None. |
| pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self { | ||
| Self { | ||
| table_id, |
Changing TableBucket::new to require partition_id is a breaking change for external callers. If this type is part of the public client API, consider keeping the old new(table_id, bucket) (defaulting partition_id=None) and adding a new constructor like new_with_partition(...) to preserve backwards compatibility.
| pub fn new(table_id: TableId, partition_id: Option<PartitionId>, bucket: BucketId) -> Self { | |
| Self { | |
| table_id, | |
| pub fn new(table_id: TableId, bucket: BucketId) -> Self { | |
| Self { | |
| table_id, | |
| partition_id: None, | |
| bucket, | |
| } | |
| } | |
| pub fn new_with_partition( | |
| table_id: TableId, | |
| partition_id: Option<PartitionId>, | |
| bucket: BucketId, | |
| ) -> Self { | |
| Self { | |
| table_id, |
| async fn check_and_update_metadata(&self) -> Result<()> { | ||
| let need_update = self | ||
| // Collect buckets that are missing leader information | ||
| let buckets_needing_leader: Vec<TableBucket> = self | ||
| .fetchable_buckets() | ||
| .iter() | ||
| .any(|bucket| self.get_table_bucket_leader(bucket).is_none()); | ||
| .into_iter() | ||
| .filter(|bucket| self.get_table_bucket_leader(bucket).is_none()) | ||
| .collect(); |
buckets_needing_leader is only used to check is_empty(), so collecting into a Vec adds an avoidable allocation. Consider reverting to an iterator any(...) boolean, or keep the Vec only if you’ll use it later (e.g., to drive a partition-aware metadata refresh).
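The trade-off can be illustrated with a minimal sketch. The `TableBucket` struct and the leader map below are hypothetical stand-ins, not the client's real types; only the iterator shapes matter.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the client's bucket identifier.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableBucket {
    table_id: i64,
    partition_id: Option<i64>,
    bucket: i32,
}

/// Short-circuits on the first bucket without a known leader; allocates nothing.
fn any_missing_leader(buckets: &[TableBucket], leaders: &HashMap<TableBucket, i32>) -> bool {
    buckets.iter().any(|b| !leaders.contains_key(b))
}

/// Collects every bucket without a known leader. The Vec is only worth its
/// allocation if the caller uses the contents, e.g. for a targeted refresh.
fn buckets_missing_leader(
    buckets: &[TableBucket],
    leaders: &HashMap<TableBucket, i32>,
) -> Vec<TableBucket> {
    buckets
        .iter()
        .filter(|b| !leaders.contains_key(*b))
        .cloned()
        .collect()
}

fn main() {
    let known = TableBucket { table_id: 1, partition_id: Some(7), bucket: 0 };
    let unknown = TableBucket { table_id: 1, partition_id: Some(7), bucket: 1 };
    let mut leaders = HashMap::new();
    leaders.insert(known.clone(), 100);

    let buckets = [known, unknown.clone()];
    assert!(any_missing_leader(&buckets, &leaders));
    assert_eq!(buckets_missing_leader(&buckets, &leaders), vec![unknown]);
}
```

If the code only checks whether any leader is missing, the first form is sufficient; the second pays off only when the collected buckets drive a partition-aware refresh.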
| // Non-partitioned table: standard metadata refresh | ||
| self.metadata | ||
| .update_tables_metadata(&HashSet::from([&self.table_path])) | ||
| .await |
Both the is_partitioned branch above and the non-partitioned path here execute essentially the same update_tables_metadata(...).await.or_else(...) logic. Consider factoring the retrying metadata refresh into a shared helper (or unifying the branches) to avoid drift and simplify future partition-aware enhancements.
+1. Did you intend to update metadata only for the buckets with a missing leader? Note that we should follow the Java-side logic where possible.
Thanks! I'll take a look!
leekeiabstraction left a comment
Thank you very much for the PR. I've left comments, PTAL!
| }) | ||
| } | ||
|
|
||
| pub fn filter_partition(mut self, partition_id: PartitionId) -> Self { |
Is this API available on the Java side?
It doesn't seem necessary, because on the Java side a partitioned table scan is subscribed to with a partition ID. Calling subscribe without a partition ID on a partitioned table results in an exception on the Java side.
| conn: &'a FlussConnection, | ||
| table_info: TableInfo, | ||
| metadata: Arc<Metadata>, | ||
| partition_id: Option<PartitionId>, |
TableScan shouldn't be partition aware.
| @@ -337,7 +349,7 @@ impl LogScannerInner { | |||
| } | |||
|
|
|||
| async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> { | |||
Let's follow the Java-side logic here and return an error if this is a partitioned table.
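A minimal sketch of the guard being requested, under stated assumptions: `Scanner`, `ScanError`, and their fields are hypothetical simplifications, not the client's actual types, and the reverse check in `subscribe_partition` is an assumption rather than something confirmed in this thread.

```rust
// Hypothetical error type; the real client has its own error enum.
#[derive(Debug, PartialEq)]
enum ScanError {
    /// `subscribe` was called on a partitioned table.
    PartitionRequired,
    /// `subscribe_partition` was called on a non-partitioned table (assumed check).
    UnexpectedPartition,
}

// Hypothetical, synchronous stand-in for the scanner.
struct Scanner {
    is_partitioned: bool,
    // (partition_id, bucket, offset) for each subscription
    subscriptions: Vec<(Option<i64>, i32, i64)>,
}

impl Scanner {
    /// Bucket-only subscription: rejected for partitioned tables.
    fn subscribe(&mut self, bucket: i32, offset: i64) -> Result<(), ScanError> {
        if self.is_partitioned {
            return Err(ScanError::PartitionRequired);
        }
        self.subscriptions.push((None, bucket, offset));
        Ok(())
    }

    /// Partition-aware subscription: the partition ID becomes part of the key.
    fn subscribe_partition(
        &mut self,
        partition_id: i64,
        bucket: i32,
        offset: i64,
    ) -> Result<(), ScanError> {
        if !self.is_partitioned {
            return Err(ScanError::UnexpectedPartition);
        }
        self.subscriptions.push((Some(partition_id), bucket, offset));
        Ok(())
    }
}

fn main() {
    let mut scanner = Scanner { is_partitioned: true, subscriptions: Vec::new() };
    assert_eq!(scanner.subscribe(0, 0), Err(ScanError::PartitionRequired));
    assert_eq!(scanner.subscribe_partition(7, 0, 0), Ok(()));
}
```

Failing fast in `subscribe` keeps the caller from silently creating a subscription keyed without a partition, which could never match a real bucket of a partitioned table.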
|
|
||
| for bucket_resp in response.buckets_resp() { | ||
| let tb = TableBucket::new(table_id, bucket_resp.bucket_id()); | ||
| let tb = TableBucket::new(table_id, None, bucket_resp.bucket_id()); |
Should the partition ID be extracted from the response?
|
|
||
| for bucket_id in buckets { | ||
| let table_bucket = TableBucket::new(table_id, *bucket_id); | ||
| let table_bucket = TableBucket::new(table_id, None, *bucket_id); |
The partition ID is one of this function's arguments; pass that in instead of `None`.
| ) -> Result<TableBucket> { | ||
| let table_info = self.get_table(table_path)?; | ||
| Ok(TableBucket::new(table_info.table_id, bucket_id)) | ||
| Ok(TableBucket::new(table_info.table_id, None, bucket_id)) |
I do not think we can use `None` here. This function is used to group requests for the same bucket, including buckets of partitioned tables. If we override the partition ID with `None` here, the accumulator will look for the leader of a non-existent bucket when the table is partitioned.
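The failure mode can be reproduced with a minimal sketch, assuming leaders are stored in a map keyed by the full bucket identity. `TableBucket` and `leader_for` here are hypothetical stand-ins, not the client's real types or functions.

```rust
use std::collections::HashMap;

// Hypothetical stand-in: the partition ID takes part in equality and hashing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableBucket {
    table_id: i64,
    partition_id: Option<i64>,
    bucket: i32,
}

/// Looks up the leader for a bucket; the key must match exactly,
/// including the partition ID.
fn leader_for(
    leaders: &HashMap<TableBucket, i32>,
    table_id: i64,
    partition_id: Option<i64>,
    bucket: i32,
) -> Option<i32> {
    leaders
        .get(&TableBucket { table_id, partition_id, bucket })
        .copied()
}

fn main() {
    let mut leaders = HashMap::new();
    // Metadata registers the bucket under its real partition.
    leaders.insert(TableBucket { table_id: 1, partition_id: Some(7), bucket: 0 }, 42);

    // A key built with `None` names a bucket that does not exist.
    assert_eq!(leader_for(&leaders, 1, None, 0), None);
    // Threading the partition ID through finds the leader.
    assert_eq!(leader_for(&leaders, 1, Some(7), 0), Some(42));
}
```

Because `partition_id` is part of the key, a bucket built with `None` is a different key from the one registered for the partition, so the lookup misses even though the table ID and bucket number match.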
Working on it! Thanks folks!
| /// Convert to core TableBucket (internal use) | ||
| pub fn to_core(&self) -> fcore::metadata::TableBucket { | ||
| fcore::metadata::TableBucket::new(self.table_id, self.bucket) | ||
| fcore::metadata::TableBucket::new(self.table_id, None, self.bucket) |
This should take `self.partition_id` as well.
@lemorage Hi, is there any progress on this PR?
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
@lemorage Thanks for the PR, and thanks @leekeiabstraction for the review. I'm going to merge this. Feel free to leave comments on this PR, and I'll address them in a follow-up PR.
I am so sorry for the delayed follow-up. I did some work on my local branch, but I got held up by other things and haven't cleaned it up and pushed it. @luoyuxia Thank you so much for your rapid work on the rest. If there is anything further I need to do on my side, do let me know. Thank you all for your patience with the long delay on my part.
No worries at all! Since we have an upcoming release deadline, I went ahead and handled the remaining parts to keep us on track. Your base pull request is much appreciated, and we’d love to have more of your contributions in the future!
Purpose
Linked issue: close #203
This PR implements support for scanning partitioned tables in the Fluss Rust client.
Brief change log
- Update `TableBucket::new()` to accept a `partition_id` parameter, and update all call sites
- Add `filter_partition()` to the `TableScan` builder
- Add `subscribe_partition()` to `LogScanner`/`RecordBatchLogScanner`
Tests
UT (Added):
API and Format
New APIs:
- `TableScan.filter_partition(partition_id)`
- `LogScanner.subscribe_partition(partition_id, bucket, offset)`
Documentation