diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 45f7f9ea..7c783538 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -235,7 +235,7 @@ int main() { << ", offset=" << offset << std::endl; } - check("subscribe_batch", batch_scanner.Subscribe(subscriptions)); + check("subscribe_buckets", batch_scanner.Subscribe(subscriptions)); std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl; // 8.5) Poll and verify bucket_id in records diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index ab02c8d0..b8348654 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -245,7 +245,7 @@ mod ffi { // LogScanner unsafe fn delete_log_scanner(scanner: *mut LogScanner); fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult; - fn subscribe_batch( + fn subscribe_buckets( self: &LogScanner, subscriptions: Vec, ) -> FfiResult; @@ -789,7 +789,7 @@ impl LogScanner { } } - fn subscribe_batch(&self, subscriptions: Vec) -> ffi::FfiResult { + fn subscribe_buckets(&self, subscriptions: Vec) -> ffi::FfiResult { use std::collections::HashMap; let mut bucket_offsets = HashMap::new(); for sub in subscriptions { @@ -797,7 +797,7 @@ impl LogScanner { } if let Some(ref inner) = self.inner { - let result = RUNTIME.block_on(async { inner.subscribe_batch(&bucket_offsets).await }); + let result = RUNTIME.block_on(async { inner.subscribe_buckets(&bucket_offsets).await }); match result { Ok(_) => ok_result(), @@ -805,7 +805,7 @@ impl LogScanner { } } else if let Some(ref inner_batch) = self.inner_batch { let result = - RUNTIME.block_on(async { inner_batch.subscribe_batch(&bucket_offsets).await }); + RUNTIME.block_on(async { inner_batch.subscribe_buckets(&bucket_offsets).await }); match result { Ok(_) => ok_result(), diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index b327dbac..133dcca2 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -261,7 +261,7 @@ Result LogScanner::Subscribe(const std::vector& bucket_offse rust_subs.push_back(ffi_sub); } - auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs)); + auto ffi_result = scanner_->subscribe_buckets(std::move(rust_subs)); return utils::from_ffi_result(ffi_result); } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index aa9fca4f..ef68fb4d 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -357,11 +357,11 @@ impl LogScannerInner { Ok(()) } - async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { + async fn subscribe_buckets(&self, bucket_offsets: &HashMap) -> Result<()> { if self.is_partitioned_table { return Err(Error::UnsupportedOperation { message: - "The table is a partitioned table, subscribe_batch is not supported currently." + "The table is a partitioned table, subscribe_buckets is not supported currently." .to_string(), }); } @@ -473,8 +473,8 @@ impl LogScanner { self.inner.subscribe(bucket, offset).await } - pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { - self.inner.subscribe_batch(bucket_offsets).await + pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap) -> Result<()> { + self.inner.subscribe_buckets(bucket_offsets).await } pub async fn subscribe_partition( @@ -500,8 +500,8 @@ impl RecordBatchLogScanner { self.inner.subscribe(bucket, offset).await } - pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { - self.inner.subscribe_batch(bucket_offsets).await + pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap) -> Result<()> { + self.inner.subscribe_buckets(bucket_offsets).await } pub async fn subscribe_partition( diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index cbfcbe58..3f7dd6e2 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -426,7 +426,7 @@ mod table_test { let mut bucket_offsets = HashMap::new(); bucket_offsets.insert(0, 0); log_scanner - .subscribe_batch(&bucket_offsets) + .subscribe_buckets(&bucket_offsets) .await .expect("Failed to subscribe");