Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { version = "1.44.2", features = ["full"] }
clap = { version = "4.5.37", features = ["derive"] }
arrow = { version = "57.0.0", features = ["ipc_compression"] }

bigdecimal = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
opendal = "0.53"
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ crate-type = ["staticlib"]
[dependencies]
anyhow = "1.0"
arrow = { workspace = true, features = ["ffi"] }
bigdecimal = { workspace = true }
cxx = "1.0"
fluss = { workspace = true, features = ["storage-all"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
Expand Down
95 changes: 87 additions & 8 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ int main() {

// 3) Schema with scalar and temporal columns
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
.AddColumn("name", fluss::DataType::String)
.AddColumn("score", fluss::DataType::Float)
.AddColumn("age", fluss::DataType::Int)
.AddColumn("event_date", fluss::DataType::Date)
.AddColumn("event_time", fluss::DataType::Time)
.AddColumn("created_at", fluss::DataType::Timestamp)
.AddColumn("updated_at", fluss::DataType::TimestampLtz)
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("score", fluss::DataType::Float())
.AddColumn("age", fluss::DataType::Int())
.AddColumn("event_date", fluss::DataType::Date())
.AddColumn("event_time", fluss::DataType::Time())
.AddColumn("created_at", fluss::DataType::Timestamp())
.AddColumn("updated_at", fluss::DataType::TimestampLtz())
.Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
Expand Down Expand Up @@ -131,6 +131,10 @@ int main() {
row.SetString(1, "AckTest");
row.SetFloat32(2, 99.9f);
row.SetInt32(3, 42);
row.SetDate(4, fluss::Date::FromYMD(2025, 3, 1));
row.SetTime(5, fluss::Time::FromHMS(12, 0, 0));
row.SetTimestampNtz(6, fluss::Timestamp::FromMillis(1740787200000));
row.SetTimestampLtz(7, fluss::Timestamp::FromMillis(1740787200000));
fluss::WriteResult wr;
check("append", writer.Append(row, wr));
check("wait", wr.Wait());
Expand Down Expand Up @@ -365,5 +369,80 @@ int main() {
}
}

// 12) Decimal support example
std::cout << "\n=== Decimal Support Example ===" << std::endl;

fluss::TablePath decimal_table_path("fluss", "decimal_table_cpp_v1");

// Drop table if exists
admin.DropTable(decimal_table_path, true);

// Create schema with decimal columns
auto decimal_schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("price", fluss::DataType::Decimal(10, 2)) // compact
.AddColumn("amount", fluss::DataType::Decimal(28, 8)) // i128
.Build();

auto decimal_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(decimal_schema)
.SetBucketCount(1)
.SetComment("cpp decimal example table")
.Build();

check("create_decimal_table", admin.CreateTable(decimal_table_path, decimal_descriptor, false));

// Get table and writer
fluss::Table decimal_table;
check("get_decimal_table", conn.GetTable(decimal_table_path, decimal_table));

fluss::AppendWriter decimal_writer;
check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));

// Just provide the value — Rust resolves (p,s) from schema
{
fluss::GenericRow row;
row.SetInt32(0, 1);
row.SetDecimal(1, "123.45"); // Rust knows DECIMAL(10,2)
row.SetDecimal(2, "1.00000000"); // Rust knows DECIMAL(28,8)
check("append_decimal", decimal_writer.Append(row));
}
{
fluss::GenericRow row;
row.SetInt32(0, 2);
row.SetDecimal(1, "-999.99");
row.SetDecimal(2, "3.14159265");
check("append_decimal", decimal_writer.Append(row));
}
{
fluss::GenericRow row;
row.SetInt32(0, 3);
row.SetDecimal(1, "500.00");
row.SetDecimal(2, "2.71828182");
check("append_decimal", decimal_writer.Append(row));
}
check("flush_decimal", decimal_writer.Flush());
std::cout << "Wrote 3 decimal rows" << std::endl;

// Scan and read back
fluss::LogScanner decimal_scanner;
check("new_decimal_scanner", decimal_table.NewScan().CreateLogScanner(decimal_scanner));
check("subscribe_decimal", decimal_scanner.Subscribe(0, 0));

fluss::ScanRecords decimal_records;
check("poll_decimal", decimal_scanner.Poll(5000, decimal_records));

std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl;
for (const auto& rec : decimal_records) {
auto& price = rec.row.fields[1];
auto& amount = rec.row.fields[2];
std::cout << " id=" << rec.row.fields[0].i32_val
<< " price=" << price.DecimalToString()
<< " (raw=" << price.i64_val << ")"
<< " amount=" << amount.DecimalToString()
<< " is_decimal=" << price.IsDecimal()
<< std::endl;
}

return 0;
}
121 changes: 117 additions & 4 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -100,7 +100,7 @@ struct Timestamp {
}
};

