diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index e6f9619b..f568422e 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -153,40 +153,38 @@ int main() { std::cout << "Scanned records: " << records.Size() << std::endl; bool scan_ok = true; for (const auto& rec : records.records) { - const auto& f = rec.row.fields; - - if (f[4].type != fluss::DatumType::Date) { - std::cerr << "ERROR: field 4 expected Date, got " << static_cast(f[4].type) - << std::endl; + if (rec.row.GetType(4) != fluss::DatumType::Date) { + std::cerr << "ERROR: field 4 expected Date, got " + << static_cast(rec.row.GetType(4)) << std::endl; scan_ok = false; } - if (f[5].type != fluss::DatumType::Time) { - std::cerr << "ERROR: field 5 expected Time, got " << static_cast(f[5].type) - << std::endl; + if (rec.row.GetType(5) != fluss::DatumType::Time) { + std::cerr << "ERROR: field 5 expected Time, got " + << static_cast(rec.row.GetType(5)) << std::endl; scan_ok = false; } - if (f[6].type != fluss::DatumType::TimestampNtz) { - std::cerr << "ERROR: field 6 expected TimestampNtz, got " << static_cast(f[6].type) - << std::endl; + if (rec.row.GetType(6) != fluss::DatumType::TimestampNtz) { + std::cerr << "ERROR: field 6 expected TimestampNtz, got " + << static_cast(rec.row.GetType(6)) << std::endl; scan_ok = false; } - if (f[7].type != fluss::DatumType::TimestampLtz) { - std::cerr << "ERROR: field 7 expected TimestampLtz, got " << static_cast(f[7].type) - << std::endl; + if (rec.row.GetType(7) != fluss::DatumType::TimestampLtz) { + std::cerr << "ERROR: field 7 expected TimestampLtz, got " + << static_cast(rec.row.GetType(7)) << std::endl; scan_ok = false; } - auto date = f[4].GetDate(); - auto time = f[5].GetTime(); - auto ts_ntz = f[6].GetTimestamp(); - auto ts_ltz = f[7].GetTimestamp(); - - std::cout << " id=" << f[0].i32_val << " name=" << f[1].string_val - << " score=" << f[2].f32_val << " age=" << f[3].i32_val << " date=" << date.Year() - << "-" << date.Month() << "-" << date.Day() << " time=" << time.Hour() << ":" - << time.Minute() << ":" << time.Second() << " ts_ntz=" << ts_ntz.epoch_millis - << " ts_ltz=" << ts_ltz.epoch_millis << "+" << ts_ltz.nano_of_millisecond << "ns" - << std::endl; + auto date = rec.row.GetDate(4); + auto time = rec.row.GetTime(5); + auto ts_ntz = rec.row.GetTimestamp(6); + auto ts_ltz = rec.row.GetTimestamp(7); + + std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1) + << " score=" << rec.row.GetFloat32(2) << " age=" << rec.row.GetInt32(3) + << " date=" << date.Year() << "-" << date.Month() << "-" << date.Day() + << " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second() + << " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" << ts_ltz.epoch_millis << "+" + << ts_ltz.nano_of_millisecond << "ns" << std::endl; } if (!scan_ok) { @@ -210,26 +208,24 @@ int main() { std::cout << "Projected records: " << projected_records.Size() << std::endl; for (const auto& rec : projected_records.records) { - const auto& f = rec.row.fields; - - if (f.size() != 2) { - std::cerr << "ERROR: expected 2 fields, got " << f.size() << std::endl; + if (rec.row.FieldCount() != 2) { + std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl; scan_ok = false; continue; } - if (f[0].type != fluss::DatumType::Int32) { + if (rec.row.GetType(0) != fluss::DatumType::Int32) { std::cerr << "ERROR: projected field 0 expected Int32, got " - << static_cast(f[0].type) << std::endl; + << static_cast(rec.row.GetType(0)) << std::endl; scan_ok = false; } - if (f[1].type != fluss::DatumType::TimestampLtz) { + if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) { std::cerr << "ERROR: projected field 1 expected TimestampLtz, got " - << static_cast(f[1].type) << std::endl; + << static_cast(rec.row.GetType(1)) << std::endl; scan_ok = false; } - auto ts = f[1].GetTimestamp(); - std::cout << " id=" << f[0].i32_val << " updated_at=" << ts.epoch_millis << "+" + auto ts = rec.row.GetTimestamp(1); + std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+" << ts.nano_of_millisecond << "ns" << std::endl; } @@ -428,12 +424,9 @@ int main() { std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl; for (const auto& rec : decimal_records) { - auto& price = rec.row.fields[1]; - auto& amount = rec.row.fields[2]; - std::cout << " id=" << rec.row.fields[0].i32_val << " price=" << price.DecimalToString() - << " (raw=" << price.i64_val << ")" - << " amount=" << amount.DecimalToString() << " is_decimal=" << price.IsDecimal() - << std::endl; + std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.DecimalToString(1) + << " amount=" << rec.row.DecimalToString(2) + << " is_decimal=" << rec.row.IsDecimal(1) << std::endl; } // 13) Partitioned table example @@ -525,9 +518,9 @@ int main() { << std::endl; for (size_t i = 0; i < partition_records.Size(); ++i) { const auto& rec = partition_records[i]; - std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val - << ", region=" << rec.row.fields[1].string_val - << ", value=" << rec.row.fields[2].i64_val << std::endl; + std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0) + << ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2) + << std::endl; } // 13.2) subscribe_partition_buckets: batch subscribe to all partitions at once @@ -551,9 +544,9 @@ int main() { << " records from batch partition subscription" << std::endl; for (size_t i = 0; i < partition_batch_records.Size(); ++i) { const auto& rec = partition_batch_records[i]; - std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val - << ", region=" << rec.row.fields[1].string_val - << ", value=" << rec.row.fields[2].i64_val << std::endl; + std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0) + << ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2) + << std::endl; } // Cleanup diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 3a104455..6b9d479a 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -312,19 +313,13 @@ struct TableInfo { Schema schema; }; +namespace detail { +struct FfiAccess; +} + struct Datum { - DatumType type{DatumType::Null}; - bool bool_val{false}; - int32_t i32_val{0}; - int64_t i64_val{0}; - float f32_val{0.0F}; - double f64_val{0.0}; - std::string string_val; - std::vector bytes_val; - int32_t decimal_precision{0}; // Decimal: precision (total digits) - int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point) - int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value - int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value + friend struct GenericRow; + friend struct detail::FfiAccess; static Datum Null() { return {}; } static Datum Bool(bool v) { @@ -404,6 +399,29 @@ struct Datum { return d; } + private: + DatumType type{DatumType::Null}; + bool bool_val{false}; + int32_t i32_val{0}; + int64_t i64_val{0}; + float f32_val{0.0F}; + double f64_val{0.0}; + std::string string_val; + std::vector bytes_val; + int32_t decimal_precision{0}; // Decimal: precision (total digits) + int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point) + int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value + int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value + + DatumType GetType() const { return type; } + bool IsNull() const { return type == DatumType::Null; } + bool GetBool() const { return bool_val; } + int32_t GetInt32() const { return i32_val; } + int64_t GetInt64() const { return i64_val; } + float GetFloat32() const { return f32_val; } + double GetFloat64() const { return f64_val; } + const std::string& GetString() const { return string_val; } + const std::vector& GetBytes() const { return bytes_val; } fluss::Date GetDate() const { return {i32_val}; } fluss::Time GetTime() const { return {i32_val}; } fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; } @@ -428,7 +446,6 @@ struct Datum { return ""; } - private: static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) { bool negative = unscaled < 0; uint64_t abs_val = @@ -469,7 +486,47 @@ struct Datum { }; struct GenericRow { - std::vector fields; + friend struct detail::FfiAccess; + + size_t FieldCount() const { return fields.size(); } + + DatumType GetType(size_t idx) const { return GetField(idx).GetType(); } + bool IsNull(size_t idx) const { return GetField(idx).IsNull(); } + bool GetBool(size_t idx) const { return GetTypedField(idx, DatumType::Bool).GetBool(); } + int32_t GetInt32(size_t idx) const { return GetTypedField(idx, DatumType::Int32).GetInt32(); } + int64_t GetInt64(size_t idx) const { return GetTypedField(idx, DatumType::Int64).GetInt64(); } + float GetFloat32(size_t idx) const { + return GetTypedField(idx, DatumType::Float32).GetFloat32(); + } + double GetFloat64(size_t idx) const { + return GetTypedField(idx, DatumType::Float64).GetFloat64(); + } + const std::string& GetString(size_t idx) const { + return GetTypedField(idx, DatumType::String).GetString(); + } + const std::vector& GetBytes(size_t idx) const { + return GetTypedField(idx, DatumType::Bytes).GetBytes(); + } + fluss::Date GetDate(size_t idx) const { return GetTypedField(idx, DatumType::Date).GetDate(); } + fluss::Time GetTime(size_t idx) const { return GetTypedField(idx, DatumType::Time).GetTime(); } + fluss::Timestamp GetTimestamp(size_t idx) const { + const auto& d = GetField(idx); + auto t = d.GetType(); + if (t != DatumType::TimestampNtz && t != DatumType::TimestampLtz) { + throw std::runtime_error("GenericRow: field " + std::to_string(idx) + + " is not a Timestamp type"); + } + return d.GetTimestamp(); + } + bool IsDecimal(size_t idx) const { return GetField(idx).IsDecimal(); } + std::string DecimalToString(size_t idx) const { + const auto& d = GetField(idx); + if (!d.IsDecimal()) { + throw std::runtime_error("GenericRow: field " + std::to_string(idx) + + " is not a Decimal type"); + } + return d.DecimalToString(); + } void SetNull(size_t idx) { EnsureSize(idx); @@ -537,6 +594,27 @@ struct GenericRow { } private: + std::vector fields; + + const Datum& GetField(size_t idx) const { + if (idx >= fields.size()) { + throw std::runtime_error("GenericRow: index " + std::to_string(idx) + + " out of bounds (size=" + std::to_string(fields.size()) + ")"); + } + return fields[idx]; + } + + const Datum& GetTypedField(size_t idx, DatumType expected) const { + const auto& d = GetField(idx); + if (d.GetType() != expected) { + throw std::runtime_error("GenericRow: field " + std::to_string(idx) + + " type mismatch: expected " + + std::to_string(static_cast(expected)) + ", got " + + std::to_string(static_cast(d.GetType()))); + } + return d; + } + void EnsureSize(size_t idx) { if (fields.size() <= idx) { fields.resize(idx + 1); diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index e3e63a85..8adcd01b 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -23,15 +23,94 @@ #include "lib.rs.h" namespace fluss { + +namespace detail { +struct FfiAccess { + static const std::vector& fields(const GenericRow& row) { return row.fields; } + static std::vector& fields(GenericRow& row) { return row.fields; } + + static ffi::FfiDatum to_ffi_datum(const Datum& datum) { + ffi::FfiDatum ffi_datum; + ffi_datum.datum_type = static_cast(datum.type); + ffi_datum.bool_val = datum.bool_val; + ffi_datum.i32_val = datum.i32_val; + ffi_datum.i64_val = datum.i64_val; + ffi_datum.f32_val = datum.f32_val; + ffi_datum.f64_val = datum.f64_val; + ffi_datum.string_val = rust::String(datum.string_val); + ffi_datum.decimal_precision = datum.decimal_precision; + ffi_datum.decimal_scale = datum.decimal_scale; + ffi_datum.i128_hi = datum.i128_hi; + ffi_datum.i128_lo = datum.i128_lo; + + rust::Vec bytes; + for (auto b : datum.bytes_val) { + bytes.push_back(b); + } + ffi_datum.bytes_val = std::move(bytes); + + return ffi_datum; + } + + static Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) { + auto dtype = static_cast(ffi_datum.datum_type); + switch (dtype) { + case DatumType::Null: + return Datum::Null(); + case DatumType::Bool: + return Datum::Bool(ffi_datum.bool_val); + case DatumType::Int32: + return Datum::Int32(ffi_datum.i32_val); + case DatumType::Int64: + return Datum::Int64(ffi_datum.i64_val); + case DatumType::Float32: + return Datum::Float32(ffi_datum.f32_val); + case DatumType::Float64: + return Datum::Float64(ffi_datum.f64_val); + case DatumType::String: + return Datum::String(std::string(ffi_datum.string_val)); + case DatumType::Bytes: { + std::vector bytes; + for (auto b : ffi_datum.bytes_val) { + bytes.push_back(b); + } + return Datum::Bytes(std::move(bytes)); + } + case DatumType::Date: + return Datum::Date(fluss::Date{ffi_datum.i32_val}); + case DatumType::Time: + return Datum::Time(fluss::Time{ffi_datum.i32_val}); + case DatumType::TimestampNtz: + return Datum::TimestampNtz(fluss::Timestamp{ffi_datum.i64_val, ffi_datum.i32_val}); + case DatumType::TimestampLtz: + return Datum::TimestampLtz(fluss::Timestamp{ffi_datum.i64_val, ffi_datum.i32_val}); + case DatumType::DecimalI64: + case DatumType::DecimalI128: + case DatumType::DecimalString: { + Datum d; + d.type = dtype; + d.i64_val = ffi_datum.i64_val; + d.decimal_precision = ffi_datum.decimal_precision; + d.decimal_scale = ffi_datum.decimal_scale; + d.i128_hi = ffi_datum.i128_hi; + d.i128_lo = ffi_datum.i128_lo; + if (dtype == DatumType::DecimalString) { + d.string_val = std::string(ffi_datum.string_val); + } + return d; + } + default: + return Datum::Null(); + } + } +}; +} // namespace detail + namespace utils { -inline Result make_error(int32_t code, std::string msg) { - return Result{code, std::move(msg)}; -} +inline Result make_error(int32_t code, std::string msg) { return Result{code, std::move(msg)}; } -inline Result make_ok() { - return Result{0, {}}; -} +inline Result make_ok() { return Result{0, {}}; } inline Result from_ffi_result(const ffi::FfiResult& ffi_result) { return Result{ffi_result.error_code, std::string(ffi_result.error_message)}; @@ -105,37 +184,14 @@ inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& de return ffi_desc; } -inline ffi::FfiDatum to_ffi_datum(const Datum& datum) { - ffi::FfiDatum ffi_datum; - ffi_datum.datum_type = static_cast(datum.type); - ffi_datum.bool_val = datum.bool_val; - ffi_datum.i32_val = datum.i32_val; - ffi_datum.i64_val = datum.i64_val; - ffi_datum.f32_val = datum.f32_val; - ffi_datum.f64_val = datum.f64_val; - ffi_datum.string_val = rust::String(datum.string_val); - ffi_datum.decimal_precision = datum.decimal_precision; - ffi_datum.decimal_scale = datum.decimal_scale; - ffi_datum.i128_hi = datum.i128_hi; - ffi_datum.i128_lo = datum.i128_lo; - - rust::Vec bytes; - for (auto b : datum.bytes_val) { - bytes.push_back(b); - } - ffi_datum.bytes_val = std::move(bytes); - - return ffi_datum; -} - inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) { ffi::FfiGenericRow ffi_row; - rust::Vec fields; - for (const auto& field : row.fields) { - fields.push_back(to_ffi_datum(field)); + rust::Vec ffi_fields; + for (const auto& field : detail::FfiAccess::fields(row)) { + ffi_fields.push_back(detail::FfiAccess::to_ffi_datum(field)); } - ffi_row.fields = std::move(fields); + ffi_row.fields = std::move(ffi_fields); return ffi_row; } @@ -166,9 +222,8 @@ inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) { info.table_id = ffi_info.table_id; info.schema_id = ffi_info.schema_id; - info.table_path = TablePath{ - std::string(ffi_info.table_path.database_name), - std::string(ffi_info.table_path.table_name)}; + info.table_path = TablePath{std::string(ffi_info.table_path.database_name), + std::string(ffi_info.table_path.table_name)}; info.created_time = ffi_info.created_time; info.modified_time = ffi_info.modified_time; @@ -198,44 +253,19 @@ inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) { return info; } -inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) { - Datum datum; - datum.type = static_cast(ffi_datum.datum_type); - datum.bool_val = ffi_datum.bool_val; - datum.i32_val = ffi_datum.i32_val; - datum.i64_val = ffi_datum.i64_val; - datum.f32_val = ffi_datum.f32_val; - datum.f64_val = ffi_datum.f64_val; - // todo: avoid copy string - datum.string_val = std::string(ffi_datum.string_val); - datum.decimal_precision = ffi_datum.decimal_precision; - datum.decimal_scale = ffi_datum.decimal_scale; - datum.i128_hi = ffi_datum.i128_hi; - datum.i128_lo = ffi_datum.i128_lo; - - for (auto b : ffi_datum.bytes_val) { - datum.bytes_val.push_back(b); - } - - return datum; -} - inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) { GenericRow row; for (const auto& field : ffi_row.fields) { - row.fields.push_back(from_ffi_datum(field)); + detail::FfiAccess::fields(row).push_back(detail::FfiAccess::from_ffi_datum(field)); } return row; } inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) { - return ScanRecord{ - ffi_record.bucket_id, - ffi_record.offset, - ffi_record.timestamp, - from_ffi_generic_row(ffi_record.row)}; + return ScanRecord{ffi_record.bucket_id, ffi_record.offset, ffi_record.timestamp, + from_ffi_generic_row(ffi_record.row)}; } inline ScanRecords from_ffi_scan_records(const ffi::FfiScanRecords& ffi_records) { @@ -253,11 +283,8 @@ inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snaps snapshot.snapshot_id = ffi_snapshot.snapshot_id; for (const auto& offset : ffi_snapshot.bucket_offsets) { - snapshot.bucket_offsets.push_back(BucketOffset{ - offset.table_id, - offset.partition_id, - offset.bucket_id, - offset.offset}); + snapshot.bucket_offsets.push_back( + BucketOffset{offset.table_id, offset.partition_id, offset.bucket_id, offset.offset}); } return snapshot;