Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ class LogScanner {
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
Result Poll(int64_t timeout_ms, ScanRecords& out);
Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);

Expand Down
24 changes: 24 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ mod ffi {
bucket_id: i32,
start_offset: i64,
) -> FfiResult;
fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32)
-> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
Expand Down Expand Up @@ -825,6 +827,28 @@ impl LogScanner {
self.do_subscribe(Some(partition_id), bucket_id, start_offset)
}

fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
if let Some(ref inner) = self.inner {
match RUNTIME
.block_on(async { inner.unsubscribe_partition(partition_id, bucket_id).await })
{
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else if let Some(ref inner_batch) = self.inner_batch {
match RUNTIME.block_on(async {
inner_batch
.unsubscribe_partition(partition_id, bucket_id)
.await
}) {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else {
err_result(1, "LogScanner not initialized".to_string())
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i cheked subscribe partition delegates to the consolidated do_subscribe helper. Maybe we can plan a similar helper going forward?

this LGTM tho and its not strictly needed

fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
if let Some(ref inner) = self.inner {
let timeout = Duration::from_millis(timeout_ms as u64);
Expand Down
9 changes: 9 additions & 0 deletions bindings/cpp/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
}

auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
return utils::from_ffi_result(ffi_result);
}

Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
Expand Down
29 changes: 29 additions & 0 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,19 @@ impl LogScannerInner {
Ok(())
}

async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
if !self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
});
}
let table_bucket =
TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
self.log_scanner_status
.unassign_scan_buckets(from_ref(&table_bucket));
Ok(())
}
Comment thread
luoyuxia marked this conversation as resolved.

async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
let result = self.log_fetcher.collect_fetches()?;
if !result.is_empty() {
Expand Down Expand Up @@ -487,6 +500,14 @@ impl LogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}

pub async fn unsubscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
) -> Result<()> {
self.inner.unsubscribe_partition(partition_id, bucket).await
}
}

// Implementation for RecordBatchLogScanner (batches mode)
Expand Down Expand Up @@ -524,6 +545,14 @@ impl RecordBatchLogScanner {
pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
self.inner.log_scanner_status.get_all_subscriptions()
}

pub async fn unsubscribe_partition(
&self,
partition_id: PartitionId,
bucket: i32,
) -> Result<()> {
self.inner.unsubscribe_partition(partition_id, bucket).await
}
}

struct LogFetcher {
Expand Down
58 changes: 58 additions & 0 deletions crates/fluss/tests/integration/log_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,64 @@ mod table_test {
"Data mismatch between sent and received"
);

// Test unsubscribe_partition: after unsubscribing from one partition,
// data from that partition should no longer be read.
let log_scanner_unsub = table
.new_scan()
.create_log_scanner()
.expect("Failed to create log scanner for unsubscribe test");
let partition_infos = admin
.list_partition_infos(&table_path)
.await
.expect("Failed to list partition infos");
let eu_partition_id = partition_infos
.iter()
.find(|p| p.get_partition_name() == "EU")
.map(|p| p.get_partition_id())
.expect("EU partition should exist");
for info in &partition_infos {
log_scanner_unsub
.subscribe_partition(info.get_partition_id(), 0, 0)
.await
.expect("Failed to subscribe to partition");
}
log_scanner_unsub
.unsubscribe_partition(eu_partition_id, 0)
.await
.expect("Failed to unsubscribe from EU partition");

let mut records_after_unsubscribe: Vec<(i32, String, i64)> = Vec::new();
let unsub_deadline = std::time::Instant::now() + Duration::from_secs(5);
while records_after_unsubscribe.len() < 4 && std::time::Instant::now() < unsub_deadline {
let records = log_scanner_unsub
.poll(Duration::from_millis(300))
.await
.expect("Failed to poll after unsubscribe");
for rec in records {
let row = rec.row();
records_after_unsubscribe.push((
row.get_int(0),
row.get_string(1).to_string(),
row.get_long(2),
));
}
}

assert!(
records_after_unsubscribe.iter().all(|r| r.1 == "US"),
"After unsubscribe_partition(EU), only US partition data should be read; got regions: {:?}",
records_after_unsubscribe
.iter()
.map(|r| r.1.as_str())
.collect::<Vec<_>>()
);
assert_eq!(
records_after_unsubscribe.len(),
4,
"Should receive exactly 4 US records (ids 1,2,5,6); got {}",
records_after_unsubscribe.len()
);

admin
.drop_table(&table_path, false)
.await
Expand Down
Loading