Skip to content

Commit ec6b68c

Browse files
committed
feat: support unsubscribe partition
1 parent 90cc35a commit ec6b68c

4 files changed

Lines changed: 60 additions & 0 deletions

File tree

bindings/cpp/include/fluss.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ class LogScanner {
545545
Result Subscribe(int32_t bucket_id, int64_t start_offset);
546546
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
547547
Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
548+
Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
548549
Result Poll(int64_t timeout_ms, ScanRecords& out);
549550
Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
550551

bindings/cpp/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ mod ffi {
255255
bucket_id: i32,
256256
start_offset: i64,
257257
) -> FfiResult;
258+
fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) -> FfiResult;
258259
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
259260
fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
260261
fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -825,6 +826,26 @@ impl LogScanner {
825826
self.do_subscribe(Some(partition_id), bucket_id, start_offset)
826827
}
827828

829+
fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
830+
if let Some(ref inner) = self.inner {
831+
match RUNTIME.block_on(async {
832+
inner.unsubscribe_partition(partition_id, bucket_id).await
833+
}) {
834+
Ok(_) => ok_result(),
835+
Err(e) => err_result(1, e.to_string()),
836+
}
837+
} else if let Some(ref inner_batch) = self.inner_batch {
838+
match RUNTIME.block_on(async {
839+
inner_batch.unsubscribe_partition(partition_id, bucket_id).await
840+
}) {
841+
Ok(_) => ok_result(),
842+
Err(e) => err_result(1, e.to_string()),
843+
}
844+
} else {
845+
err_result(1, "LogScanner not initialized".to_string())
846+
}
847+
}
848+
828849
fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
829850
if let Some(ref inner) = self.inner {
830851
let timeout = Duration::from_millis(timeout_ms as u64);

bindings/cpp/src/table.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
274274
return utils::from_ffi_result(ffi_result);
275275
}
276276

277+
Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
278+
if (!Available()) {
279+
return utils::make_error(1, "LogScanner not available");
280+
}
281+
282+
auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
283+
return utils::from_ffi_result(ffi_result);
284+
}
285+
277286
Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
278287
if (!Available()) {
279288
return utils::make_error(1, "LogScanner not available");

crates/fluss/src/client/table/scanner.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,19 @@ impl LogScannerInner {
409409
Ok(())
410410
}
411411

412+
async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
413+
if !self.is_partitioned_table {
414+
return Err(Error::UnsupportedOperation {
415+
message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
416+
});
417+
}
418+
let table_bucket =
419+
TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
420+
self.log_scanner_status
421+
.unassign_scan_buckets(from_ref(&table_bucket));
422+
Ok(())
423+
}
424+
412425
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
413426
let result = self.log_fetcher.collect_fetches()?;
414427
if !result.is_empty() {
@@ -487,6 +500,14 @@ impl LogScanner {
487500
.subscribe_partition(partition_id, bucket, offset)
488501
.await
489502
}
503+
504+
pub async fn unsubscribe_partition(
505+
&self,
506+
partition_id: PartitionId,
507+
bucket: i32,
508+
) -> Result<()> {
509+
self.inner.unsubscribe_partition(partition_id, bucket).await
510+
}
490511
}
491512

492513
// Implementation for RecordBatchLogScanner (batches mode)
@@ -514,6 +535,14 @@ impl RecordBatchLogScanner {
514535
.subscribe_partition(partition_id, bucket, offset)
515536
.await
516537
}
538+
539+
pub async fn unsubscribe_partition(
540+
&self,
541+
partition_id: PartitionId,
542+
bucket: i32,
543+
) -> Result<()> {
544+
self.inner.unsubscribe_partition(partition_id, bucket).await
545+
}
517546
}
518547

519548
struct LogFetcher {

0 commit comments

Comments
 (0)