Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <iostream>
#include <vector>
#include <unordered_map>
#include <chrono>

static void check(const char* step, const fluss::Result& r) {
if (!r.Ok()) {
Expand All @@ -37,6 +39,17 @@ int main() {
fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));

fluss::TablePath table_path("fluss", "sample_table_cpp_v1");

// 2.1) Drop table if exists
std::cout << "Dropping table if exists..." << std::endl;
auto drop_result = admin.DropTable(table_path, true);
if (drop_result.Ok()) {
std::cout << "Table dropped successfully" << std::endl;
} else {
std::cout << "Table drop result: " << drop_result.error_message << std::endl;
}

// 3) Schema & descriptor
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
Expand All @@ -47,14 +60,14 @@ int main() {

auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetBucketCount(1)
.SetBucketCount(3)
.SetProperty("table.log.arrow.compression.type", "NONE")
.SetComment("cpp example table")
.SetComment("cpp example table with 3 buckets")
.Build();

fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
// ignore_if_exists=true to allow re-run
check("create_table", admin.CreateTable(table_path, descriptor, true));
// 3.1) Create table with 3 buckets
std::cout << "Creating table with 3 buckets..." << std::endl;
check("create_table", admin.CreateTable(table_path, descriptor, false));

// 4) Get table
fluss::Table table;
Expand Down Expand Up @@ -162,5 +175,81 @@ int main() {
std::exit(1);
}

// 8) List offsets examples
std::cout << "\n=== List Offsets Examples ===" << std::endl;

// 8.1) Query earliest offsets for all buckets
std::vector<int32_t> all_bucket_ids;
for (int b = 0; b < buckets; ++b) {
all_bucket_ids.push_back(b);
}

std::unordered_map<int32_t, int64_t> earliest_offsets;
check("list_earliest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Earliest(),
earliest_offsets));
std::cout << "Earliest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : earliest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}

// 8.2) Query latest offsets for all buckets
std::unordered_map<int32_t, int64_t> latest_offsets;
check("list_latest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Latest(),
latest_offsets));
std::cout << "Latest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : latest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}

// 8.3) Query offsets for a specific timestamp (current time - 1 hour)
auto now = std::chrono::system_clock::now();
auto one_hour_ago = now - std::chrono::hours(1);
auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
one_hour_ago.time_since_epoch()).count();

std::unordered_map<int32_t, int64_t> timestamp_offsets;
check("list_timestamp_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::FromTimestamp(timestamp_ms),
timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}

// 8.4) Use batch subscribe with offsets from list_offsets
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
fluss::LogScanner batch_scanner;
check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));

std::vector<fluss::BucketSubscription> subscriptions;
for (const auto& [bucket_id, offset] : earliest_offsets) {
subscriptions.push_back({bucket_id, offset});
std::cout << "Preparing subscription: bucket=" << bucket_id
<< ", offset=" << offset << std::endl;
}

check("subscribe_batch", batch_scanner.Subscribe(subscriptions));
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl;

// 8.5) Poll and verify bucket_id in records
fluss::ScanRecords batch_records;
check("poll_batch", batch_scanner.Poll(5000, batch_records));

std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl;
for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
const auto& rec = batch_records[i];
std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
<< ", offset=" << rec.offset
<< ", timestamp=" << rec.timestamp << std::endl;
}
if (batch_records.Size() > 5) {
std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl;
}

return 0;
}
32 changes: 32 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ enum class DatumType {
Bytes = 7,
};

Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constants EARLIEST_OFFSET and LATEST_OFFSET are defined but not used in the codebase shown. Consider documenting their intended use or removing them if they are not needed. These constants could be useful for users of the API to pass to Subscribe methods, but their purpose should be documented.

Suggested change
// Special offset sentinel values exposed as part of the public API.
// These can be used by callers that work directly with numeric offsets
// (for example, when subscribing to a stream) to request:
// - EARLIEST_OFFSET: start from the beginning of the log
// - LATEST_OFFSET: start from the most recent (tail) offset.
// For higher-level C++ APIs, prefer using OffsetQuery::Earliest() /
// OffsetQuery::Latest() instead of these raw constants.

Copilot uses AI. Check for mistakes.
constexpr int64_t EARLIEST_OFFSET = -2;
constexpr int64_t LATEST_OFFSET = -1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about LATEST_OFFSET is for what. I can't find it in java code base.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes, lagged consumption can impact business operations. In such cases, the business may choose to resolve the issue by reverting to the latest consumption.


enum class OffsetSpec {
Earliest = 0,
Latest = 1,
Timestamp = 2,
};

struct OffsetQuery {
OffsetSpec spec;
int64_t timestamp{0};

static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; }
};

struct Result {
int32_t error_code{0};
std::string error_message;
Expand Down Expand Up @@ -301,6 +319,7 @@ struct GenericRow {
};

struct ScanRecord {
int32_t bucket_id;
int64_t offset;
int64_t timestamp;
GenericRow row;
Expand All @@ -324,6 +343,11 @@ struct BucketOffset {
int64_t offset;
};

struct BucketSubscription {
int32_t bucket_id;
int64_t offset;
};

struct LakeSnapshot {
int64_t snapshot_id;
std::vector<BucketOffset> bucket_offsets;
Expand Down Expand Up @@ -372,10 +396,17 @@ class Admin {
const TableDescriptor& descriptor,
bool ignore_if_exists = false);

Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false);

Result GetTable(const TablePath& table_path, TableInfo& out);

Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);

Result ListOffsets(const TablePath& table_path,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out);

private:
friend class Connection;
Admin(ffi::Admin* admin) noexcept;
Expand Down Expand Up @@ -448,6 +479,7 @@ class LogScanner {
bool Available() const;

Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result Poll(int64_t timeout_ms, ScanRecords& out);

private:
Expand Down
42 changes: 42 additions & 0 deletions bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ Result Admin::CreateTable(const TablePath& table_path,
return utils::from_ffi_result(ffi_result);
}

Result Admin::DropTable(const TablePath& table_path, bool ignore_if_not_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->drop_table(ffi_path, ignore_if_not_exists);
return utils::from_ffi_result(ffi_result);
}

Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
Expand Down Expand Up @@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
return result;
}

Result Admin::ListOffsets(const TablePath& table_path,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);

Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace from this line for consistency with code style.

Copilot uses AI. Check for mistakes.
rust::Vec<int32_t> rust_bucket_ids;
for (int32_t id : bucket_ids) {
rust_bucket_ids.push_back(id);
}

ffi::FfiOffsetQuery ffi_query;
ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
ffi_query.timestamp = offset_query.timestamp;

auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query);

Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace from this line for consistency with code style.

Copilot uses AI. Check for mistakes.
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
for (const auto& pair : ffi_result.bucket_offsets) {
out[pair.bucket_id] = pair.offset;
}
}

return result;
}

} // namespace fluss
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {

inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
return ScanRecord{
ffi_record.bucket_id,
ffi_record.offset,
ffi_record.timestamp,
from_ffi_generic_row(ffi_record.row)};
Expand Down
Loading
Loading