From 82c610554afbab8bf7fc2fdfd8cd099198c74015 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 5 Feb 2026 19:15:17 +0000 Subject: [PATCH 1/2] [TASK-248] Builder pattern for CPP --- bindings/cpp/examples/example.cpp | 18 ++++----- bindings/cpp/include/fluss.hpp | 23 +++++++++-- bindings/cpp/src/table.cpp | 66 +++++++++++++------------------ 3 files changed, 56 insertions(+), 51 deletions(-) diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 45f7f9ea..f35f37ee 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -104,7 +104,7 @@ int main() { // 6) Scan fluss::LogScanner scanner; - check("new_log_scanner", table.NewLogScanner(scanner)); + check("new_log_scanner", table.NewScan().CreateLogScanner(scanner)); auto info = table.GetTableInfo(); int buckets = info.num_buckets; @@ -126,8 +126,8 @@ int main() { // 7) Project only id (0) and name (1) columns std::vector projected_columns = {0, 1}; fluss::LogScanner projected_scanner; - check("new_log_scanner_with_projection", - table.NewLogScannerWithProjection(projected_columns, projected_scanner)); + check("new_log_scanner_with_projection", + table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner)); for (int b = 0; b < buckets; ++b) { check("subscribe_projected", projected_scanner.Subscribe(b, 0)); @@ -226,7 +226,7 @@ int main() { // 8.4) Use batch subscribe with offsets from list_offsets std::cout << "\n=== Batch Subscribe Example ===" << std::endl; fluss::LogScanner batch_scanner; - check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner)); + check("new_log_scanner_for_batch", table.NewScan().CreateLogScanner(batch_scanner)); std::vector subscriptions; for (const auto& [bucket_id, offset] : earliest_offsets) { @@ -255,9 +255,9 @@ int main() { // 9) Test the new Arrow record batch polling functionality std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl; - + fluss::LogScanner arrow_scanner; - check("new_record_batch_log_scanner", table.NewRecordBatchLogScanner(arrow_scanner)); + check("new_record_batch_log_scanner", table.NewScan().CreateRecordBatchScanner(arrow_scanner)); // Subscribe to all buckets starting from offset 0 for (int b = 0; b < buckets; ++b) { @@ -279,10 +279,10 @@ int main() { // 10) Test the new Arrow record batch polling with projection std::cout << "\n=== Testing Arrow Record Batch Polling with Projection ===" << std::endl; - + fluss::LogScanner projected_arrow_scanner; - check("new_record_batch_log_scanner_with_projection", - table.NewRecordBatchLogScannerWithProjection(projected_columns, projected_arrow_scanner)); + check("new_record_batch_log_scanner_with_projection", + table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner)); // Subscribe to all buckets starting from offset 0 for (int b = 0; b < buckets; ++b) { diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index d35ece2c..a1928fb5 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -407,6 +407,7 @@ class AppendWriter; class LogScanner; class Admin; class Table; +class TableScan; class Connection { public: @@ -490,10 +491,7 @@ class Table { bool Available() const; Result NewAppendWriter(AppendWriter& out); - Result NewLogScanner(LogScanner& out); - Result NewLogScannerWithProjection(const std::vector& column_indices, LogScanner& out); - Result NewRecordBatchLogScanner(LogScanner& out); - Result NewRecordBatchLogScannerWithProjection(const std::vector& column_indices, LogScanner& out); + TableScan NewScan(); TableInfo GetTableInfo() const; TablePath GetTablePath() const; @@ -501,12 +499,28 @@ class Table { private: friend class Connection; + friend class TableScan; Table(ffi::Table* table) noexcept; void Destroy() noexcept; ffi::Table* table_{nullptr}; }; +class TableScan { +public: + TableScan& Project(std::vector column_indices); + + Result CreateLogScanner(LogScanner& out); + Result CreateRecordBatchScanner(LogScanner& out); + +private: + friend class Table; + explicit TableScan(ffi::Table* table) noexcept; + + ffi::Table* table_{nullptr}; + std::vector projection_; +}; + class AppendWriter { public: AppendWriter() noexcept; @@ -550,6 +564,7 @@ class LogScanner { private: friend class Table; + friend class TableScan; LogScanner(ffi::LogScanner* scanner) noexcept; void Destroy() noexcept; diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index b327dbac..f9437908 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -71,47 +71,33 @@ Result Table::NewAppendWriter(AppendWriter& out) { } } -Result Table::NewLogScanner(LogScanner& out) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } - - try { - out.scanner_ = table_->new_log_scanner(); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } +TableScan Table::NewScan() { + return TableScan(table_); } -Result Table::NewLogScannerWithProjection(const std::vector& column_indices, LogScanner& out) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } +// TableScan implementation +TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {} - try { - rust::Vec rust_indices; - for (size_t idx : column_indices) { - rust_indices.push_back(idx); - } - out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices)); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } +TableScan& TableScan::Project(std::vector column_indices) { + projection_ = std::move(column_indices); + return *this; } -Result Table::NewRecordBatchLogScanner(LogScanner& out) { - if (!Available()) { +Result TableScan::CreateLogScanner(LogScanner& out) { + if (table_ == nullptr) { return utils::make_error(1, "Table not available"); } try { - out.scanner_ = table_->new_record_batch_log_scanner(); + if (projection_.empty()) { + out.scanner_ = table_->new_log_scanner(); + } else { + rust::Vec rust_indices; + for (size_t idx : projection_) { + rust_indices.push_back(idx); + } + out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices)); + } return utils::make_ok(); } catch (const rust::Error& e) { return utils::make_error(1, e.what()); @@ -120,17 +106,21 @@ Result Table::NewRecordBatchLogScanner(LogScanner& out) { } } -Result Table::NewRecordBatchLogScannerWithProjection(const std::vector& column_indices, LogScanner& out) { - if (!Available()) { +Result TableScan::CreateRecordBatchScanner(LogScanner& out) { + if (table_ == nullptr) { return utils::make_error(1, "Table not available"); } try { - rust::Vec rust_indices; - for (size_t idx : column_indices) { - rust_indices.push_back(idx); + if (projection_.empty()) { + out.scanner_ = table_->new_record_batch_log_scanner(); + } else { + rust::Vec rust_indices; + for (size_t idx : projection_) { + rust_indices.push_back(idx); + } + out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices)); } - out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices)); return utils::make_ok(); } catch (const rust::Error& e) { return utils::make_error(1, e.what()); From 4b3b47a05e409abad7d46d8d73d854395e38935b Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 5 Feb 2026 22:50:19 +0000 Subject: [PATCH 2/2] delete copy operations for TableScan --- bindings/cpp/include/fluss.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index a1928fb5..901b90ca 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -508,6 +508,11 @@ class Table { class TableScan { public: + TableScan(const TableScan&) = delete; + TableScan& operator=(const TableScan&) = delete; + TableScan(TableScan&&) noexcept = default; + TableScan& operator=(TableScan&&) noexcept = default; + TableScan& Project(std::vector column_indices); Result CreateLogScanner(LogScanner& out);