Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ int main() {
<< ", offset=" << offset << std::endl;
}

check("subscribe_batch", batch_scanner.Subscribe(subscriptions));
check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl;

// 8.5) Poll and verify bucket_id in records
Expand Down
8 changes: 4 additions & 4 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ mod ffi {
// LogScanner
unsafe fn delete_log_scanner(scanner: *mut LogScanner);
fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult;
fn subscribe_batch(
fn subscribe_buckets(
self: &LogScanner,
subscriptions: Vec<FfiBucketSubscription>,
) -> FfiResult;
Expand Down Expand Up @@ -789,23 +789,23 @@ impl LogScanner {
}
}

fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
fn subscribe_buckets(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
use std::collections::HashMap;
let mut bucket_offsets = HashMap::new();
for sub in subscriptions {
bucket_offsets.insert(sub.bucket_id, sub.offset);
}

if let Some(ref inner) = self.inner {
let result = RUNTIME.block_on(async { inner.subscribe_batch(&bucket_offsets).await });
let result = RUNTIME.block_on(async { inner.subscribe_buckets(&bucket_offsets).await });

match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else if let Some(ref inner_batch) = self.inner_batch {
let result =
RUNTIME.block_on(async { inner_batch.subscribe_batch(&bucket_offsets).await });
RUNTIME.block_on(async { inner_batch.subscribe_buckets(&bucket_offsets).await });

match result {
Ok(_) => ok_result(),
Expand Down
2 changes: 1 addition & 1 deletion bindings/cpp/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offse
rust_subs.push_back(ffi_sub);
}

auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs));
auto ffi_result = scanner_->subscribe_buckets(std::move(rust_subs));
return utils::from_ffi_result(ffi_result);
}

Expand Down
12 changes: 6 additions & 6 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,11 @@ impl LogScannerInner {
Ok(())
}

async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
if self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message:
"The table is a partitioned table, subscribe_batch is not supported currently."
"The table is a partitioned table, subscribe_buckets is not supported currently."
.to_string(),
});
}
Expand Down Expand Up @@ -473,8 +473,8 @@ impl LogScanner {
self.inner.subscribe(bucket, offset).await
}

pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_buckets(bucket_offsets).await
}

pub async fn subscribe_partition(
Expand All @@ -500,8 +500,8 @@ impl RecordBatchLogScanner {
self.inner.subscribe(bucket, offset).await
}

pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_buckets(bucket_offsets).await
}

pub async fn subscribe_partition(
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/tests/integration/log_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ mod table_test {
let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0);
log_scanner
.subscribe_batch(&bucket_offsets)
.subscribe_buckets(&bucket_offsets)
.await
.expect("Failed to subscribe");

Expand Down
Loading