Support Cpp bindings#83
Conversation
|
@luoyuxia Hi, yuxia, PTAL if u have time. |
There was a problem hiding this comment.
Pull request overview
This PR adds C++ language bindings for the Fluss client library, enabling C++ applications to interact with Fluss tables. The implementation uses the cxx library to create a safe FFI bridge between Rust and C++, exposing core functionality including connection management, admin operations, table operations, append writers, and log scanners.
Key changes:
- Implements FFI bridge layer using cxx for type-safe Rust-C++ interop
- Adds C++ wrapper classes with RAII resource management for Connection, Admin, Table, AppendWriter, and LogScanner
- Includes comprehensive example demonstrating table creation, data insertion, scanning, and column projection
- Modifies Config struct to manually implement Default trait to avoid conflicts with clap's Parser derive
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
bindings/cpp/src/lib.rs |
Rust FFI implementation defining the bridge interface and core bindings logic |
bindings/cpp/src/types.rs |
Type conversion utilities between FFI types and Fluss core types |
bindings/cpp/src/connection.cpp |
C++ implementation of Connection class |
bindings/cpp/src/admin.cpp |
C++ implementation of Admin class for table management |
bindings/cpp/src/table.cpp |
C++ implementation of Table, AppendWriter, and LogScanner classes |
bindings/cpp/src/ffi_converter.hpp |
Helper utilities for converting between C++ and FFI types |
bindings/cpp/include/fluss.hpp |
Public C++ API header with all class definitions and types |
bindings/cpp/examples/example.cpp |
Comprehensive usage example demonstrating all features |
bindings/cpp/build.rs |
Build script for cxx bridge code generation |
bindings/cpp/Cargo.toml |
Rust package configuration for C++ bindings |
bindings/cpp/CMakeLists.txt |
CMake build configuration |
bindings/cpp/.clang-format |
Code formatting configuration |
bindings/cpp/.gitignore |
Git ignore rules for build artifacts |
crates/fluss/src/config.rs |
Manual Default implementation to avoid clap derive conflicts |
Cargo.toml |
Workspace updated to include cpp bindings |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (!Available()) { | ||
| return utils::make_error(1, "Connection not available"); | ||
| } | ||
|
|
There was a problem hiding this comment.
Potential memory leak: If out.table_ is already non-null when GetTable is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| // Free any existing resource before overwriting out.table_ | |
| out.Destroy(); |
| } | ||
|
|
||
| Result Connection::Connect(const std::string& bootstrap_server, Connection& out) { | ||
| try { |
There was a problem hiding this comment.
Potential memory leak: If out.conn_ is already non-null when Connect is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| try { | |
| try { | |
| out.Destroy(); |
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| _ => panic!("Unsupported Time64 unit for column {}", i), |
There was a problem hiding this comment.
Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
| } | ||
| _ => panic!("Unsupported Time64 unit for column {}", i), | ||
| }, | ||
| other => panic!("Unsupported Arrow data type for column {}: {:?}", i, other), |
There was a problem hiding this comment.
Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
| .expect("LargeUtf8 column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_STRING); | ||
| datum.string_val = array.value(row_id).to_string(); | ||
| datum | ||
| } | ||
| ArrowDataType::Binary => { | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = row.get_bytes(i); | ||
| datum | ||
| } | ||
| ArrowDataType::FixedSizeBinary(len) => { | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = row.get_binary(i, *len as usize); | ||
| datum | ||
| } | ||
| ArrowDataType::LargeBinary => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<LargeBinaryArray>() | ||
| .expect("LargeBinary column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = array.value(row_id).to_vec(); | ||
| datum | ||
| } | ||
| ArrowDataType::Date32 => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Date32Array>() | ||
| .expect("Date32 column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| ArrowDataType::Timestamp(unit, _) => match unit { | ||
| TimeUnit::Second => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampSecondArray>() | ||
| .expect("Timestamp(second) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Millisecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampMillisecondArray>() | ||
| .expect("Timestamp(millisecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Microsecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampMicrosecondArray>() | ||
| .expect("Timestamp(microsecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Nanosecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampNanosecondArray>() | ||
| .expect("Timestamp(nanosecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| }, | ||
| ArrowDataType::Time32(unit) => match unit { | ||
| TimeUnit::Second => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time32SecondArray>() | ||
| .expect("Time32(second) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Millisecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time32MillisecondArray>() | ||
| .expect("Time32(millisecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| _ => panic!("Unsupported Time32 unit for column {}", i), | ||
| }, | ||
| ArrowDataType::Time64(unit) => match unit { | ||
| TimeUnit::Microsecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time64MicrosecondArray>() | ||
| .expect("Time64(microsecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Nanosecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time64NanosecondArray>() | ||
| .expect("Time64(nanosecond) column expected"); |
There was a problem hiding this comment.
The expect() calls throughout this function can panic if the downcasts fail, which will cause undefined behavior when crossing FFI boundaries. Consider using pattern matching with proper error handling instead of expect(), or return a Result type that can be properly handled by the C++ caller.
luoyuxia
left a comment
There was a problem hiding this comment.
@zhaohaidao Thanks for the pr. Left some comments. PTAL
| auto descriptor = fluss::TableDescriptor::NewBuilder() | ||
| .SetSchema(schema) | ||
| .SetBucketCount(1) | ||
| .SetProperty("table.log.arrow.compression.type", "NONE") |
There was a problem hiding this comment.
curious about why set compression type to NONE? Is there any bug when compression is not null?
There was a problem hiding this comment.
Yes, when compression and column pruning are enabled simultaneously, an "out of range" error occurs. I verified this with both Java and Rust, and both had this problem. I was probably using Fluss version 0.6 or 0.7 at the time; I'm not sure if the latest version has fixed it. I mentioned this issue in the ##57 as followed.

There was a problem hiding this comment.
Thanks for explantion. But looks werid to me since compression and column pruning are enabled in our internal production env and no any exception happen. I'm afraid of it's a special case for rust client. Could you please help check it when you got some time. If it did a Fluss issue, could you please help create the issue in Fluss repo?
There was a problem hiding this comment.
Thanks for explantion. But looks werid to me since compression and column pruning are enabled in our internal production env and no any exception happen. I'm afraid of it's a special case for rust client. Could you please help check it when you got some time. If it did a Fluss issue, could you please help create the issue in Fluss repo?
No problem. According to our plan, our internal use cases also require both compression and column prouning to be enabled simultaneously, so we will follow up on resolving this issue soon.
| std::string string_val; | ||
| std::vector<uint8_t> bytes_val; | ||
|
|
||
| static Datum Null() { return Datum(); } |
There was a problem hiding this comment.
nit:
static Datum Null() { return {}; }
?
| Schema schema; | ||
| }; | ||
|
|
||
| struct Datum { |
There was a problem hiding this comment.
Seem for even a bool value, Datum will occupy more bytes, right?
We can consider to optimze it in the future version. Two thought in here:
- use cpp variant
- rust side emit arrow record batch, and cpp side wrap the arrow record batch to provide row api
There was a problem hiding this comment.
ok.
I've created a separate issue to document potential bottlenecks caused by copying. What do you think? I've seen you mention copying as potentially optimizeable in several places.
There was a problem hiding this comment.
Make sense to me. Let's first built it and improve one step by step.
| datum.i64_val = ffi_datum.i64_val; | ||
| datum.f32_val = ffi_datum.f32_val; | ||
| datum.f64_val = ffi_datum.f64_val; | ||
| datum.string_val = std::string(ffi_datum.string_val); |
There was a problem hiding this comment.
seem here need to a string copy?
There was a problem hiding this comment.
I'm not sure is there any effient way. If complex, maybe left a todo to mark it. It'll reminds us if we find any bottle neck
| } | ||
| ArrowDataType::Utf8 => { | ||
| let mut datum = new_datum(DATUM_TYPE_STRING); | ||
| datum.string_val = row.get_string(i).to_string(); |
There was a problem hiding this comment.
It aslo need a string copy. Not sure whether it's easy or not to avoid this copy. We can left a todo to remind us the string copy wil happen in here.
luoyuxia
left a comment
There was a problem hiding this comment.
@zhaohaidao Thanks for the pr. LGTM
|
@zhaohaidao You can use to make clippy happy |
Thanks for the suggestion, the Rust toolchain is indeed very useful. |
Purpose
Linked issue: close #67
Brief change log
Tests
API and Format
Documentation