Support ListOffset/SubscribeBatch/DropTable for cpp bindings#100
Conversation
|
@luoyuxia Hi, yuxia, I added ListOffset/SubscribeBatch/Droptable apis for cpp bindings to serve internal cpp users. |
There was a problem hiding this comment.
Pull request overview
This pull request adds three new API methods to the C++ bindings for Fluss: ListOffsets, SubscribeBatch, and DropTable. It also enhances the ScanRecord structure to include bucket_id information, allowing consumers to identify which bucket each scanned record belongs to.
Key Changes:
- Added
ListOffsetsAPI to query bucket offsets by earliest, latest, or timestamp-based criteria - Added
SubscribeBatchAPI to enable subscribing to multiple buckets in a single call - Added
DropTableAPI for table deletion with optional ignore-if-not-exists flag
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
crates/fluss/src/client/table/scanner.rs |
Implements subscribe_batch method for subscribing to multiple buckets at once |
bindings/cpp/src/types.rs |
Updates core_scan_records_to_ffi to extract and include bucket_id in scan records |
bindings/cpp/src/table.cpp |
Implements C++ wrapper for SubscribeBatch method |
bindings/cpp/src/lib.rs |
Adds FFI bridge structures and implements Rust-side logic for all three new APIs |
bindings/cpp/src/ffi_converter.hpp |
Updates from_ffi_scan_record to handle bucket_id field |
bindings/cpp/src/admin.cpp |
Implements C++ wrappers for DropTable and ListOffsets admin methods |
bindings/cpp/include/fluss.hpp |
Declares new APIs, structs (OffsetQuery, BucketSubscription), and enums (OffsetSpec) |
bindings/cpp/examples/example.cpp |
Demonstrates usage of all new APIs with comprehensive examples including timestamp-based queries |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| let offset_spec = match offset_query.offset_type { | ||
| 0 => OffsetSpec::Earliest, | ||
| 1 => OffsetSpec::Latest, | ||
| 2 => OffsetSpec::Timestamp(offset_query.timestamp), | ||
| _ => { | ||
| return ffi::FfiListOffsetsResult { | ||
| result: err_result(1, format!("Invalid offset_type: {}", offset_query.offset_type)), | ||
| bucket_offsets: vec![], | ||
| }; | ||
| } | ||
| }; |
There was a problem hiding this comment.
The offset_type matching logic uses hardcoded integer values (0, 1, 2) without documentation or named constants. Consider defining these as constants in the FFI module or adding documentation comments to clarify the mapping between values and their meanings (Earliest=0, Latest=1, Timestamp=2).
|
|
||
| let result = RUNTIME.block_on(async { | ||
| self.inner.subscribe_batch(bucket_offsets).await |
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await | |
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await |
|
|
||
| let result = RUNTIME.block_on(async { | ||
| self.inner.subscribe_batch(bucket_offsets).await |
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await | |
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await |
| ffi_query.timestamp = offset_query.timestamp; | ||
|
|
||
| auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query); | ||
|
|
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| @@ -63,6 +63,24 @@ enum class DatumType { | |||
| Bytes = 7, | |||
| }; | |||
|
|
|||
There was a problem hiding this comment.
The constants EARLIEST_OFFSET and LATEST_OFFSET are defined but not used in the codebase shown. Consider documenting their intended use or removing them if they are not needed. These constants could be useful for users of the API to pass to Subscribe methods, but their purpose should be documented.
| // Special offset sentinel values exposed as part of the public API. | |
| // These can be used by callers that work directly with numeric offsets | |
| // (for example, when subscribing to a stream) to request: | |
| // - EARLIEST_OFFSET: start from the beginning of the log | |
| // - LATEST_OFFSET: start from the most recent (tail) offset. | |
| // For higher-level C++ APIs, prefer using OffsetQuery::Earliest() / | |
| // OffsetQuery::Latest() instead of these raw constants. |
|
|
||
| let mut scan_bucket_offsets = HashMap::new(); | ||
| for (bucket_id, offset) in bucket_offsets { | ||
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | ||
| scan_bucket_offsets.insert(table_bucket, offset); | ||
| } | ||
|
|
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| let mut scan_bucket_offsets = HashMap::new(); | |
| for (bucket_id, offset) in bucket_offsets { | |
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | |
| scan_bucket_offsets.insert(table_bucket, offset); | |
| } | |
| let mut scan_bucket_offsets = HashMap::new(); | |
| for (bucket_id, offset) in bucket_offsets { | |
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | |
| scan_bucket_offsets.insert(table_bucket, offset); | |
| } |
|
|
||
| let mut scan_bucket_offsets = HashMap::new(); | ||
| for (bucket_id, offset) in bucket_offsets { | ||
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | ||
| scan_bucket_offsets.insert(table_bucket, offset); | ||
| } | ||
|
|
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| let mut scan_bucket_offsets = HashMap::new(); | |
| for (bucket_id, offset) in bucket_offsets { | |
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | |
| scan_bucket_offsets.insert(table_bucket, offset); | |
| } | |
| let mut scan_bucket_offsets = HashMap::new(); | |
| for (bucket_id, offset) in bucket_offsets { | |
| let table_bucket = TableBucket::new(self.table_id, bucket_id); | |
| scan_bucket_offsets.insert(table_bucket, offset); | |
| } |
| } | ||
|
|
||
| auto ffi_path = utils::to_ffi_table_path(table_path); | ||
|
|
There was a problem hiding this comment.
Remove trailing whitespace from this line for consistency with code style.
| use std::collections::HashMap; | ||
| let mut bucket_offsets = HashMap::new(); | ||
| for sub in subscriptions { | ||
| bucket_offsets.insert(sub.bucket_id, sub.offset); | ||
| } | ||
|
|
||
| let result = RUNTIME.block_on(async { | ||
| self.inner.subscribe_batch(bucket_offsets).await |
There was a problem hiding this comment.
Consider adding validation for empty subscriptions vector. When subscriptions is empty, subscribe_batch will make a call that might not be meaningful. Consider returning early with an appropriate error or success result.
| use std::collections::HashMap; | |
| let mut bucket_offsets = HashMap::new(); | |
| for sub in subscriptions { | |
| bucket_offsets.insert(sub.bucket_id, sub.offset); | |
| } | |
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await | |
| if subscriptions.is_empty() { | |
| return ok_result(); | |
| } | |
| use std::collections::HashMap; | |
| let mut bucket_offsets = HashMap::new(); | |
| for sub in subscriptions { | |
| bucket_offsets.insert(sub.bucket_id, sub.offset); | |
| } | |
| let result = RUNTIME.block_on(async { | |
| self.inner.subscribe_batch(bucket_offsets).await |
There was a problem hiding this comment.
maybe we can optimize it in rust core
luoyuxia
left a comment
There was a problem hiding this comment.
@zhaohaidao Thanks for the pr. Left minor comment. Depend on you to take or not. Otherwise, LGTM
| }; | ||
|
|
||
| constexpr int64_t EARLIEST_OFFSET = -2; | ||
| constexpr int64_t LATEST_OFFSET = -1; |
There was a problem hiding this comment.
I'm curious about LATEST_OFFSET is for what. I can't find it in java code base.
There was a problem hiding this comment.
Sometimes, lagged consumption can impact business operations. In such cases, the business may choose to resolve the issue by reverting to the latest consumption.
| use std::collections::HashMap; | ||
| let mut bucket_offsets = HashMap::new(); | ||
| for sub in subscriptions { | ||
| bucket_offsets.insert(sub.bucket_id, sub.offset); | ||
| } | ||
|
|
||
| let result = RUNTIME.block_on(async { | ||
| self.inner.subscribe_batch(bucket_offsets).await |
There was a problem hiding this comment.
maybe we can optimize it in rust core
|
@luoyuxia Hi, yuxia, Comments are addressed. PTAL if u have time. |
…pache#100) --------- Co-authored-by: 赵海源 <zhaohaiyuan@xiaohongshu.com>
Purpose
Linked issue: close #101
Brief change log
Tests
API and Format
Documentation