enum class DataType {
enum class TypeId {
Boolean = 1,
TinyInt = 2,
SmallInt = 3,
Expand All @@ -114,6 +114,43 @@ enum class DataType {
Time = 11,
Timestamp = 12,
TimestampLtz = 13,
Decimal = 14,
};

class DataType {
public:
explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0)
: id_(id), precision_(p), scale_(s) {}

static DataType Boolean() { return DataType(TypeId::Boolean); }
static DataType TinyInt() { return DataType(TypeId::TinyInt); }
static DataType SmallInt() { return DataType(TypeId::SmallInt); }
static DataType Int() { return DataType(TypeId::Int); }
static DataType BigInt() { return DataType(TypeId::BigInt); }
static DataType Float() { return DataType(TypeId::Float); }
static DataType Double() { return DataType(TypeId::Double); }
static DataType String() { return DataType(TypeId::String); }
static DataType Bytes() { return DataType(TypeId::Bytes); }
static DataType Date() { return DataType(TypeId::Date); }
static DataType Time() { return DataType(TypeId::Time); }
static DataType Timestamp(int32_t precision = 6) {
return DataType(TypeId::Timestamp, precision, 0);
}
static DataType TimestampLtz(int32_t precision = 6) {
return DataType(TypeId::TimestampLtz, precision, 0);
}
static DataType Decimal(int32_t precision, int32_t scale) {
return DataType(TypeId::Decimal, precision, scale);
}

TypeId id() const { return id_; }
int32_t precision() const { return precision_; }
int32_t scale() const { return scale_; }

private:
TypeId id_;
int32_t precision_{0};
int32_t scale_{0};
};

enum class DatumType {
Expand All @@ -125,7 +162,9 @@ enum class DatumType {
Float64 = 5,
String = 6,
Bytes = 7,
// 8-10 reserved for decimal types
DecimalI64 = 8,
DecimalI128 = 9,
DecimalString = 10,
Date = 11,
Time = 12,
TimestampNtz = 13,
Expand Down Expand Up @@ -182,7 +221,7 @@ struct Schema {
public:
Builder& AddColumn(std::string name, DataType type,
std::string comment = "") {
columns_.push_back({std::move(name), type, std::move(comment)});
columns_.push_back({std::move(name), std::move(type), std::move(comment)});
return *this;
}

Expand Down Expand Up @@ -290,6 +329,10 @@ struct Datum {
double f64_val{0.0};
std::string string_val;
std::vector<uint8_t> bytes_val;
int32_t decimal_precision{0}; // Decimal: precision (total digits)
int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point)
int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value
int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value

static Datum Null() { return {}; }
static Datum Bool(bool v) {
Expand Down Expand Up @@ -360,10 +403,75 @@ struct Datum {
dat.i32_val = ts.nano_of_millisecond;
return dat;
}
// Stores the decimal string as-is. Rust side will parse via BigDecimal,
// look up (p,s) from the schema, validate, and create the Decimal.
static Datum DecimalString(std::string str) {
Datum d;
d.type = DatumType::DecimalString;
d.string_val = std::move(str);
return d;
}

fluss::Date GetDate() const { return {i32_val}; }
fluss::Time GetTime() const { return {i32_val}; }
fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }

bool IsDecimal() const {
return type == DatumType::DecimalI64 || type == DatumType::DecimalI128
|| type == DatumType::DecimalString;
}

std::string DecimalToString() const {
if (type == DatumType::DecimalI64) {
return FormatUnscaled64(i64_val, decimal_scale);
} else if (type == DatumType::DecimalI128) {
unsigned __int128 uval = (static_cast<unsigned __int128>(static_cast<uint64_t>(i128_hi)) << 64) |
static_cast<unsigned __int128>(static_cast<uint64_t>(i128_lo));
__int128 val = static_cast<__int128>(uval);
return FormatUnscaled128(val, decimal_scale);
} else if (type == DatumType::DecimalString) {
return string_val;
}
return "";
}

private:
static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) {
bool negative = unscaled < 0;
uint64_t abs_val = negative ? -static_cast<uint64_t>(unscaled) : static_cast<uint64_t>(unscaled);
std::string digits = std::to_string(abs_val);
if (scale <= 0) {
return (negative ? "-" : "") + digits;
}
while (static_cast<int32_t>(digits.size()) <= scale) {
digits = "0" + digits;
}
auto pos = digits.size() - static_cast<size_t>(scale);
return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos);
}

static std::string FormatUnscaled128(__int128 val, int32_t scale) {
bool negative = val < 0;
unsigned __int128 abs_val = negative ? -static_cast<unsigned __int128>(val)
: static_cast<unsigned __int128>(val);
std::string digits;
if (abs_val == 0) {
digits = "0";
} else {
while (abs_val > 0) {
digits = static_cast<char>('0' + static_cast<int>(abs_val % 10)) + digits;
abs_val /= 10;
}
}
if (scale <= 0) {
return (negative ? "-" : "") + digits;
}
while (static_cast<int32_t>(digits.size()) <= scale) {
digits = "0" + digits;
}
auto pos = digits.size() - static_cast<size_t>(scale);
return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos);
}
};

