diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 7022cad1..8262e4ac 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -42,7 +42,7 @@ int main() { check("get_admin", conn.GetAdmin(admin)); fluss::TablePath table_path("fluss", "sample_table_cpp_v1"); - + // 2.1) Drop table if exists std::cout << "Dropping table if exists..." << std::endl; auto drop_result = admin.DropTable(table_path, true); @@ -52,12 +52,16 @@ int main() { std::cout << "Table drop result: " << drop_result.error_message << std::endl; } - // 3) Schema & descriptor + // 3) Schema with scalar and temporal columns auto schema = fluss::Schema::NewBuilder() .AddColumn("id", fluss::DataType::Int) .AddColumn("name", fluss::DataType::String) .AddColumn("score", fluss::DataType::Float) .AddColumn("age", fluss::DataType::Int) + .AddColumn("event_date", fluss::DataType::Date) + .AddColumn("event_time", fluss::DataType::Time) + .AddColumn("created_at", fluss::DataType::Timestamp) + .AddColumn("updated_at", fluss::DataType::TimestampLtz) .Build(); auto descriptor = fluss::TableDescriptor::NewBuilder() @@ -66,7 +70,6 @@ int main() { .SetComment("cpp example table with 3 buckets") .Build(); - // 3.1) Create table with 3 buckets std::cout << "Creating table with 3 buckets..." << std::endl; check("create_table", admin.CreateTable(table_path, descriptor, false)); @@ -74,7 +77,7 @@ int main() { fluss::Table table; check("get_table", conn.GetTable(table_path, table)); - // 5) Writer + // 5) Write rows with scalar and temporal values fluss::AppendWriter writer; check("new_append_writer", table.NewAppendWriter(writer)); @@ -83,12 +86,26 @@ int main() { const char* name; float score; int age; + fluss::Date date; + fluss::Time time; + fluss::Timestamp ts_ntz; + fluss::Timestamp ts_ltz; }; + auto tp_now = std::chrono::system_clock::now(); std::vector rows = { - {1, "Alice", 95.2f, 25}, - {2, "Bob", 87.2f, 30}, - {3, "Charlie", 92.1f, 35}, + {1, "Alice", 95.2f, 25, + fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45), + fluss::Timestamp::FromTimePoint(tp_now), + fluss::Timestamp::FromMillis(1718467200000)}, + {2, "Bob", 87.2f, 30, + fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0), + fluss::Timestamp::FromMillis(1735689600000), + fluss::Timestamp::FromMillisNanos(1735689600000, 500000)}, + {3, "Charlie", 92.1f, 35, + fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59), + fluss::Timestamp::FromMillis(946684799999), + fluss::Timestamp::FromMillis(946684799999)}, }; for (const auto& r : rows) { @@ -97,12 +114,16 @@ int main() { row.SetString(1, r.name); row.SetFloat32(2, r.score); row.SetInt32(3, r.age); + row.SetDate(4, r.date); + row.SetTime(5, r.time); + row.SetTimestampNtz(6, r.ts_ntz); + row.SetTimestampLtz(7, r.ts_ltz); check("append", writer.Append(row)); } check("flush", writer.Flush()); std::cout << "Wrote " << rows.size() << " rows" << std::endl; - // 6) Scan + // 6) Full scan — verify all column types including temporal fluss::LogScanner scanner; check("new_log_scanner", table.NewScan().CreateLogScanner(scanner)); @@ -115,188 +136,216 @@ int main() { fluss::ScanRecords records; check("poll", scanner.Poll(5000, records)); - std::cout << "Scanned records: " << records.records.size() << std::endl; + std::cout << "Scanned records: " << records.Size() << std::endl; + bool scan_ok = true; for (const auto& rec : records.records) { - std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val - << " name=" << rec.row.fields[1].string_val - << " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val - << " ts=" << rec.timestamp << std::endl; + const auto& f = rec.row.fields; + + if (f[4].type != fluss::DatumType::Date) { + std::cerr << "ERROR: field 4 expected Date, got " + << static_cast(f[4].type) << std::endl; + scan_ok = false; + } + if (f[5].type != fluss::DatumType::Time) { + std::cerr << "ERROR: field 5 expected Time, got " + << static_cast(f[5].type) << std::endl; + scan_ok = false; + } + if (f[6].type != fluss::DatumType::TimestampNtz) { + std::cerr << "ERROR: field 6 expected TimestampNtz, got " + << static_cast(f[6].type) << std::endl; + scan_ok = false; + } + if (f[7].type != fluss::DatumType::TimestampLtz) { + std::cerr << "ERROR: field 7 expected TimestampLtz, got " + << static_cast(f[7].type) << std::endl; + scan_ok = false; + } + + auto date = f[4].GetDate(); + auto time = f[5].GetTime(); + auto ts_ntz = f[6].GetTimestamp(); + auto ts_ltz = f[7].GetTimestamp(); + + std::cout << " id=" << f[0].i32_val + << " name=" << f[1].string_val + << " score=" << f[2].f32_val + << " age=" << f[3].i32_val + << " date=" << date.Year() << "-" << date.Month() << "-" << date.Day() + << " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second() + << " ts_ntz=" << ts_ntz.epoch_millis + << " ts_ltz=" << ts_ltz.epoch_millis + << "+" << ts_ltz.nano_of_millisecond << "ns" + << std::endl; + } + + if (!scan_ok) { + std::cerr << "Full scan type verification FAILED!" << std::endl; + std::exit(1); } - - // 7) Project only id (0) and name (1) columns - std::vector projected_columns = {0, 1}; + + // 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify + // NTZ/LTZ disambiguation works with column index remapping + std::vector projected_columns = {0, 7}; fluss::LogScanner projected_scanner; check("new_log_scanner_with_projection", table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner)); - + for (int b = 0; b < buckets; ++b) { check("subscribe_projected", projected_scanner.Subscribe(b, 0)); } - + fluss::ScanRecords projected_records; check("poll_projected", projected_scanner.Poll(5000, projected_records)); - - std::cout << "Projected records: " << projected_records.records.size() << std::endl; - - bool projection_verified = true; - for (size_t i = 0; i < projected_records.records.size(); ++i) { - const auto& rec = projected_records.records[i]; - const auto& row = rec.row; - - if (row.fields.size() != projected_columns.size()) { - std::cerr << "ERROR: Record " << i << " has " << row.fields.size() - << " fields, expected " << projected_columns.size() << std::endl; - projection_verified = false; + + std::cout << "Projected records: " << projected_records.Size() << std::endl; + for (const auto& rec : projected_records.records) { + const auto& f = rec.row.fields; + + if (f.size() != 2) { + std::cerr << "ERROR: expected 2 fields, got " << f.size() << std::endl; + scan_ok = false; continue; } - - // Verify field types match expected columns - // Column 0 (id) should be Int32, Column 1 (name) should be String - if (row.fields[0].type != fluss::DatumType::Int32) { - std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl; - projection_verified = false; + if (f[0].type != fluss::DatumType::Int32) { + std::cerr << "ERROR: projected field 0 expected Int32, got " + << static_cast(f[0].type) << std::endl; + scan_ok = false; } - if (row.fields[1].type != fluss::DatumType::String) { - std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl; - projection_verified = false; - } - - // Print projected data - if (row.fields[0].type == fluss::DatumType::Int32 && - row.fields[1].type == fluss::DatumType::String) { - std::cout << " Record " << i << ": id=" << row.fields[0].i32_val - << ", name=" << row.fields[1].string_val << std::endl; + if (f[1].type != fluss::DatumType::TimestampLtz) { + std::cerr << "ERROR: projected field 1 expected TimestampLtz, got " + << static_cast(f[1].type) << std::endl; + scan_ok = false; } + + auto ts = f[1].GetTimestamp(); + std::cout << " id=" << f[0].i32_val + << " updated_at=" << ts.epoch_millis + << "+" << ts.nano_of_millisecond << "ns" << std::endl; } - - if (projection_verified) { - std::cout << "Column pruning verification passed!" << std::endl; + + if (scan_ok) { + std::cout << "Scan verification passed!" << std::endl; } else { - std::cerr << "Column pruning verification failed!" << std::endl; + std::cerr << "Scan verification FAILED!" << std::endl; std::exit(1); } // 8) List offsets examples std::cout << "\n=== List Offsets Examples ===" << std::endl; - - // 8.1) Query earliest offsets for all buckets + std::vector all_bucket_ids; all_bucket_ids.reserve(buckets); for (int b = 0; b < buckets; ++b) { all_bucket_ids.push_back(b); } - + std::unordered_map earliest_offsets; - check("list_earliest_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::Earliest(), + check("list_earliest_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::Earliest(), earliest_offsets)); std::cout << "Earliest offsets:" << std::endl; for (const auto& [bucket_id, offset] : earliest_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; } - - // 8.2) Query latest offsets for all buckets + std::unordered_map latest_offsets; - check("list_latest_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::Latest(), + check("list_latest_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::Latest(), latest_offsets)); std::cout << "Latest offsets:" << std::endl; for (const auto& [bucket_id, offset] : latest_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; } - - // 8.3) Query offsets for a specific timestamp (current time - 1 hour) + auto now = std::chrono::system_clock::now(); auto one_hour_ago = now - std::chrono::hours(1); auto timestamp_ms = std::chrono::duration_cast( one_hour_ago.time_since_epoch()).count(); - + std::unordered_map timestamp_offsets; - check("list_timestamp_offsets", - admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::FromTimestamp(timestamp_ms), + check("list_timestamp_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::FromTimestamp(timestamp_ms), timestamp_offsets)); std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl; for (const auto& [bucket_id, offset] : timestamp_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; } - - // 8.4) Use batch subscribe with offsets from list_offsets + + // 9) Batch subscribe std::cout << "\n=== Batch Subscribe Example ===" << std::endl; fluss::LogScanner batch_scanner; check("new_log_scanner_for_batch", table.NewScan().CreateLogScanner(batch_scanner)); - + std::vector subscriptions; for (const auto& [bucket_id, offset] : earliest_offsets) { subscriptions.push_back({bucket_id, offset}); - std::cout << "Preparing subscription: bucket=" << bucket_id + std::cout << "Preparing subscription: bucket=" << bucket_id << ", offset=" << offset << std::endl; } - + check("subscribe_buckets", batch_scanner.Subscribe(subscriptions)); std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl; - - // 8.5) Poll and verify bucket_id in records + fluss::ScanRecords batch_records; check("poll_batch", batch_scanner.Poll(5000, batch_records)); - + std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl; for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) { const auto& rec = batch_records[i]; - std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id - << ", offset=" << rec.offset + std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id + << ", offset=" << rec.offset << ", timestamp=" << rec.timestamp << std::endl; } if (batch_records.Size() > 5) { std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl; } - // 9) Test the new Arrow record batch polling functionality + // 10) Arrow record batch polling std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl; fluss::LogScanner arrow_scanner; check("new_record_batch_log_scanner", table.NewScan().CreateRecordBatchScanner(arrow_scanner)); - - // Subscribe to all buckets starting from offset 0 + for (int b = 0; b < buckets; ++b) { check("subscribe_arrow", arrow_scanner.Subscribe(b, 0)); } - + fluss::ArrowRecordBatches arrow_batches; check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, arrow_batches)); - + std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" << std::endl; for (size_t i = 0; i < arrow_batches.Size(); ++i) { const auto& batch = arrow_batches[i]; if (batch->Available()) { - std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl; + std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl; } else { std::cout << " Batch " << i << ": not available" << std::endl; } } - - // 10) Test the new Arrow record batch polling with projection + + // 11) Arrow record batch polling with projection std::cout << "\n=== Testing Arrow Record Batch Polling with Projection ===" << std::endl; fluss::LogScanner projected_arrow_scanner; check("new_record_batch_log_scanner_with_projection", table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner)); - - // Subscribe to all buckets starting from offset 0 + for (int b = 0; b < buckets; ++b) { check("subscribe_projected_arrow", projected_arrow_scanner.Subscribe(b, 0)); } - + fluss::ArrowRecordBatches projected_arrow_batches; check("poll_projected_record_batch", projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches)); - + std::cout << "Polled " << projected_arrow_batches.Size() << " projected Arrow record batches" << std::endl; for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) { const auto& batch = projected_arrow_batches[i]; if (batch->Available()) { - std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl; + std::cout << " Batch " << i << ": " << batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl; } else { std::cout << " Batch " << i << ": not available" << std::endl; } diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 3ff9a26c..36157774 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -40,6 +41,64 @@ namespace ffi { struct LogScanner; } // namespace ffi +struct Date { + int32_t days_since_epoch{0}; + + static Date FromDays(int32_t days) { return {days}; } + static Date FromYMD(int year, int month, int day); + + int Year() const; + int Month() const; + int Day() const; +}; + +struct Time { + static constexpr int32_t kMillisPerSecond = 1000; + static constexpr int32_t kMillisPerMinute = 60 * kMillisPerSecond; + static constexpr int32_t kMillisPerHour = 60 * kMillisPerMinute; + + int32_t millis_since_midnight{0}; + + static Time FromMillis(int32_t ms) { return {ms}; } + static Time FromHMS(int hour, int minute, int second, int millis = 0) { + return {hour * kMillisPerHour + minute * kMillisPerMinute + + second * kMillisPerSecond + millis}; + } + + int Hour() const { return millis_since_midnight / kMillisPerHour; } + int Minute() const { return (millis_since_midnight % kMillisPerHour) / kMillisPerMinute; } + int Second() const { return (millis_since_midnight % kMillisPerMinute) / kMillisPerSecond; } + int Millis() const { return millis_since_midnight % kMillisPerSecond; } +}; + +struct Timestamp { + static constexpr int32_t kMaxNanoOfMillisecond = 999999; + static constexpr int64_t kNanosPerMilli = 1000000; + + int64_t epoch_millis{0}; + int32_t nano_of_millisecond{0}; + + static Timestamp FromMillis(int64_t ms) { return {ms, 0}; } + static Timestamp FromMillisNanos(int64_t ms, int32_t nanos) { + if (nanos < 0) nanos = 0; + if (nanos > kMaxNanoOfMillisecond) nanos = kMaxNanoOfMillisecond; + return {ms, nanos}; + } + static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) { + auto duration = tp.time_since_epoch(); + auto ns = + std::chrono::duration_cast(duration) + .count(); + auto ms = ns / kNanosPerMilli; + auto nano_of_ms = static_cast(ns % kNanosPerMilli); + if (nano_of_ms < 0) { + nano_of_ms += kNanosPerMilli; + ms -= 1; + } + return {ms, nano_of_ms}; + } +}; + enum class DataType { Boolean = 1, TinyInt = 2, @@ -65,6 +124,11 @@ enum class DatumType { Float64 = 5, String = 6, Bytes = 7, + // 8-10 reserved for decimal types + Date = 11, + Time = 12, + TimestampNtz = 13, + TimestampLtz = 14, }; constexpr int64_t EARLIEST_OFFSET = -2; @@ -269,6 +333,36 @@ struct Datum { d.bytes_val = std::move(v); return d; } + static Datum Date(fluss::Date d) { + Datum dat; + dat.type = DatumType::Date; + dat.i32_val = d.days_since_epoch; + return dat; + } + static Datum Time(fluss::Time t) { + Datum dat; + dat.type = DatumType::Time; + dat.i32_val = t.millis_since_midnight; + return dat; + } + static Datum TimestampNtz(fluss::Timestamp ts) { + Datum dat; + dat.type = DatumType::TimestampNtz; + dat.i64_val = ts.epoch_millis; + dat.i32_val = ts.nano_of_millisecond; + return dat; + } + static Datum TimestampLtz(fluss::Timestamp ts) { + Datum dat; + dat.type = DatumType::TimestampLtz; + dat.i64_val = ts.epoch_millis; + dat.i32_val = ts.nano_of_millisecond; + return dat; + } + + fluss::Date GetDate() const { return {i32_val}; } + fluss::Time GetTime() const { return {i32_val}; } + fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; } }; struct GenericRow { @@ -314,6 +408,26 @@ struct GenericRow { fields[idx] = Datum::Bytes(std::move(v)); } + void SetDate(size_t idx, fluss::Date d) { + EnsureSize(idx); + fields[idx] = Datum::Date(d); + } + + void SetTime(size_t idx, fluss::Time t) { + EnsureSize(idx); + fields[idx] = Datum::Time(t); + } + + void SetTimestampNtz(size_t idx, fluss::Timestamp ts) { + EnsureSize(idx); + fields[idx] = Datum::TimestampNtz(ts); + } + + void SetTimestampLtz(size_t idx, fluss::Timestamp ts) { + EnsureSize(idx); + fields[idx] = Datum::TimestampLtz(ts); + } + private: void EnsureSize(size_t idx) { if (fields.size() <= idx) { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index d6e3a9a6..b51ffa3a 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -288,6 +288,9 @@ pub struct AppendWriter { pub struct LogScanner { inner: Option, inner_batch: Option, + /// Fluss columns matching the projected Arrow fields (1:1 by index). + /// For non-projected scanners this is the full table schema columns. + projected_columns: Vec, } fn ok_result() -> ffi::FfiResult { @@ -601,6 +604,7 @@ impl Table { let scanner_ptr = Box::into_raw(Box::new(LogScanner { inner: Some(scanner), inner_batch: None, + projected_columns: self.table_info.get_schema().columns().to_vec(), })); Ok(scanner_ptr) @@ -618,6 +622,19 @@ impl Table { self.table_info.clone(), ); + let all_columns = self.table_info.get_schema().columns(); + let projected_columns: Vec<_> = column_indices + .iter() + .map(|&i| { + all_columns.get(i).cloned().ok_or_else(|| { + format!( + "Invalid column index {i}: schema has {} columns", + all_columns.len() + ) + }) + }) + .collect::>()?; + let log_scanner = fluss_table .new_scan() .project(&column_indices) @@ -628,6 +645,7 @@ impl Table { let scanner = Box::into_raw(Box::new(LogScanner { inner: Some(log_scanner), inner_batch: None, + projected_columns, })); Ok(scanner) }) @@ -649,6 +667,7 @@ impl Table { let scanner = Box::into_raw(Box::new(LogScanner { inner: None, inner_batch: Some(batch_scanner), + projected_columns: self.table_info.get_schema().columns().to_vec(), })); Ok(scanner) }) @@ -665,6 +684,19 @@ impl Table { self.table_info.clone(), ); + let all_columns = self.table_info.get_schema().columns(); + let projected_columns: Vec<_> = column_indices + .iter() + .map(|&i| { + all_columns.get(i).cloned().ok_or_else(|| { + format!( + "Invalid column index {i}: schema has {} columns", + all_columns.len() + ) + }) + }) + .collect::>()?; + let batch_scanner = fluss_table .new_scan() .project(&column_indices) @@ -675,6 +707,7 @@ impl Table { let scanner = Box::into_raw(Box::new(LogScanner { inner: None, inner_batch: Some(batch_scanner), + projected_columns, })); Ok(scanner) }) @@ -857,7 +890,10 @@ impl LogScanner { match result { Ok(records) => ffi::FfiScanRecordsResult { result: ok_result(), - scan_records: types::core_scan_records_to_ffi(&records), + scan_records: types::core_scan_records_to_ffi( + &records, + &self.projected_columns, + ), }, Err(e) => ffi::FfiScanRecordsResult { result: err_result(1, e.to_string()), diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index efb762bd..e1070a5e 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -22,12 +22,60 @@ #include "ffi_converter.hpp" #include "rust/cxx.h" #include +#include // todo: bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link dependencies. // In environments where Bazel does not already have Arrow available, this will fail at compile/link time. #include namespace fluss { +static constexpr int kSecondsPerDay = 24 * 60 * 60; + +static std::time_t timegm_utc(std::tm* tm) { +#if defined(_WIN32) + return _mkgmtime(tm); +#else + return ::timegm(tm); +#endif +} + +static std::tm gmtime_utc(std::time_t epoch_seconds) { + std::tm tm{}; +#if defined(_WIN32) + gmtime_s(&tm, &epoch_seconds); +#else + ::gmtime_r(&epoch_seconds, &tm); +#endif + return tm; +} + +Date Date::FromYMD(int year, int month, int day) { + std::tm tm{}; + tm.tm_year = year - 1900; + tm.tm_mon = month - 1; + tm.tm_mday = day; + std::time_t epoch_seconds = timegm_utc(&tm); + return {static_cast(epoch_seconds / kSecondsPerDay)}; +} + +int Date::Year() const { + std::time_t epoch_seconds = static_cast(days_since_epoch) * kSecondsPerDay; + std::tm tm = gmtime_utc(epoch_seconds); + return tm.tm_year + 1900; +} + +int Date::Month() const { + std::time_t epoch_seconds = static_cast(days_since_epoch) * kSecondsPerDay; + std::tm tm = gmtime_utc(epoch_seconds); + return tm.tm_mon + 1; +} + +int Date::Day() const { + std::time_t epoch_seconds = static_cast(days_since_epoch) * kSecondsPerDay; + std::tm tm = gmtime_utc(epoch_seconds); + return tm.tm_mday; +} + Table::Table() noexcept = default; Table::Table(ffi::Table* table) noexcept : table_(table) {} diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 91d6e260..f546b682 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -52,6 +52,15 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4; pub const DATUM_TYPE_FLOAT64: i32 = 5; pub const DATUM_TYPE_STRING: i32 = 6; pub const DATUM_TYPE_BYTES: i32 = 7; +pub const DATUM_TYPE_DATE: i32 = 11; +pub const DATUM_TYPE_TIME: i32 = 12; +pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13; +pub const DATUM_TYPE_TIMESTAMP_LTZ: i32 = 14; + +const MILLIS_PER_SECOND: i64 = 1_000; +const MICROS_PER_MILLI: i64 = 1_000; +const NANOS_PER_MICRO: i64 = 1_000; +const NANOS_PER_MILLI: i64 = 1_000_000; fn ffi_data_type_to_core(dt: i32) -> Result { match dt { @@ -224,6 +233,16 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()), DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())), DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())), + DATUM_TYPE_DATE => Datum::Date(fcore::row::Date::new(field.i32_val)), + DATUM_TYPE_TIME => Datum::Time(fcore::row::Time::new(field.i32_val)), + DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz( + fcore::row::TimestampNtz::from_millis_nanos(field.i64_val, field.i32_val) + .unwrap_or_else(|_| fcore::row::TimestampNtz::new(field.i64_val)), + ), + DATUM_TYPE_TIMESTAMP_LTZ => Datum::TimestampLtz( + fcore::row::TimestampLtz::from_millis_nanos(field.i64_val, field.i32_val) + .unwrap_or_else(|_| fcore::row::TimestampLtz::new(field.i64_val)), + ), _ => Datum::Null, }; generic_row.set_field(idx, datum); @@ -232,7 +251,10 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { generic_row } -pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords { +pub fn core_scan_records_to_ffi( + records: &fcore::record::ScanRecords, + columns: &[fcore::metadata::Column], +) -> ffi::FfiScanRecords { let mut ffi_records = Vec::new(); // Iterate over all buckets and their records @@ -240,7 +262,7 @@ pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::Ff let bucket_id = table_bucket.bucket_id(); for record in bucket_records { let row = record.row(); - let fields = core_row_to_ffi_fields(row); + let fields = core_row_to_ffi_fields(row, columns); ffi_records.push(ffi::FfiScanRecord { bucket_id, @@ -256,7 +278,10 @@ pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::Ff } } -fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { +fn core_row_to_ffi_fields( + row: &fcore::row::ColumnarRow, + columns: &[fcore::metadata::Column], +) -> Vec { fn new_datum(datum_type: i32) -> ffi::FfiDatum { ffi::FfiDatum { datum_type, @@ -361,52 +386,59 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { .as_any() .downcast_ref::() .expect("Date32 column expected"); - let mut datum = new_datum(DATUM_TYPE_INT32); + let mut datum = new_datum(DATUM_TYPE_DATE); datum.i32_val = array.value(row_id); datum } - ArrowDataType::Timestamp(unit, _) => match unit { - TimeUnit::Second => { - let array = record_batch - .column(i) - .as_any() - .downcast_ref::() - .expect("Timestamp(second) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); - datum - } - TimeUnit::Millisecond => { - let array = record_batch - .column(i) - .as_any() - .downcast_ref::() - .expect("Timestamp(millisecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); - datum + ArrowDataType::Timestamp(unit, _tz) => { + let datum_type = match columns.get(i).map(|c| c.data_type()) { + Some(fcore::metadata::DataType::TimestampLTz(_)) => DATUM_TYPE_TIMESTAMP_LTZ, + _ => DATUM_TYPE_TIMESTAMP_NTZ, + }; + let mut datum = new_datum(datum_type); + match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Timestamp(second) column expected"); + datum.i64_val = array.value(row_id) * MILLIS_PER_SECOND; + datum.i32_val = 0; + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Timestamp(millisecond) column expected"); + datum.i64_val = array.value(row_id); + datum.i32_val = 0; + } + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Timestamp(microsecond) column expected"); + let micros = array.value(row_id); + datum.i64_val = micros.div_euclid(MICROS_PER_MILLI); + datum.i32_val = + (micros.rem_euclid(MICROS_PER_MILLI) * NANOS_PER_MICRO) as i32; + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Timestamp(nanosecond) column expected"); + let nanos = array.value(row_id); + datum.i64_val = nanos.div_euclid(NANOS_PER_MILLI); + datum.i32_val = nanos.rem_euclid(NANOS_PER_MILLI) as i32; + } } - TimeUnit::Microsecond => { - let array = record_batch - .column(i) - .as_any() - .downcast_ref::() - .expect("Timestamp(microsecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); - datum - } - TimeUnit::Nanosecond => { - let array = record_batch - .column(i) - .as_any() - .downcast_ref::() - .expect("Timestamp(nanosecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); - datum - } - }, + datum + } ArrowDataType::Time32(unit) => match unit { TimeUnit::Second => { let array = record_batch @@ -414,8 +446,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { .as_any() .downcast_ref::() .expect("Time32(second) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT32); - datum.i32_val = array.value(row_id); + let mut datum = new_datum(DATUM_TYPE_TIME); + datum.i32_val = array.value(row_id) * MILLIS_PER_SECOND as i32; datum } TimeUnit::Millisecond => { @@ -424,7 +456,7 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { .as_any() .downcast_ref::() .expect("Time32(millisecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT32); + let mut datum = new_datum(DATUM_TYPE_TIME); datum.i32_val = array.value(row_id); datum } @@ -437,8 +469,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { .as_any() .downcast_ref::() .expect("Time64(microsecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); + let mut datum = new_datum(DATUM_TYPE_TIME); + datum.i32_val = (array.value(row_id) / MICROS_PER_MILLI) as i32; datum } TimeUnit::Nanosecond => { @@ -447,8 +479,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec { .as_any() .downcast_ref::() .expect("Time64(nanosecond) column expected"); - let mut datum = new_datum(DATUM_TYPE_INT64); - datum.i64_val = array.value(row_id); + let mut datum = new_datum(DATUM_TYPE_TIME); + datum.i32_val = (array.value(row_id) / NANOS_PER_MILLI) as i32; datum } _ => panic!("Will never come here. Unsupported Time64 unit for column {i}"),