Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 139 additions & 90 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int main() {
check("get_admin", conn.GetAdmin(admin));

fluss::TablePath table_path("fluss", "sample_table_cpp_v1");

// 2.1) Drop table if exists
std::cout << "Dropping table if exists..." << std::endl;
auto drop_result = admin.DropTable(table_path, true);
Expand All @@ -52,12 +52,16 @@ int main() {
std::cout << "Table drop result: " << drop_result.error_message << std::endl;
}

// 3) Schema & descriptor
// 3) Schema with scalar and temporal columns
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
.AddColumn("name", fluss::DataType::String)
.AddColumn("score", fluss::DataType::Float)
.AddColumn("age", fluss::DataType::Int)
.AddColumn("event_date", fluss::DataType::Date)
.AddColumn("event_time", fluss::DataType::Time)
.AddColumn("created_at", fluss::DataType::Timestamp)
.AddColumn("updated_at", fluss::DataType::TimestampLtz)
.Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
Expand All @@ -66,15 +70,14 @@ int main() {
.SetComment("cpp example table with 3 buckets")
.Build();

// 3.1) Create table with 3 buckets
std::cout << "Creating table with 3 buckets..." << std::endl;
check("create_table", admin.CreateTable(table_path, descriptor, false));

// 4) Get table
fluss::Table table;
check("get_table", conn.GetTable(table_path, table));

// 5) Writer
// 5) Write rows with scalar and temporal values
fluss::AppendWriter writer;
check("new_append_writer", table.NewAppendWriter(writer));

Expand All @@ -83,12 +86,26 @@ int main() {
const char* name;
float score;
int age;
fluss::Date date;
fluss::Time time;
fluss::Timestamp ts_ntz;
fluss::Timestamp ts_ltz;
};

auto tp_now = std::chrono::system_clock::now();
std::vector<RowData> rows = {
{1, "Alice", 95.2f, 25},
{2, "Bob", 87.2f, 30},
{3, "Charlie", 92.1f, 35},
{1, "Alice", 95.2f, 25,
fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45),
fluss::Timestamp::FromTimePoint(tp_now),
fluss::Timestamp::FromMillis(1718467200000)},
{2, "Bob", 87.2f, 30,
fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0),
fluss::Timestamp::FromMillis(1735689600000),
fluss::Timestamp::FromMillisNanos(1735689600000, 500000)},
{3, "Charlie", 92.1f, 35,
fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59),
fluss::Timestamp::FromMillis(946684799999),
fluss::Timestamp::FromMillis(946684799999)},
};

for (const auto& r : rows) {
Expand All @@ -97,12 +114,16 @@ int main() {
row.SetString(1, r.name);
row.SetFloat32(2, r.score);
row.SetInt32(3, r.age);
row.SetDate(4, r.date);
row.SetTime(5, r.time);
row.SetTimestampNtz(6, r.ts_ntz);
row.SetTimestampLtz(7, r.ts_ltz);
check("append", writer.Append(row));
}
check("flush", writer.Flush());
std::cout << "Wrote " << rows.size() << " rows" << std::endl;

// 6) Scan
// 6) Full scan — verify all column types including temporal
fluss::LogScanner scanner;
check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));

