Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ class LogScanner {
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
Result Poll(int64_t timeout_ms, ScanRecords& out);
Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);

Expand Down
21 changes: 21 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ mod ffi {
bucket_id: i32,
start_offset: i64,
) -> FfiResult;
fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) -> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
Expand Down Expand Up @@ -825,6 +826,26 @@ impl LogScanner {
self.do_subscribe(Some(partition_id), bucket_id, start_offset)
}

fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
if let Some(ref inner) = self.inner {
match RUNTIME.block_on(async {
inner.unsubscribe_partition(partition_id, bucket_id).await
}) {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else if let Some(ref inner_batch) = self.inner_batch {
match RUNTIME.block_on(async {
inner_batch.unsubscribe_partition(partition_id, bucket_id).await
}) {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else {
err_result(1, "LogScanner not initialized".to_string())
}
}

fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
if let Some(ref inner) = self.inner {
let timeout = Duration::from_millis(timeout_ms as u64);
Expand Down
9 changes: 9 additions & 0 deletions bindings/cpp/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
}

auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
Expand Down
29 changes: 29 additions & 0 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,19 @@ impl LogScannerInner {
Ok(())
}

async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
if !self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
});
}
let table_bucket =
TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
self.log_scanner_status
.unassign_scan_buckets(from_ref(&table_bucket));
Ok(())
}
Comment on lines +412 to +423
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unsubscribe_partition method doesn't call check_and_update_table_metadata like subscribe_partition does. All subscribe methods (subscribe, subscribe_batch, and subscribe_partition) update the table metadata before modifying the scanner state. For consistency and to ensure the scanner has the latest table information, unsubscribe_partition should also call check_and_update_table_metadata before unassigning buckets. This ensures that partition metadata is current when unsubscribing.

Copilot uses AI. Check for mistakes.
Comment on lines +412 to +423
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new unsubscribe_partition method lacks test coverage. There should be integration tests similar to the existing tests for subscribe_partition (e.g., in crates/fluss/tests/integration/log_table.rs) that verify:

  1. Successfully unsubscribing from a partition bucket
  2. Error handling when trying to unsubscribe from a non-partitioned table
  3. Behavior after unsubscribing (e.g., no more records received from that partition bucket)

This is especially important since the repository has comprehensive integration test coverage for scanner operations.

Copilot uses AI. Check for mistakes.

async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
let result = self.log_fetcher.collect_fetches()?;
if !result.is_empty() {
Expand Down Expand Up @@ -487,6 +500,14 @@ impl LogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}

pub async fn unsubscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
) -> Result<()> {
self.inner.unsubscribe_partition(partition_id, bucket).await
}
Comment on lines +504 to +510
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API is asymmetric - there are subscribe and subscribe_batch methods for non-partitioned tables, but no corresponding unsubscribe or unsubscribe_batch methods. Only unsubscribe_partition is provided for partitioned tables. This creates an inconsistency where users of non-partitioned tables have no way to unsubscribe from buckets after subscribing to them. Consider adding unsubscribe methods for non-partitioned tables to provide a complete and symmetric API.

Copilot uses AI. Check for mistakes.
}

// Implementation for RecordBatchLogScanner (batches mode)
Expand Down Expand Up @@ -514,6 +535,14 @@ impl RecordBatchLogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}

pub async fn unsubscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
) -> Result<()> {
self.inner.unsubscribe_partition(partition_id, bucket).await
}
}

struct LogFetcher {
Expand Down
Loading