struct GenericRow {
Expand Down Expand Up @@ -429,6 +537,11 @@ struct GenericRow {
fields[idx] = Datum::TimestampLtz(ts);
}

void SetDecimal(size_t idx, const std::string& value) {
EnsureSize(idx);
fields[idx] = Datum::DecimalString(value);
}

private:
void EnsureSize(size_t idx) {
if (fields.size() <= idx) {
Expand Down
14 changes: 12 additions & 2 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
inline ffi::FfiColumn to_ffi_column(const Column& col) {
ffi::FfiColumn ffi_col;
ffi_col.name = rust::String(col.name);
ffi_col.data_type = static_cast<int32_t>(col.data_type);
ffi_col.data_type = static_cast<int32_t>(col.data_type.id());
ffi_col.comment = rust::String(col.comment);
ffi_col.precision = col.data_type.precision();
ffi_col.scale = col.data_type.scale();
return ffi_col;
}

Expand Down Expand Up @@ -112,6 +114,10 @@ inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
ffi_datum.f32_val = datum.f32_val;
ffi_datum.f64_val = datum.f64_val;
ffi_datum.string_val = rust::String(datum.string_val);
ffi_datum.decimal_precision = datum.decimal_precision;
ffi_datum.decimal_scale = datum.decimal_scale;
ffi_datum.i128_hi = datum.i128_hi;
ffi_datum.i128_lo = datum.i128_lo;

rust::Vec<uint8_t> bytes;
for (auto b : datum.bytes_val) {
Expand All @@ -137,7 +143,7 @@ inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
return Column{
std::string(ffi_col.name),
static_cast<DataType>(ffi_col.data_type),
DataType(static_cast<TypeId>(ffi_col.data_type), ffi_col.precision, ffi_col.scale),
std::string(ffi_col.comment)};
}

Expand Down Expand Up @@ -202,6 +208,10 @@ inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
datum.f64_val = ffi_datum.f64_val;
// todo: avoid copy string
datum.string_val = std::string(ffi_datum.string_val);
datum.decimal_precision = ffi_datum.decimal_precision;
datum.decimal_scale = ffi_datum.decimal_scale;
datum.i128_hi = ffi_datum.i128_hi;
datum.i128_lo = ffi_datum.i128_lo;

for (auto b : ffi_datum.bytes_val) {
datum.bytes_val.push_back(b);
Expand Down
Loading
Loading