diff --git a/Cargo.toml b/Cargo.toml index dfddd8d4..77d71400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ tokio = { version = "1.44.2", features = ["full"] } clap = { version = "4.5.37", features = ["derive"] } arrow = { version = "57.0.0", features = ["ipc_compression"] } +bigdecimal = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" opendal = "0.53" diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml index 8606a226..26816522 100644 --- a/bindings/cpp/Cargo.toml +++ b/bindings/cpp/Cargo.toml @@ -29,6 +29,7 @@ crate-type = ["staticlib"] [dependencies] anyhow = "1.0" arrow = { workspace = true, features = ["ffi"] } +bigdecimal = { workspace = true } cxx = "1.0" fluss = { workspace = true, features = ["storage-all"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 92ebe9cf..efdf2e80 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -54,14 +54,14 @@ int main() { // 3) Schema with scalar and temporal columns auto schema = fluss::Schema::NewBuilder() - .AddColumn("id", fluss::DataType::Int) - .AddColumn("name", fluss::DataType::String) - .AddColumn("score", fluss::DataType::Float) - .AddColumn("age", fluss::DataType::Int) - .AddColumn("event_date", fluss::DataType::Date) - .AddColumn("event_time", fluss::DataType::Time) - .AddColumn("created_at", fluss::DataType::Timestamp) - .AddColumn("updated_at", fluss::DataType::TimestampLtz) + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::Float()) + .AddColumn("age", fluss::DataType::Int()) + .AddColumn("event_date", fluss::DataType::Date()) + .AddColumn("event_time", fluss::DataType::Time()) + .AddColumn("created_at", fluss::DataType::Timestamp()) + .AddColumn("updated_at", fluss::DataType::TimestampLtz()) .Build(); auto descriptor = fluss::TableDescriptor::NewBuilder() @@ -131,6 +131,10 @@ int main() { row.SetString(1, "AckTest"); row.SetFloat32(2, 99.9f); row.SetInt32(3, 42); + row.SetDate(4, fluss::Date::FromYMD(2025, 3, 1)); + row.SetTime(5, fluss::Time::FromHMS(12, 0, 0)); + row.SetTimestampNtz(6, fluss::Timestamp::FromMillis(1740787200000)); + row.SetTimestampLtz(7, fluss::Timestamp::FromMillis(1740787200000)); fluss::WriteResult wr; check("append", writer.Append(row, wr)); check("wait", wr.Wait()); @@ -365,5 +369,80 @@ int main() { } } + // 12) Decimal support example + std::cout << "\n=== Decimal Support Example ===" << std::endl; + + fluss::TablePath decimal_table_path("fluss", "decimal_table_cpp_v1"); + + // Drop table if exists + admin.DropTable(decimal_table_path, true); + + // Create schema with decimal columns + auto decimal_schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("price", fluss::DataType::Decimal(10, 2)) // compact + .AddColumn("amount", fluss::DataType::Decimal(28, 8)) // i128 + .Build(); + + auto decimal_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(decimal_schema) + .SetBucketCount(1) + .SetComment("cpp decimal example table") + .Build(); + + check("create_decimal_table", admin.CreateTable(decimal_table_path, decimal_descriptor, false)); + + // Get table and writer + fluss::Table decimal_table; + check("get_decimal_table", conn.GetTable(decimal_table_path, decimal_table)); + + fluss::AppendWriter decimal_writer; + check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer)); + + // Just provide the value — Rust resolves (p,s) from schema + { + fluss::GenericRow row; + row.SetInt32(0, 1); + row.SetDecimal(1, "123.45"); // Rust knows DECIMAL(10,2) + row.SetDecimal(2, "1.00000000"); // Rust knows DECIMAL(28,8) + check("append_decimal", decimal_writer.Append(row)); + } + { + fluss::GenericRow row; + row.SetInt32(0, 2); + row.SetDecimal(1, "-999.99"); + row.SetDecimal(2, "3.14159265"); + check("append_decimal", decimal_writer.Append(row)); + } + { + fluss::GenericRow row; + row.SetInt32(0, 3); + row.SetDecimal(1, "500.00"); + row.SetDecimal(2, "2.71828182"); + check("append_decimal", decimal_writer.Append(row)); + } + check("flush_decimal", decimal_writer.Flush()); + std::cout << "Wrote 3 decimal rows" << std::endl; + + // Scan and read back + fluss::LogScanner decimal_scanner; + check("new_decimal_scanner", decimal_table.NewScan().CreateLogScanner(decimal_scanner)); + check("subscribe_decimal", decimal_scanner.Subscribe(0, 0)); + + fluss::ScanRecords decimal_records; + check("poll_decimal", decimal_scanner.Poll(5000, decimal_records)); + + std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl; + for (const auto& rec : decimal_records) { + auto& price = rec.row.fields[1]; + auto& amount = rec.row.fields[2]; + std::cout << " id=" << rec.row.fields[0].i32_val + << " price=" << price.DecimalToString() + << " (raw=" << price.i64_val << ")" + << " amount=" << amount.DecimalToString() + << " is_decimal=" << price.IsDecimal() + << std::endl; + } + return 0; } diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 239d9a47..8125c49c 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -20,8 +20,8 @@ #pragma once #include +#include #include -#include #include #include #include @@ -100,7 +100,7 @@ struct Timestamp { } }; -enum class DataType { +enum class TypeId { Boolean = 1, TinyInt = 2, SmallInt = 3, @@ -114,6 +114,43 @@ enum class DataType { Time = 11, Timestamp = 12, TimestampLtz = 13, + Decimal = 14, +}; + +class DataType { +public: + explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0) + : id_(id), precision_(p), scale_(s) {} + + static DataType Boolean() { return DataType(TypeId::Boolean); } + static DataType TinyInt() { return DataType(TypeId::TinyInt); } + static DataType SmallInt() { return DataType(TypeId::SmallInt); } + static DataType Int() { return DataType(TypeId::Int); } + static DataType BigInt() { return DataType(TypeId::BigInt); } + static DataType Float() { return DataType(TypeId::Float); } + static DataType Double() { return DataType(TypeId::Double); } + static DataType String() { return DataType(TypeId::String); } + static DataType Bytes() { return DataType(TypeId::Bytes); } + static DataType Date() { return DataType(TypeId::Date); } + static DataType Time() { return DataType(TypeId::Time); } + static DataType Timestamp(int32_t precision = 6) { + return DataType(TypeId::Timestamp, precision, 0); + } + static DataType TimestampLtz(int32_t precision = 6) { + return DataType(TypeId::TimestampLtz, precision, 0); + } + static DataType Decimal(int32_t precision, int32_t scale) { + return DataType(TypeId::Decimal, precision, scale); + } + + TypeId id() const { return id_; } + int32_t precision() const { return precision_; } + int32_t scale() const { return scale_; } + +private: + TypeId id_; + int32_t precision_{0}; + int32_t scale_{0}; }; enum class DatumType { @@ -125,7 +162,9 @@ enum class DatumType { Float64 = 5, String = 6, Bytes = 7, - // 8-10 reserved for decimal types + DecimalI64 = 8, + DecimalI128 = 9, + DecimalString = 10, Date = 11, Time = 12, TimestampNtz = 13, @@ -182,7 +221,7 @@ struct Schema { public: Builder& AddColumn(std::string name, DataType type, std::string comment = "") { - columns_.push_back({std::move(name), type, std::move(comment)}); + columns_.push_back({std::move(name), std::move(type), std::move(comment)}); return *this; } @@ -290,6 +329,10 @@ struct Datum { double f64_val{0.0}; std::string string_val; std::vector bytes_val; + int32_t decimal_precision{0}; // Decimal: precision (total digits) + int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point) + int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value + int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value static Datum Null() { return {}; } static Datum Bool(bool v) { @@ -360,10 +403,75 @@ struct Datum { dat.i32_val = ts.nano_of_millisecond; return dat; } + // Stores the decimal string as-is. Rust side will parse via BigDecimal, + // look up (p,s) from the schema, validate, and create the Decimal. + static Datum DecimalString(std::string str) { + Datum d; + d.type = DatumType::DecimalString; + d.string_val = std::move(str); + return d; + } fluss::Date GetDate() const { return {i32_val}; } fluss::Time GetTime() const { return {i32_val}; } fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; } + + bool IsDecimal() const { + return type == DatumType::DecimalI64 || type == DatumType::DecimalI128 + || type == DatumType::DecimalString; + } + + std::string DecimalToString() const { + if (type == DatumType::DecimalI64) { + return FormatUnscaled64(i64_val, decimal_scale); + } else if (type == DatumType::DecimalI128) { + unsigned __int128 uval = (static_cast(static_cast(i128_hi)) << 64) | + static_cast(static_cast(i128_lo)); + __int128 val = static_cast<__int128>(uval); + return FormatUnscaled128(val, decimal_scale); + } else if (type == DatumType::DecimalString) { + return string_val; + } + return ""; + } + +private: + static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) { + bool negative = unscaled < 0; + uint64_t abs_val = negative ? -static_cast(unscaled) : static_cast(unscaled); + std::string digits = std::to_string(abs_val); + if (scale <= 0) { + return (negative ? "-" : "") + digits; + } + while (static_cast(digits.size()) <= scale) { + digits = "0" + digits; + } + auto pos = digits.size() - static_cast(scale); + return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos); + } + + static std::string FormatUnscaled128(__int128 val, int32_t scale) { + bool negative = val < 0; + unsigned __int128 abs_val = negative ? -static_cast(val) + : static_cast(val); + std::string digits; + if (abs_val == 0) { + digits = "0"; + } else { + while (abs_val > 0) { + digits = static_cast('0' + static_cast(abs_val % 10)) + digits; + abs_val /= 10; + } + } + if (scale <= 0) { + return (negative ? "-" : "") + digits; + } + while (static_cast(digits.size()) <= scale) { + digits = "0" + digits; + } + auto pos = digits.size() - static_cast(scale); + return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos); + } }; struct GenericRow { @@ -429,6 +537,11 @@ struct GenericRow { fields[idx] = Datum::TimestampLtz(ts); } + void SetDecimal(size_t idx, const std::string& value) { + EnsureSize(idx); + fields[idx] = Datum::DecimalString(value); + } + private: void EnsureSize(size_t idx) { if (fields.size() <= idx) { diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 63a2e91a..e3e63a85 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -47,8 +47,10 @@ inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) { inline ffi::FfiColumn to_ffi_column(const Column& col) { ffi::FfiColumn ffi_col; ffi_col.name = rust::String(col.name); - ffi_col.data_type = static_cast(col.data_type); + ffi_col.data_type = static_cast(col.data_type.id()); ffi_col.comment = rust::String(col.comment); + ffi_col.precision = col.data_type.precision(); + ffi_col.scale = col.data_type.scale(); return ffi_col; } @@ -112,6 +114,10 @@ inline ffi::FfiDatum to_ffi_datum(const Datum& datum) { ffi_datum.f32_val = datum.f32_val; ffi_datum.f64_val = datum.f64_val; ffi_datum.string_val = rust::String(datum.string_val); + ffi_datum.decimal_precision = datum.decimal_precision; + ffi_datum.decimal_scale = datum.decimal_scale; + ffi_datum.i128_hi = datum.i128_hi; + ffi_datum.i128_lo = datum.i128_lo; rust::Vec bytes; for (auto b : datum.bytes_val) { @@ -137,7 +143,7 @@ inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) { inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) { return Column{ std::string(ffi_col.name), - static_cast(ffi_col.data_type), + DataType(static_cast(ffi_col.data_type), ffi_col.precision, ffi_col.scale), std::string(ffi_col.comment)}; } @@ -202,6 +208,10 @@ inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) { datum.f64_val = ffi_datum.f64_val; // todo: avoid copy string datum.string_val = std::string(ffi_datum.string_val); + datum.decimal_precision = ffi_datum.decimal_precision; + datum.decimal_scale = ffi_datum.decimal_scale; + datum.i128_hi = ffi_datum.i128_hi; + datum.i128_lo = ffi_datum.i128_lo; for (auto b : ffi_datum.bytes_val) { datum.bytes_val.push_back(b); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 5a266135..4aeb13db 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -51,6 +51,8 @@ mod ffi { name: String, data_type: i32, comment: String, + precision: i32, + scale: i32, } struct FfiSchema { @@ -98,6 +100,10 @@ mod ffi { f64_val: f64, string_val: String, bytes_val: Vec, + decimal_precision: i32, + decimal_scale: i32, + i128_hi: i64, + i128_lo: i64, } struct FfiGenericRow { @@ -301,6 +307,7 @@ pub struct Table { pub struct AppendWriter { inner: fcore::client::AppendWriter, + table_info: fcore::metadata::TableInfo, } pub struct WriteResult { @@ -636,7 +643,10 @@ impl Table { Ok(w) => w, Err(e) => return Err(format!("Failed to create writer: {e}")), }; - let writer = Box::into_raw(Box::new(AppendWriter { inner: writer })); + let writer = Box::into_raw(Box::new(AppendWriter { + inner: writer, + table_info: self.table_info.clone(), + })); Ok(writer) } @@ -792,7 +802,8 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter) { impl AppendWriter { fn append(&mut self, row: &ffi::FfiGenericRow) -> Result, String> { - let generic_row = types::ffi_row_to_core(row); + let schema = self.table_info.get_schema(); + let generic_row = types::ffi_row_to_core(row, Some(schema)).map_err(|e| e.to_string())?; let result_future = self .inner diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f546b682..7837032e 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -18,8 +18,8 @@ use crate::ffi; use anyhow::{Result, anyhow}; use arrow::array::{ - Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + Date32Array, Decimal128Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, + Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; @@ -27,6 +27,7 @@ use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use fcore::row::InternalRow; use fluss as fcore; use std::borrow::Cow; +use std::str::FromStr; use arrow::array::Array; @@ -43,6 +44,7 @@ pub const DATA_TYPE_DATE: i32 = 10; pub const DATA_TYPE_TIME: i32 = 11; pub const DATA_TYPE_TIMESTAMP: i32 = 12; pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13; +pub const DATA_TYPE_DECIMAL: i32 = 14; pub const DATUM_TYPE_NULL: i32 = 0; pub const DATUM_TYPE_BOOL: i32 = 1; @@ -52,6 +54,9 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4; pub const DATUM_TYPE_FLOAT64: i32 = 5; pub const DATUM_TYPE_STRING: i32 = 6; pub const DATUM_TYPE_BYTES: i32 = 7; +pub const DATUM_TYPE_DECIMAL_I64: i32 = 8; +pub const DATUM_TYPE_DECIMAL_I128: i32 = 9; +pub const DATUM_TYPE_DECIMAL_STRING: i32 = 10; pub const DATUM_TYPE_DATE: i32 = 11; pub const DATUM_TYPE_TIME: i32 = 12; pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13; @@ -62,7 +67,7 @@ const MICROS_PER_MILLI: i64 = 1_000; const NANOS_PER_MICRO: i64 = 1_000; const NANOS_PER_MILLI: i64 = 1_000_000; -fn ffi_data_type_to_core(dt: i32) -> Result { +fn ffi_data_type_to_core(dt: i32, precision: u32, scale: u32) -> Result { match dt { DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()), DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()), @@ -75,8 +80,16 @@ fn ffi_data_type_to_core(dt: i32) -> Result { DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()), DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()), DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()), - DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()), - DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()), + DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp_with_precision( + precision, + )), + DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz_with_precision( + precision, + )), + DATA_TYPE_DECIMAL => { + let dt = fcore::metadata::DecimalType::new(precision, scale)?; + Ok(fcore::metadata::DataType::Decimal(dt)) + } _ => Err(anyhow!("Unknown data type: {dt}")), } } @@ -96,6 +109,7 @@ fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL, _ => 0, } } @@ -106,7 +120,13 @@ pub fn ffi_descriptor_to_core( let mut schema_builder = fcore::metadata::Schema::builder(); for col in &descriptor.schema.columns { - let dt = ffi_data_type_to_core(col.data_type)?; + if col.precision < 0 || col.scale < 0 { + return Err(anyhow!( + "Column '{}': precision and scale must be non-negative", + col.name + )); + } + let dt = ffi_data_type_to_core(col.data_type, col.precision as u32, col.scale as u32)?; schema_builder = schema_builder.column(&col.name, dt); if !col.comment.is_empty() { schema_builder = schema_builder.with_comment(&col.comment); @@ -148,10 +168,22 @@ pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTabl let columns: Vec = schema .columns() .iter() - .map(|col| ffi::FfiColumn { - name: col.name().to_string(), - data_type: core_data_type_to_ffi(col.data_type()), - comment: col.comment().unwrap_or("").to_string(), + .map(|col| { + let (precision, scale) = match col.data_type() { + fcore::metadata::DataType::Decimal(dt) => { + (dt.precision() as i32, dt.scale() as i32) + } + fcore::metadata::DataType::Timestamp(dt) => (dt.precision() as i32, 0), + fcore::metadata::DataType::TimestampLTz(dt) => (dt.precision() as i32, 0), + _ => (0, 0), + }; + ffi::FfiColumn { + name: col.name().to_string(), + data_type: core_data_type_to_ffi(col.data_type()), + comment: col.comment().unwrap_or("").to_string(), + precision, + scale, + } }) .collect(); @@ -218,7 +250,21 @@ pub fn empty_table_info() -> ffi::FfiTableInfo { } } -pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { +/// Look up decimal (precision, scale) from schema for column `idx`. +fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Result<(u32, u32)> { + let col = schema + .and_then(|s| s.columns().get(idx)) + .ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?; + match col.data_type() { + fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())), + other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)), + } +} + +pub fn ffi_row_to_core<'a>( + row: &'a ffi::FfiGenericRow, + schema: Option<&fcore::metadata::Schema>, +) -> Result> { use fcore::row::Datum; let mut generic_row = fcore::row::GenericRow::new(row.fields.len()); @@ -233,6 +279,40 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()), DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())), DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())), + DATUM_TYPE_DECIMAL_STRING => { + let (precision, scale) = get_decimal_type(idx, schema)?; + let bd = + bigdecimal::BigDecimal::from_str(field.string_val.as_str()).map_err(|e| { + anyhow!( + "Column {idx}: invalid decimal string '{}': {e}", + field.string_val + ) + })?; + let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale) + .map_err(|e| anyhow!("Column {idx}: {e}"))?; + Datum::Decimal(decimal) + } + DATUM_TYPE_DECIMAL_I64 => { + let precision = field.decimal_precision as u32; + let scale = field.decimal_scale as u32; + let decimal = + fcore::row::Decimal::from_unscaled_long(field.i64_val, precision, scale) + .map_err(|e| anyhow!("Column {idx}: {e}"))?; + Datum::Decimal(decimal) + } + DATUM_TYPE_DECIMAL_I128 => { + let precision = field.decimal_precision as u32; + let scale = field.decimal_scale as u32; + let i128_val = ((field.i128_hi as i128) << 64) | (field.i128_lo as u64 as i128); + let decimal = fcore::row::Decimal::from_arrow_decimal128( + i128_val, + scale as i64, + precision, + scale, + ) + .map_err(|e| anyhow!("Column {idx}: {e}"))?; + Datum::Decimal(decimal) + } DATUM_TYPE_DATE => Datum::Date(fcore::row::Date::new(field.i32_val)), DATUM_TYPE_TIME => Datum::Time(fcore::row::Time::new(field.i32_val)), DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz( @@ -243,12 +323,12 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { fcore::row::TimestampLtz::from_millis_nanos(field.i64_val, field.i32_val) .unwrap_or_else(|_| fcore::row::TimestampLtz::new(field.i64_val)), ), - _ => Datum::Null, + other => return Err(anyhow!("Column {idx}: unknown datum type {other}")), }; generic_row.set_field(idx, datum); } - generic_row + Ok(generic_row) } pub fn core_scan_records_to_ffi( @@ -292,6 +372,10 @@ fn core_row_to_ffi_fields( f64_val: 0.0, string_val: String::new(), bytes_val: vec![], + decimal_precision: 0, + decimal_scale: 0, + i128_hi: 0, + i128_lo: 0, } } @@ -485,6 +569,29 @@ fn core_row_to_ffi_fields( } _ => panic!("Will never come here. Unsupported Time64 unit for column {i}"), }, + ArrowDataType::Decimal128(precision, scale) => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Decimal128 column expected"); + let i128_val = array.value(row_id); + + if fcore::row::Decimal::is_compact_precision(*precision as u32) { + let mut datum = new_datum(DATUM_TYPE_DECIMAL_I64); + datum.i64_val = i128_val as i64; + datum.decimal_precision = *precision as i32; + datum.decimal_scale = *scale as i32; + datum + } else { + let mut datum = new_datum(DATUM_TYPE_DECIMAL_I128); + datum.i128_hi = (i128_val >> 64) as i64; + datum.i128_lo = i128_val as i64; + datum.decimal_precision = *precision as i32; + datum.decimal_scale = *scale as i32; + datum + } + } other => panic!( "Will never come here. Unsupported Arrow data type for column {i}: {other:?}" ), diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 4d9be026..db1348a0 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -59,7 +59,7 @@ tokio = { workspace = true } parking_lot = "0.12" bytes = "1.10.1" dashmap = "6.1.0" -bigdecimal = { version = "0.4", features = ["serde"] } +bigdecimal = { workspace = true, features = ["serde"] } ordered-float = { version = "5", features = ["serde"] } parse-display = "0.10" jiff = { workspace = true }