diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 5146f282..04f9ac64 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -19,6 +19,8 @@ #include #include +#include +#include static void check(const char* step, const fluss::Result& r) { if (!r.Ok()) { @@ -37,6 +39,17 @@ int main() { fluss::Admin admin; check("get_admin", conn.GetAdmin(admin)); + fluss::TablePath table_path("fluss", "sample_table_cpp_v1"); + + // 2.1) Drop table if exists + std::cout << "Dropping table if exists..." << std::endl; + auto drop_result = admin.DropTable(table_path, true); + if (drop_result.Ok()) { + std::cout << "Table dropped successfully" << std::endl; + } else { + std::cout << "Table drop result: " << drop_result.error_message << std::endl; + } + // 3) Schema & descriptor auto schema = fluss::Schema::NewBuilder() .AddColumn("id", fluss::DataType::Int) @@ -47,14 +60,14 @@ int main() { auto descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) - .SetBucketCount(1) + .SetBucketCount(3) .SetProperty("table.log.arrow.compression.type", "NONE") - .SetComment("cpp example table") + .SetComment("cpp example table with 3 buckets") .Build(); - fluss::TablePath table_path("fluss", "sample_table_cpp_v1"); - // ignore_if_exists=true to allow re-run - check("create_table", admin.CreateTable(table_path, descriptor, true)); + // 3.1) Create table with 3 buckets + std::cout << "Creating table with 3 buckets..." << std::endl; + check("create_table", admin.CreateTable(table_path, descriptor, false)); // 4) Get table fluss::Table table; @@ -162,5 +175,81 @@ int main() { std::exit(1); } + // 8) List offsets examples + std::cout << "\n=== List Offsets Examples ===" << std::endl; + + // 8.1) Query earliest offsets for all buckets + std::vector all_bucket_ids; + for (int b = 0; b < buckets; ++b) { + all_bucket_ids.push_back(b); + } + + std::unordered_map earliest_offsets; + check("list_earliest_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::Earliest(), + earliest_offsets)); + std::cout << "Earliest offsets:" << std::endl; + for (const auto& [bucket_id, offset] : earliest_offsets) { + std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; + } + + // 8.2) Query latest offsets for all buckets + std::unordered_map latest_offsets; + check("list_latest_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::Latest(), + latest_offsets)); + std::cout << "Latest offsets:" << std::endl; + for (const auto& [bucket_id, offset] : latest_offsets) { + std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; + } + + // 8.3) Query offsets for a specific timestamp (current time - 1 hour) + auto now = std::chrono::system_clock::now(); + auto one_hour_ago = now - std::chrono::hours(1); + auto timestamp_ms = std::chrono::duration_cast( + one_hour_ago.time_since_epoch()).count(); + + std::unordered_map timestamp_offsets; + check("list_timestamp_offsets", + admin.ListOffsets(table_path, all_bucket_ids, + fluss::OffsetQuery::FromTimestamp(timestamp_ms), + timestamp_offsets)); + std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl; + for (const auto& [bucket_id, offset] : timestamp_offsets) { + std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; + } + + // 8.4) Use batch subscribe with offsets from list_offsets + std::cout << "\n=== Batch Subscribe Example ===" << std::endl; + fluss::LogScanner batch_scanner; + check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner)); + + std::vector subscriptions; + for (const auto& [bucket_id, offset] : earliest_offsets) { + subscriptions.push_back({bucket_id, offset}); + std::cout << "Preparing subscription: bucket=" << bucket_id + << ", offset=" << offset << std::endl; + } + + check("subscribe_batch", batch_scanner.Subscribe(subscriptions)); + std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl; + + // 8.5) Poll and verify bucket_id in records + fluss::ScanRecords batch_records; + check("poll_batch", batch_scanner.Poll(5000, batch_records)); + + std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl; + for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) { + const auto& rec = batch_records[i]; + std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id + << ", offset=" << rec.offset + << ", timestamp=" << rec.timestamp << std::endl; + } + if (batch_records.Size() > 5) { + std::cout << " ... and " << (batch_records.Size() - 5) << " more records" << std::endl; + } + return 0; } diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 002f8069..479adf97 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -63,6 +63,24 @@ enum class DatumType { Bytes = 7, }; +constexpr int64_t EARLIEST_OFFSET = -2; +constexpr int64_t LATEST_OFFSET = -1; + +enum class OffsetSpec { + Earliest = 0, + Latest = 1, + Timestamp = 2, +}; + +struct OffsetQuery { + OffsetSpec spec; + int64_t timestamp{0}; + + static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; } + static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; } + static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; } +}; + struct Result { int32_t error_code{0}; std::string error_message; @@ -301,6 +319,7 @@ struct GenericRow { }; struct ScanRecord { + int32_t bucket_id; int64_t offset; int64_t timestamp; GenericRow row; @@ -324,6 +343,11 @@ struct BucketOffset { int64_t offset; }; +struct BucketSubscription { + int32_t bucket_id; + int64_t offset; +}; + struct LakeSnapshot { int64_t snapshot_id; std::vector bucket_offsets; @@ -372,10 +396,17 @@ class Admin { const TableDescriptor& descriptor, bool ignore_if_exists = false); + Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false); + Result GetTable(const TablePath& table_path, TableInfo& out); Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out); + Result ListOffsets(const TablePath& table_path, + const std::vector& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map& out); + private: friend class Connection; Admin(ffi::Admin* admin) noexcept; @@ -448,6 +479,7 @@ class LogScanner { bool Available() const; Result Subscribe(int32_t bucket_id, int64_t start_offset); + Result Subscribe(const std::vector& bucket_offsets); Result Poll(int64_t timeout_ms, ScanRecords& out); private: diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index f6997a64..bf9c712f 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -66,6 +66,16 @@ Result Admin::CreateTable(const TablePath& table_path, return utils::from_ffi_result(ffi_result); } +Result Admin::DropTable(const TablePath& table_path, bool ignore_if_not_exists) { + if (!Available()) { + return utils::make_error(1, "Admin not available"); + } + + auto ffi_path = utils::to_ffi_table_path(table_path); + auto ffi_result = admin_->drop_table(ffi_path, ignore_if_not_exists); + return utils::from_ffi_result(ffi_result); +} + Result Admin::GetTable(const TablePath& table_path, TableInfo& out) { if (!Available()) { return utils::make_error(1, "Admin not available"); @@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o return result; } +Result Admin::ListOffsets(const TablePath& table_path, + const std::vector& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map& out) { + if (!Available()) { + return utils::make_error(1, "Admin not available"); + } + + auto ffi_path = utils::to_ffi_table_path(table_path); + + rust::Vec rust_bucket_ids; + for (int32_t id : bucket_ids) { + rust_bucket_ids.push_back(id); + } + + ffi::FfiOffsetQuery ffi_query; + ffi_query.offset_type = static_cast(offset_query.spec); + ffi_query.timestamp = offset_query.timestamp; + + auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query); + + auto result = utils::from_ffi_result(ffi_result.result); + if (result.Ok()) { + out.clear(); + for (const auto& pair : ffi_result.bucket_offsets) { + out[pair.bucket_id] = pair.offset; + } + } + + return result; +} + } // namespace fluss diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 52dd7fe5..63a2e91a 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -222,6 +222,7 @@ inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) { inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) { return ScanRecord{ + ffi_record.bucket_id, ffi_record.offset, ffi_record.timestamp, from_ffi_generic_row(ffi_record.row)}; diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 54d69413..cd1803b8 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -104,6 +104,7 @@ mod ffi { } struct FfiScanRecord { + bucket_id: i32, offset: i64, timestamp: i64, row: FfiGenericRow, @@ -130,6 +131,26 @@ mod ffi { offset: i64, } + struct FfiOffsetQuery { + offset_type: i32, + timestamp: i64, + } + + struct FfiBucketSubscription { + bucket_id: i32, + offset: i64, + } + + struct FfiBucketOffsetPair { + bucket_id: i32, + offset: i64, + } + + struct FfiListOffsetsResult { + result: FfiResult, + bucket_offsets: Vec, + } + struct FfiLakeSnapshotResult { result: FfiResult, lake_snapshot: FfiLakeSnapshot, @@ -156,11 +177,22 @@ mod ffi { descriptor: &FfiTableDescriptor, ignore_if_exists: bool, ) -> FfiResult; + fn drop_table( + self: &Admin, + table_path: &FfiTablePath, + ignore_if_not_exists: bool, + ) -> FfiResult; fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> FfiTableInfoResult; fn get_latest_lake_snapshot( self: &Admin, table_path: &FfiTablePath, ) -> FfiLakeSnapshotResult; + fn list_offsets( + self: &Admin, + table_path: &FfiTablePath, + bucket_ids: Vec, + offset_query: &FfiOffsetQuery, + ) -> FfiListOffsetsResult; // Table unsafe fn delete_table(table: *mut Table); @@ -182,6 +214,10 @@ mod ffi { // LogScanner unsafe fn delete_log_scanner(scanner: *mut LogScanner); fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult; + fn subscribe_batch( + self: &LogScanner, + subscriptions: Vec, + ) -> FfiResult; fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult; } } @@ -330,6 +366,25 @@ impl Admin { } } + fn drop_table( + &self, + table_path: &ffi::FfiTablePath, + ignore_if_not_exists: bool, + ) -> ffi::FfiResult { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let result = + RUNTIME.block_on(async { self.inner.drop_table(&path, ignore_if_not_exists).await }); + + match result { + Ok(_) => ok_result(), + Err(e) => err_result(1, e.to_string()), + } + } + fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult { let path = fcore::metadata::TablePath::new( table_path.database_name.clone(), @@ -375,6 +430,58 @@ impl Admin { }, } } + + fn list_offsets( + &self, + table_path: &ffi::FfiTablePath, + bucket_ids: Vec, + offset_query: &ffi::FfiOffsetQuery, + ) -> ffi::FfiListOffsetsResult { + use fcore::rpc::message::OffsetSpec; + + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let offset_spec = match offset_query.offset_type { + 0 => OffsetSpec::Earliest, + 1 => OffsetSpec::Latest, + 2 => OffsetSpec::Timestamp(offset_query.timestamp), + _ => { + return ffi::FfiListOffsetsResult { + result: err_result( + 1, + format!("Invalid offset_type: {}", offset_query.offset_type), + ), + bucket_offsets: vec![], + }; + } + }; + + let result = RUNTIME.block_on(async { + self.inner + .list_offsets(&path, &bucket_ids, offset_spec) + .await + }); + + match result { + Ok(offsets) => { + let bucket_offsets: Vec = offsets + .into_iter() + .map(|(bucket_id, offset)| ffi::FfiBucketOffsetPair { bucket_id, offset }) + .collect(); + ffi::FfiListOffsetsResult { + result: ok_result(), + bucket_offsets, + } + } + Err(e) => ffi::FfiListOffsetsResult { + result: err_result(1, e.to_string()), + bucket_offsets: vec![], + }, + } + } } // Table implementation @@ -511,6 +618,21 @@ impl LogScanner { } } + fn subscribe_batch(&self, subscriptions: Vec) -> ffi::FfiResult { + use std::collections::HashMap; + let mut bucket_offsets = HashMap::new(); + for sub in subscriptions { + bucket_offsets.insert(sub.bucket_id, sub.offset); + } + + let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await }); + + match result { + Ok(_) => ok_result(), + Err(e) => err_result(1, e.to_string()), + } + } + fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult { let timeout = Duration::from_millis(timeout_ms as u64); let result = RUNTIME.block_on(async { self.inner.poll(timeout).await }); diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index b28b783e..d42e1a22 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -210,6 +210,23 @@ Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) { return utils::from_ffi_result(ffi_result); } +Result LogScanner::Subscribe(const std::vector& bucket_offsets) { + if (!Available()) { + return utils::make_error(1, "LogScanner not available"); + } + + rust::Vec rust_subs; + for (const auto& sub : bucket_offsets) { + ffi::FfiBucketSubscription ffi_sub; + ffi_sub.bucket_id = sub.bucket_id; + ffi_sub.offset = sub.offset; + rust_subs.push_back(ffi_sub); + } + + auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs)); + return utils::from_ffi_result(ffi_result); +} + Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) { if (!Available()) { return utils::make_error(1, "LogScanner not available"); diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f9404ac6..2d0eb6e4 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -233,12 +233,14 @@ pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::Ff let mut ffi_records = Vec::new(); // Iterate over all buckets and their records - for bucket_records in records.records_by_buckets().values() { + for (table_bucket, bucket_records) in records.records_by_buckets() { + let bucket_id = table_bucket.bucket_id(); for record in bucket_records { let row = record.row(); let fields = core_row_to_ffi_fields(row); ffi_records.push(ffi::FfiScanRecord { + bucket_id, offset: record.offset(), timestamp: record.timestamp(), row: ffi::FfiGenericRow { fields }, diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index e185af84..6646f97c 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -232,6 +232,13 @@ impl FlussAdmin { .check_and_update_table_metadata(from_ref(table_path)) .await?; + if buckets_id.is_empty() { + return Err(Error::UnexpectedError { + message: "Buckets are empty.".to_string(), + source: None, + }); + } + let cluster = self.metadata.get_cluster(); let table_id = cluster.get_table(table_path).table_id; diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 1e70649e..a9384d90 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -185,6 +185,28 @@ impl LogScanner { Ok(()) } + pub async fn subscribe_batch(&self, bucket_offsets: HashMap) -> Result<()> { + self.metadata + .check_and_update_table_metadata(from_ref(&self.table_path)) + .await?; + if bucket_offsets.is_empty() { + return Err(Error::UnexpectedError { + message: "Bucket offsets are empty.".to_string(), + source: None, + }); + } + + let mut scan_bucket_offsets = HashMap::new(); + for (bucket_id, offset) in bucket_offsets { + let table_bucket = TableBucket::new(self.table_id, bucket_id); + scan_bucket_offsets.insert(table_bucket, offset); + } + + self.log_scanner_status + .assign_scan_buckets(scan_bucket_offsets); + Ok(()) + } + async fn poll_for_fetches(&self) -> Result>> { self.log_fetcher.send_fetches_and_collect().await }