Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 40 additions & 47 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,40 +153,38 @@ int main() {
std::cout << "Scanned records: " << records.Size() << std::endl;
bool scan_ok = true;
for (const auto& rec : records.records) {
const auto& f = rec.row.fields;

if (f[4].type != fluss::DatumType::Date) {
std::cerr << "ERROR: field 4 expected Date, got " << static_cast<int>(f[4].type)
<< std::endl;
if (rec.row.GetType(4) != fluss::DatumType::Date) {
std::cerr << "ERROR: field 4 expected Date, got "
<< static_cast<int>(rec.row.GetType(4)) << std::endl;
scan_ok = false;
}
if (f[5].type != fluss::DatumType::Time) {
std::cerr << "ERROR: field 5 expected Time, got " << static_cast<int>(f[5].type)
<< std::endl;
if (rec.row.GetType(5) != fluss::DatumType::Time) {
std::cerr << "ERROR: field 5 expected Time, got "
<< static_cast<int>(rec.row.GetType(5)) << std::endl;
scan_ok = false;
}
if (f[6].type != fluss::DatumType::TimestampNtz) {
std::cerr << "ERROR: field 6 expected TimestampNtz, got " << static_cast<int>(f[6].type)
<< std::endl;
if (rec.row.GetType(6) != fluss::DatumType::TimestampNtz) {
std::cerr << "ERROR: field 6 expected TimestampNtz, got "
<< static_cast<int>(rec.row.GetType(6)) << std::endl;
scan_ok = false;
}
if (f[7].type != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: field 7 expected TimestampLtz, got " << static_cast<int>(f[7].type)
<< std::endl;
if (rec.row.GetType(7) != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: field 7 expected TimestampLtz, got "
<< static_cast<int>(rec.row.GetType(7)) << std::endl;
scan_ok = false;
}

auto date = f[4].GetDate();
auto time = f[5].GetTime();
auto ts_ntz = f[6].GetTimestamp();
auto ts_ltz = f[7].GetTimestamp();

std::cout << " id=" << f[0].i32_val << " name=" << f[1].string_val
<< " score=" << f[2].f32_val << " age=" << f[3].i32_val << " date=" << date.Year()
<< "-" << date.Month() << "-" << date.Day() << " time=" << time.Hour() << ":"
<< time.Minute() << ":" << time.Second() << " ts_ntz=" << ts_ntz.epoch_millis
<< " ts_ltz=" << ts_ltz.epoch_millis << "+" << ts_ltz.nano_of_millisecond << "ns"
<< std::endl;
auto date = rec.row.GetDate(4);
auto time = rec.row.GetTime(5);
auto ts_ntz = rec.row.GetTimestamp(6);
auto ts_ltz = rec.row.GetTimestamp(7);

std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1)
<< " score=" << rec.row.GetFloat32(2) << " age=" << rec.row.GetInt32(3)
<< " date=" << date.Year() << "-" << date.Month() << "-" << date.Day()
<< " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second()
<< " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" << ts_ltz.epoch_millis << "+"
<< ts_ltz.nano_of_millisecond << "ns" << std::endl;
}

if (!scan_ok) {
Expand All @@ -210,26 +208,24 @@ int main() {

std::cout << "Projected records: " << projected_records.Size() << std::endl;
for (const auto& rec : projected_records.records) {
const auto& f = rec.row.fields;

if (f.size() != 2) {
std::cerr << "ERROR: expected 2 fields, got " << f.size() << std::endl;
if (rec.row.FieldCount() != 2) {
std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl;
scan_ok = false;
continue;
}
if (f[0].type != fluss::DatumType::Int32) {
if (rec.row.GetType(0) != fluss::DatumType::Int32) {
std::cerr << "ERROR: projected field 0 expected Int32, got "
<< static_cast<int>(f[0].type) << std::endl;
<< static_cast<int>(rec.row.GetType(0)) << std::endl;
scan_ok = false;
}
if (f[1].type != fluss::DatumType::TimestampLtz) {
if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
<< static_cast<int>(f[1].type) << std::endl;
<< static_cast<int>(rec.row.GetType(1)) << std::endl;
scan_ok = false;
}

auto ts = f[1].GetTimestamp();
std::cout << " id=" << f[0].i32_val << " updated_at=" << ts.epoch_millis << "+"
auto ts = rec.row.GetTimestamp(1);
std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+"
<< ts.nano_of_millisecond << "ns" << std::endl;
}

Expand Down Expand Up @@ -428,12 +424,9 @@ int main() {

std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl;
for (const auto& rec : decimal_records) {
auto& price = rec.row.fields[1];
auto& amount = rec.row.fields[2];
std::cout << " id=" << rec.row.fields[0].i32_val << " price=" << price.DecimalToString()
<< " (raw=" << price.i64_val << ")"
<< " amount=" << amount.DecimalToString() << " is_decimal=" << price.IsDecimal()
<< std::endl;
std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.DecimalToString(1)
<< " amount=" << rec.row.DecimalToString(2)
<< " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
}

// 13) Partitioned table example
Expand Down Expand Up @@ -525,9 +518,9 @@ int main() {
<< std::endl;
for (size_t i = 0; i < partition_records.Size(); ++i) {
const auto& rec = partition_records[i];
std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val
<< ", region=" << rec.row.fields[1].string_val
<< ", value=" << rec.row.fields[2].i64_val << std::endl;
std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
<< ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2)
<< std::endl;
}

// 13.2) subscribe_partition_buckets: batch subscribe to all partitions at once
Expand All @@ -551,9 +544,9 @@ int main() {
<< " records from batch partition subscription" << std::endl;
for (size_t i = 0; i < partition_batch_records.Size(); ++i) {
const auto& rec = partition_batch_records[i];
std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val
<< ", region=" << rec.row.fields[1].string_val
<< ", value=" << rec.row.fields[2].i64_val << std::endl;
std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
<< ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2)
<< std::endl;
}

// Cleanup
Expand Down
106 changes: 92 additions & 14 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -312,19 +313,13 @@ struct TableInfo {
Schema schema;
};

namespace detail {
struct FfiAccess;
}

struct Datum {
DatumType type{DatumType::Null};
bool bool_val{false};
int32_t i32_val{0};
int64_t i64_val{0};
float f32_val{0.0F};
double f64_val{0.0};
std::string string_val;
std::vector<uint8_t> bytes_val;
int32_t decimal_precision{0}; // Decimal: precision (total digits)
int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point)
int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value
int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value
friend struct GenericRow;
friend struct detail::FfiAccess;

static Datum Null() { return {}; }
static Datum Bool(bool v) {
Expand Down Expand Up @@ -404,6 +399,29 @@ struct Datum {
return d;
}

private:
DatumType type{DatumType::Null};
bool bool_val{false};
int32_t i32_val{0};
int64_t i64_val{0};
float f32_val{0.0F};
double f64_val{0.0};
std::string string_val;
std::vector<uint8_t> bytes_val;
int32_t decimal_precision{0}; // Decimal: precision (total digits)
int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point)
int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value
int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value

DatumType GetType() const { return type; }
bool IsNull() const { return type == DatumType::Null; }
bool GetBool() const { return bool_val; }
int32_t GetInt32() const { return i32_val; }
int64_t GetInt64() const { return i64_val; }
float GetFloat32() const { return f32_val; }
double GetFloat64() const { return f64_val; }
const std::string& GetString() const { return string_val; }
const std::vector<uint8_t>& GetBytes() const { return bytes_val; }
fluss::Date GetDate() const { return {i32_val}; }
fluss::Time GetTime() const { return {i32_val}; }
fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
Expand All @@ -428,7 +446,6 @@ struct Datum {
return "";
}

private:
static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) {
bool negative = unscaled < 0;
uint64_t abs_val =
Expand Down Expand Up @@ -469,7 +486,47 @@ struct Datum {
};

struct GenericRow {
std::vector<Datum> fields;
friend struct detail::FfiAccess;

size_t FieldCount() const { return fields.size(); }

DatumType GetType(size_t idx) const { return GetField(idx).GetType(); }
bool IsNull(size_t idx) const { return GetField(idx).IsNull(); }
bool GetBool(size_t idx) const { return GetTypedField(idx, DatumType::Bool).GetBool(); }
int32_t GetInt32(size_t idx) const { return GetTypedField(idx, DatumType::Int32).GetInt32(); }
int64_t GetInt64(size_t idx) const { return GetTypedField(idx, DatumType::Int64).GetInt64(); }
float GetFloat32(size_t idx) const {
return GetTypedField(idx, DatumType::Float32).GetFloat32();
}
double GetFloat64(size_t idx) const {
return GetTypedField(idx, DatumType::Float64).GetFloat64();
}
const std::string& GetString(size_t idx) const {
return GetTypedField(idx, DatumType::String).GetString();
}
const std::vector<uint8_t>& GetBytes(size_t idx) const {
return GetTypedField(idx, DatumType::Bytes).GetBytes();
}
fluss::Date GetDate(size_t idx) const { return GetTypedField(idx, DatumType::Date).GetDate(); }
fluss::Time GetTime(size_t idx) const { return GetTypedField(idx, DatumType::Time).GetTime(); }
fluss::Timestamp GetTimestamp(size_t idx) const {
const auto& d = GetField(idx);
auto t = d.GetType();
if (t != DatumType::TimestampNtz && t != DatumType::TimestampLtz) {
throw std::runtime_error("GenericRow: field " + std::to_string(idx) +
" is not a Timestamp type");
}
return d.GetTimestamp();
}
bool IsDecimal(size_t idx) const { return GetField(idx).IsDecimal(); }
std::string DecimalToString(size_t idx) const {
const auto& d = GetField(idx);
if (!d.IsDecimal()) {
throw std::runtime_error("GenericRow: field " + std::to_string(idx) +
" is not a Decimal type");
}
return d.DecimalToString();
}

void SetNull(size_t idx) {
EnsureSize(idx);
Expand Down Expand Up @@ -537,6 +594,27 @@ struct GenericRow {
}

private:
std::vector<Datum> fields;

const Datum& GetField(size_t idx) const {
if (idx >= fields.size()) {
throw std::runtime_error("GenericRow: index " + std::to_string(idx) +
" out of bounds (size=" + std::to_string(fields.size()) + ")");
}
return fields[idx];
}

const Datum& GetTypedField(size_t idx, DatumType expected) const {
const auto& d = GetField(idx);
if (d.GetType() != expected) {
throw std::runtime_error("GenericRow: field " + std::to_string(idx) +
" type mismatch: expected " +
std::to_string(static_cast<int>(expected)) + ", got " +
std::to_string(static_cast<int>(d.GetType())));
}
return d;
}

void EnsureSize(size_t idx) {
if (fields.size() <= idx) {
fields.resize(idx + 1);
Expand Down
Loading
Loading