Expand All @@ -115,188 +136,216 @@ int main() {
fluss::ScanRecords records;
check("poll", scanner.Poll(5000, records));

std::cout << "Scanned records: " << records.records.size() << std::endl;
std::cout << "Scanned records: " << records.Size() << std::endl;
bool scan_ok = true;
for (const auto& rec : records.records) {
std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val
<< " name=" << rec.row.fields[1].string_val
<< " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val
<< " ts=" << rec.timestamp << std::endl;
const auto& f = rec.row.fields;

if (f[4].type != fluss::DatumType::Date) {
std::cerr << "ERROR: field 4 expected Date, got "
<< static_cast<int>(f[4].type) << std::endl;
scan_ok = false;
}
if (f[5].type != fluss::DatumType::Time) {
std::cerr << "ERROR: field 5 expected Time, got "
<< static_cast<int>(f[5].type) << std::endl;
scan_ok = false;
}
if (f[6].type != fluss::DatumType::TimestampNtz) {
std::cerr << "ERROR: field 6 expected TimestampNtz, got "
<< static_cast<int>(f[6].type) << std::endl;
scan_ok = false;
}
if (f[7].type != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: field 7 expected TimestampLtz, got "
<< static_cast<int>(f[7].type) << std::endl;
scan_ok = false;
}

auto date = f[4].GetDate();
auto time = f[5].GetTime();
auto ts_ntz = f[6].GetTimestamp();
auto ts_ltz = f[7].GetTimestamp();

std::cout << " id=" << f[0].i32_val
<< " name=" << f[1].string_val
<< " score=" << f[2].f32_val
<< " age=" << f[3].i32_val
<< " date=" << date.Year() << "-" << date.Month() << "-" << date.Day()
<< " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second()
<< " ts_ntz=" << ts_ntz.epoch_millis
<< " ts_ltz=" << ts_ltz.epoch_millis
<< "+" << ts_ltz.nano_of_millisecond << "ns"
<< std::endl;
}

if (!scan_ok) {
std::cerr << "Full scan type verification FAILED!" << std::endl;
std::exit(1);
}

// 7) Project only id (0) and name (1) columns
std::vector<size_t> projected_columns = {0, 1};

// 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify
// NTZ/LTZ disambiguation works with column index remapping
std::vector<size_t> projected_columns = {0, 7};
fluss::LogScanner projected_scanner;
check("new_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));

for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
}

fluss::ScanRecords projected_records;
check("poll_projected", projected_scanner.Poll(5000, projected_records));

std::cout << "Projected records: " << projected_records.records.size() << std::endl;

bool projection_verified = true;
for (size_t i = 0; i < projected_records.records.size(); ++i) {
const auto& rec = projected_records.records[i];
const auto& row = rec.row;

if (row.fields.size() != projected_columns.size()) {
std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
<< " fields, expected " << projected_columns.size() << std::endl;
projection_verified = false;

std::cout << "Projected records: " << projected_records.Size() << std::endl;
for (const auto& rec : projected_records.records) {
const auto& f = rec.row.fields;

if (f.size() != 2) {
std::cerr << "ERROR: expected 2 fields, got " << f.size() << std::endl;
scan_ok = false;
continue;
}

// Verify field types match expected columns
// Column 0 (id) should be Int32, Column 1 (name) should be String
if (row.fields[0].type != fluss::DatumType::Int32) {
std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
projection_verified = false;
if (f[0].type != fluss::DatumType::Int32) {
std::cerr << "ERROR: projected field 0 expected Int32, got "
<< static_cast<int>(f[0].type) << std::endl;
scan_ok = false;
}
if (row.fields[1].type != fluss::DatumType::String) {
std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
projection_verified = false;
}

// Print projected data
if (row.fields[0].type == fluss::DatumType::Int32 &&
row.fields[1].type == fluss::DatumType::String) {
std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
<< ", name=" << row.fields[1].string_val << std::endl;
if (f[1].type != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
<< static_cast<int>(f[1].type) << std::endl;
scan_ok = false;
}

auto ts = f[1].GetTimestamp();
std::cout << " id=" << f[0].i32_val
<< " updated_at=" << ts.epoch_millis
<< "+" << ts.nano_of_millisecond << "ns" << std::endl;
}
if (projection_verified) {
std::cout << "Column pruning verification passed!" << std::endl;

if (scan_ok) {
std::cout << "Scan verification passed!" << std::endl;
} else {
std::cerr << "Column pruning verification failed!" << std::endl;
std::cerr << "Scan verification FAILED!" << std::endl;
std::exit(1);
}

// 8) List offsets examples
std::cout << "\n=== List Offsets Examples ===" << std::endl;

// 8.1) Query earliest offsets for all buckets

std::vector<int32_t> all_bucket_ids;
all_bucket_ids.reserve(buckets);
for (int b = 0; b < buckets; ++b) {
all_bucket_ids.push_back(b);
}

std::unordered_map<int32_t, int64_t> earliest_offsets;
check("list_earliest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Earliest(),
check("list_earliest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Earliest(),
earliest_offsets));
std::cout << "Earliest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : earliest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}

// 8.2) Query latest offsets for all buckets

std::unordered_map<int32_t, int64_t> latest_offsets;
check("list_latest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Latest(),
check("list_latest_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Latest(),
latest_offsets));
std::cout << "Latest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : latest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}

// 8.3) Query offsets for a specific timestamp (current time - 1 hour)

auto now = std::chrono::system_clock::now();
auto one_hour_ago = now - std::chrono::hours(1);
auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
one_hour_ago.time_since_epoch()).count();

std::unordered_map<int32_t, int64_t> timestamp_offsets;
check("list_timestamp_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::FromTimestamp(timestamp_ms),
check("list_timestamp_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::FromTimestamp(timestamp_ms),
timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
}
// 8.4) Use batch subscribe with offsets from list_offsets

// 9) Batch subscribe
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
fluss::LogScanner batch_scanner;
check("new_log_scanner_for_batch", table.NewScan().CreateLogScanner(batch_scanner));

std::vector<fluss::BucketSubscription> subscriptions;
for (const auto& [bucket_id, offset] : earliest_offsets) {
subscriptions.push_back({bucket_id, offset});
std::cout << "Preparing subscription: bucket=" << bucket_id
std::cout << "Preparing subscription: bucket=" << bucket_id
<< ", offset=" << offset << std::endl;
}

check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl;

// 8.5) Poll and verify bucket_id in records

fluss::ScanRecords batch_records;
check("poll_batch", batch_scanner.Poll(5000, batch_records));

std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl;
for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
const auto& rec = batch_records[i];
std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
<< ", offset=" << rec.offset
std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
<< ", offset=" << rec.offset
<< ", timestamp=" << rec.timestamp << std::endl;
}
if (batch_records.Size() > 5) {
std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl;
}

// 9) Test the new Arrow record batch polling functionality
// 10) Arrow record batch polling
std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;

fluss::LogScanner arrow_scanner;
check("new_record_batch_log_scanner", table.NewScan().CreateRecordBatchScanner(arrow_scanner));

// Subscribe to all buckets starting from offset 0

for (int b = 0; b < buckets; ++b) {
check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
}

fluss::ArrowRecordBatches arrow_batches;
check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, arrow_batches));

std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" << std::endl;
for (size_t i = 0; i < arrow_batches.Size(); ++i) {
const auto& batch = arrow_batches[i];
if (batch->Available()) {
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
} else {
std::cout << " Batch " << i << ": not available" << std::endl;
}
}
// 10) Test the new Arrow record batch polling with projection

// 11) Arrow record batch polling with projection
std::cout << "\n=== Testing Arrow Record Batch Polling with Projection ===" << std::endl;

fluss::LogScanner projected_arrow_scanner;
check("new_record_batch_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));

// Subscribe to all buckets starting from offset 0

for (int b = 0; b < buckets; ++b) {
check("subscribe_projected_arrow", projected_arrow_scanner.Subscribe(b, 0));
}

fluss::ArrowRecordBatches projected_arrow_batches;
check("poll_projected_record_batch", projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));

std::cout << "Polled " << projected_arrow_batches.Size() << " projected Arrow record batches" << std::endl;
for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
const auto& batch = projected_arrow_batches[i];
if (batch->Available()) {
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
} else {
std::cout << " Batch " << i << ": not available" << std::endl;
}
Expand Down
Loading