Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int main() {

// 6) Scan
fluss::LogScanner scanner;
check("new_log_scanner", table.NewLogScanner(scanner));
check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));

auto info = table.GetTableInfo();
int buckets = info.num_buckets;
Expand All @@ -126,8 +126,8 @@ int main() {
// 7) Project only id (0) and name (1) columns
std::vector<size_t> projected_columns = {0, 1};
fluss::LogScanner projected_scanner;
check("new_log_scanner_with_projection",
table.NewLogScannerWithProjection(projected_columns, projected_scanner));
check("new_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));

for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
Expand Down Expand Up @@ -226,7 +226,7 @@ int main() {
// 8.4) Use batch subscribe with offsets from list_offsets
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
fluss::LogScanner batch_scanner;
check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));
check("new_log_scanner_for_batch", table.NewScan().CreateLogScanner(batch_scanner));

std::vector<fluss::BucketSubscription> subscriptions;
for (const auto& [bucket_id, offset] : earliest_offsets) {
Expand Down Expand Up @@ -255,9 +255,9 @@ int main() {

// 9) Test the new Arrow record batch polling functionality
std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;

fluss::LogScanner arrow_scanner;
check("new_record_batch_log_scanner", table.NewRecordBatchLogScanner(arrow_scanner));
check("new_record_batch_log_scanner", table.NewScan().CreateRecordBatchScanner(arrow_scanner));

// Subscribe to all buckets starting from offset 0
for (int b = 0; b < buckets; ++b) {
Expand All @@ -279,10 +279,10 @@ int main() {

// 10) Test the new Arrow record batch polling with projection
std::cout << "\n=== Testing Arrow Record Batch Polling with Projection ===" << std::endl;

fluss::LogScanner projected_arrow_scanner;
check("new_record_batch_log_scanner_with_projection",
table.NewRecordBatchLogScannerWithProjection(projected_columns, projected_arrow_scanner));
check("new_record_batch_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));

// Subscribe to all buckets starting from offset 0
for (int b = 0; b < buckets; ++b) {
Expand Down
28 changes: 24 additions & 4 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ class AppendWriter;
class LogScanner;
class Admin;
class Table;
class TableScan;

class Connection {
public:
Expand Down Expand Up @@ -490,23 +491,41 @@ class Table {
bool Available() const;

Result NewAppendWriter(AppendWriter& out);
Result NewLogScanner(LogScanner& out);
Result NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out);
Result NewRecordBatchLogScanner(LogScanner& out);
Result NewRecordBatchLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out);
TableScan NewScan();

TableInfo GetTableInfo() const;
TablePath GetTablePath() const;
bool HasPrimaryKey() const;

private:
friend class Connection;
friend class TableScan;
Table(ffi::Table* table) noexcept;

void Destroy() noexcept;
ffi::Table* table_{nullptr};
};

class TableScan {
public:
TableScan(const TableScan&) = delete;
TableScan& operator=(const TableScan&) = delete;
TableScan(TableScan&&) noexcept = default;
TableScan& operator=(TableScan&&) noexcept = default;

TableScan& Project(std::vector<size_t> column_indices);

Result CreateLogScanner(LogScanner& out);
Result CreateRecordBatchScanner(LogScanner& out);

private:
friend class Table;
explicit TableScan(ffi::Table* table) noexcept;

ffi::Table* table_{nullptr};
std::vector<size_t> projection_;
};
Comment thread
fresh-borzoni marked this conversation as resolved.

class AppendWriter {
public:
AppendWriter() noexcept;
Expand Down Expand Up @@ -550,6 +569,7 @@ class LogScanner {

private:
friend class Table;
friend class TableScan;
LogScanner(ffi::LogScanner* scanner) noexcept;

void Destroy() noexcept;
Expand Down
66 changes: 28 additions & 38 deletions bindings/cpp/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,33 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
}

Result Table::NewLogScanner(LogScanner& out) {
if (!Available()) {
return utils::make_error(1, "Table not available");
}

try {
out.scanner_ = table_->new_log_scanner();
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
} catch (const std::exception& e) {
return utils::make_error(1, e.what());
}
TableScan Table::NewScan() {
return TableScan(table_);
}

Result Table::NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out) {
if (!Available()) {
return utils::make_error(1, "Table not available");
}
// TableScan implementation
TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}

try {
rust::Vec<size_t> rust_indices;
for (size_t idx : column_indices) {
rust_indices.push_back(idx);
}
out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices));
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
} catch (const std::exception& e) {
return utils::make_error(1, e.what());
}
TableScan& TableScan::Project(std::vector<size_t> column_indices) {
projection_ = std::move(column_indices);
return *this;
}

Result Table::NewRecordBatchLogScanner(LogScanner& out) {
if (!Available()) {
Result TableScan::CreateLogScanner(LogScanner& out) {
if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}

try {
out.scanner_ = table_->new_record_batch_log_scanner();
if (projection_.empty()) {
out.scanner_ = table_->new_log_scanner();
} else {
rust::Vec<size_t> rust_indices;
for (size_t idx : projection_) {
rust_indices.push_back(idx);
}
out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices));
}
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
Expand All @@ -120,17 +106,21 @@ Result Table::NewRecordBatchLogScanner(LogScanner& out) {
}
}

Result Table::NewRecordBatchLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out) {
if (!Available()) {
Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}

try {
rust::Vec<size_t> rust_indices;
for (size_t idx : column_indices) {
rust_indices.push_back(idx);
if (projection_.empty()) {
out.scanner_ = table_->new_record_batch_log_scanner();
} else {
rust::Vec<size_t> rust_indices;
for (size_t idx : projection_) {
rust_indices.push_back(idx);
}
out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
}
out.scanner_ = table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
Expand Down
Loading