feat: support unsubscribe partition#256
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for unsubscribing from partition buckets in the log scanner, addressing issue #254. The implementation allows users to dynamically remove partitions from their subscription set during scanning, providing more control over which partitions are actively consumed.
Changes:
- Added
unsubscribe_partitionmethod to Rust scanner API (LogScanner and RecordBatchLogScanner) - Added FFI bindings for C++ to expose unsubscribe_partition functionality
- Added C++ wrapper method
UnsubscribePartitionin the LogScanner class
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| crates/fluss/src/client/table/scanner.rs | Implements core unsubscribe_partition logic with validation and error handling for partitioned tables |
| bindings/cpp/src/lib.rs | Adds FFI function that bridges Rust implementation to C++ |
| bindings/cpp/src/table.cpp | Implements C++ wrapper for unsubscribe_partition with availability checks |
| bindings/cpp/include/fluss.hpp | Declares public C++ API method signature for UnsubscribePartition |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub async fn unsubscribe_partition( | ||
| &self, | ||
| partition_id: PartitionId, | ||
| bucket: i32, | ||
| ) -> Result<()> { | ||
| self.inner.unsubscribe_partition(partition_id, bucket).await | ||
| } |
There was a problem hiding this comment.
The API is asymmetric - there are subscribe and subscribe_batch methods for non-partitioned tables, but no corresponding unsubscribe or unsubscribe_batch methods. Only unsubscribe_partition is provided for partitioned tables. This creates an inconsistency where users of non-partitioned tables have no way to unsubscribe from buckets after subscribing to them. Consider adding unsubscribe methods for non-partitioned tables to provide a complete and symmetric API.
| async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> { | ||
| if !self.is_partitioned_table { | ||
| return Err(Error::UnsupportedOperation { | ||
| message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(), | ||
| }); | ||
| } | ||
| let table_bucket = | ||
| TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket); | ||
| self.log_scanner_status | ||
| .unassign_scan_buckets(from_ref(&table_bucket)); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
The unsubscribe_partition method doesn't call check_and_update_table_metadata like subscribe_partition does. All subscribe methods (subscribe, subscribe_batch, and subscribe_partition) update the table metadata before modifying the scanner state. For consistency and to ensure the scanner has the latest table information, unsubscribe_partition should also call check_and_update_table_metadata before unassigning buckets. This ensures that partition metadata is current when unsubscribing.
| async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> { | ||
| if !self.is_partitioned_table { | ||
| return Err(Error::UnsupportedOperation { | ||
| message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(), | ||
| }); | ||
| } | ||
| let table_bucket = | ||
| TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket); | ||
| self.log_scanner_status | ||
| .unassign_scan_buckets(from_ref(&table_bucket)); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
The new unsubscribe_partition method lacks test coverage. There should be integration tests similar to the existing tests for subscribe_partition (e.g., in crates/fluss/tests/integration/log_table.rs) that verify:
- Successfully unsubscribing from a partition bucket
- Error handling when trying to unsubscribe from a non-partitioned table
- Behavior after unsubscribing (e.g., no more records received from that partition bucket)
This is especially important since the repository has comprehensive integration test coverage for scanner operations.
Purpose
Linked issue: close #254
Brief change log
Tests
API and Format
Documentation