diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 901b90ca..3ff9a26c 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -564,6 +564,7 @@ class LogScanner { Result Subscribe(int32_t bucket_id, int64_t start_offset); Result Subscribe(const std::vector& bucket_offsets); Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset); + Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id); Result Poll(int64_t timeout_ms, ScanRecords& out); Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index b8348654..d6e3a9a6 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -255,6 +255,8 @@ mod ffi { bucket_id: i32, start_offset: i64, ) -> FfiResult; + fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) + -> FfiResult; fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult; fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult; fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize); @@ -825,6 +827,28 @@ impl LogScanner { self.do_subscribe(Some(partition_id), bucket_id, start_offset) } + fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult { + if let Some(ref inner) = self.inner { + match RUNTIME + .block_on(async { inner.unsubscribe_partition(partition_id, bucket_id).await }) + { + Ok(_) => ok_result(), + Err(e) => err_result(1, e.to_string()), + } + } else if let Some(ref inner_batch) = self.inner_batch { + match RUNTIME.block_on(async { + inner_batch + .unsubscribe_partition(partition_id, bucket_id) + .await + }) { + Ok(_) => ok_result(), + Err(e) => err_result(1, e.to_string()), + } + } else { + err_result(1, "LogScanner not initialized".to_string()) + } + } + fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult { if let Some(ref inner) = self.inner { let timeout = Duration::from_millis(timeout_ms as u64); diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index ab260389..efb762bd 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -264,6 +264,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i return utils::from_ffi_result(ffi_result); } +Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) { + if (!Available()) { + return utils::make_error(1, "LogScanner not available"); + } + + auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id); + return utils::from_ffi_result(ffi_result); +} + Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) { if (!Available()) { return utils::make_error(1, "LogScanner not available"); diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index d50f19e0..26f54da8 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -409,6 +409,19 @@ impl LogScannerInner { Ok(()) } + async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> { + if !self.is_partitioned_table { + return Err(Error::UnsupportedOperation { + message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(), + }); + } + let table_bucket = + TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket); + self.log_scanner_status + .unassign_scan_buckets(from_ref(&table_bucket)); + Ok(()) + } + async fn poll_for_fetches(&self) -> Result>> { let result = self.log_fetcher.collect_fetches()?; if !result.is_empty() { @@ -487,6 +500,14 @@ impl LogScanner { .subscribe_partition(partition_id, bucket, offset) .await } + + pub async fn unsubscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + ) -> Result<()> { + self.inner.unsubscribe_partition(partition_id, bucket).await + } } // Implementation for RecordBatchLogScanner (batches mode) @@ -524,6 +545,14 @@ impl RecordBatchLogScanner { pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> { self.inner.log_scanner_status.get_all_subscriptions() } + + pub async fn unsubscribe_partition( + &self, + partition_id: PartitionId, + bucket: i32, + ) -> Result<()> { + self.inner.unsubscribe_partition(partition_id, bucket).await + } } struct LogFetcher { diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 3f7dd6e2..493bb344 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1160,6 +1160,64 @@ mod table_test { "Data mismatch between sent and received" ); + // Test unsubscribe_partition: after unsubscribing from one partition, + // data from that partition should no longer be read. + let log_scanner_unsub = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner for unsubscribe test"); + let partition_infos = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partition infos"); + let eu_partition_id = partition_infos + .iter() + .find(|p| p.get_partition_name() == "EU") + .map(|p| p.get_partition_id()) + .expect("EU partition should exist"); + for info in &partition_infos { + log_scanner_unsub + .subscribe_partition(info.get_partition_id(), 0, 0) + .await + .expect("Failed to subscribe to partition"); + } + log_scanner_unsub + .unsubscribe_partition(eu_partition_id, 0) + .await + .expect("Failed to unsubscribe from EU partition"); + + let mut records_after_unsubscribe: Vec<(i32, String, i64)> = Vec::new(); + let unsub_deadline = std::time::Instant::now() + Duration::from_secs(5); + while records_after_unsubscribe.len() < 4 && std::time::Instant::now() < unsub_deadline { + let records = log_scanner_unsub + .poll(Duration::from_millis(300)) + .await + .expect("Failed to poll after unsubscribe"); + for rec in records { + let row = rec.row(); + records_after_unsubscribe.push(( + row.get_int(0), + row.get_string(1).to_string(), + row.get_long(2), + )); + } + } + + assert!( + records_after_unsubscribe.iter().all(|r| r.1 == "US"), + "After unsubscribe_partition(EU), only US partition data should be read; got regions: {:?}", + records_after_unsubscribe + .iter() + .map(|r| r.1.as_str()) + .collect::>() + ); + assert_eq!( + records_after_unsubscribe.len(), + 4, + "Should receive exactly 4 US records (ids 1,2,5,6); got {}", + records_after_unsubscribe.len() + ); + admin .drop_table(&table_path, false) .await