Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions bindings/cpp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,36 @@ cc_binary(
visibility = ["//visibility:public"],
)

cc_binary(
name = "fluss_cpp_admin_example",
srcs = [
"examples/admin_example.cpp",
],
deps = [":fluss_cpp"],
copts = [
"-std=c++17",
] + select({
":debug_mode": [
"-g3",
"-O0",
"-ggdb",
"-fno-omit-frame-pointer",
"-DDEBUG",
],
":fastbuild_mode": [
"-g",
"-O0",
],
":release_mode": [
"-O2",
"-DNDEBUG",
],
}),
linkopts = select({
":debug_mode": ["-g"],
":fastbuild_mode": ["-g"],
":release_mode": [],
}),
visibility = ["//visibility:public"],
)

6 changes: 6 additions & 0 deletions bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ target_link_libraries(fluss_cpp_example PRIVATE Arrow::arrow_shared)
target_compile_definitions(fluss_cpp_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})

add_executable(fluss_cpp_admin_example examples/admin_example.cpp)
target_link_libraries(fluss_cpp_admin_example PRIVATE fluss_cpp)
target_link_libraries(fluss_cpp_admin_example PRIVATE Arrow::arrow_shared)
target_compile_definitions(fluss_cpp_admin_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_admin_example PUBLIC ${CPP_INCLUDE_DIR})

set_target_properties(fluss_cpp
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
)
Expand Down
121 changes: 121 additions & 0 deletions bindings/cpp/examples/admin_example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

#include "fluss.hpp"

static void check(const char* step, const fluss::Result& r) {
if (!r.Ok()) {
std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message
<< std::endl;
std::exit(1);
}
}

int main() {
const std::string bootstrap = "127.0.0.1:9123";
const std::string db_name = "admin_example_db";
const std::string table_name = "admin_example_table";

// 1) Connect and get Admin
fluss::Connection conn;
check("connect", fluss::Connection::Connect(bootstrap, conn));

fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));

// 2) Database operations
std::cout << "--- Database operations ---" << std::endl;

bool exists = false;
check("database_exists (before create)", admin.DatabaseExists(db_name, exists));
std::cout << "Database " << db_name << " exists before create: " << (exists ? "yes" : "no")
<< std::endl;

fluss::DatabaseDescriptor db_desc;
db_desc.comment = "Example database for Admin API";
db_desc.properties["owner"] = "admin_example";
check("create_database", admin.CreateDatabase(db_name, db_desc, true));

check("database_exists (after create)", admin.DatabaseExists(db_name, exists));
std::cout << "Database " << db_name << " exists after create: " << (exists ? "yes" : "no")
<< std::endl;

fluss::DatabaseInfo db_info;
check("get_database_info", admin.GetDatabaseInfo(db_name, db_info));
std::cout << "Database info: name=" << db_info.database_name
<< " comment=" << db_info.comment << " created_time=" << db_info.created_time
<< std::endl;

std::vector<std::string> databases;
check("list_databases", admin.ListDatabases(databases));
std::cout << "List databases (" << databases.size() << "): ";
for (size_t i = 0; i < databases.size(); ++i) {
if (i > 0) std::cout << ", ";
std::cout << databases[i];
}
std::cout << std::endl;

// 3) Table operations in the new database
std::cout << "--- Table operations ---" << std::endl;

fluss::TablePath table_path(db_name, table_name);

bool table_exists_flag = false;
check("table_exists (before create)", admin.TableExists(table_path, table_exists_flag));
std::cout << "Table " << db_name << "." << table_name
<< " exists before create: " << (table_exists_flag ? "yes" : "no") << std::endl;

auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.Build();
auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetBucketCount(1)
.SetComment("admin example table")
.Build();

check("create_table", admin.CreateTable(table_path, descriptor, true));

check("table_exists (after create)", admin.TableExists(table_path, table_exists_flag));
std::cout << "Table exists after create: " << (table_exists_flag ? "yes" : "no") << std::endl;

std::vector<std::string> tables;
check("list_tables", admin.ListTables(db_name, tables));
std::cout << "List tables in " << db_name << " (" << tables.size() << "): ";
for (size_t i = 0; i < tables.size(); ++i) {
if (i > 0) std::cout << ", ";
std::cout << tables[i];
}
std::cout << std::endl;

// 4) Cleanup: drop table, then drop database
std::cout << "--- Cleanup ---" << std::endl;
check("drop_table", admin.DropTable(table_path, true));
check("drop_database", admin.DropDatabase(db_name, true, true));

check("database_exists (after drop)", admin.DatabaseExists(db_name, exists));
std::cout << "Database exists after drop: " << (exists ? "yes" : "no") << std::endl;

std::cout << "Admin example completed successfully." << std::endl;
return 0;
}
36 changes: 36 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,21 @@ struct PartitionInfo {
std::string partition_name;
};

/// Descriptor for create_database (optional). Leave comment and properties empty for default.
struct DatabaseDescriptor {
std::string comment;
std::unordered_map<std::string, std::string> properties;
};

/// Metadata returned by GetDatabaseInfo.
struct DatabaseInfo {
std::string database_name;
std::string comment;
std::unordered_map<std::string, std::string> properties;
int64_t created_time{0};
int64_t modified_time{0};
};

class AppendWriter;
class WriteResult;
class LogScanner;
Expand Down Expand Up @@ -773,6 +788,27 @@ class Admin {
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_exists = false);

Result DropPartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_not_exists = false);

Result CreateDatabase(const std::string& database_name,
const DatabaseDescriptor& descriptor,
bool ignore_if_exists = false);

Result DropDatabase(const std::string& database_name, bool ignore_if_not_exists = false,
bool cascade = true);

Result ListDatabases(std::vector<std::string>& out);

Result DatabaseExists(const std::string& database_name, bool& out);

Result GetDatabaseInfo(const std::string& database_name, DatabaseInfo& out);

Result ListTables(const std::string& database_name, std::vector<std::string>& out);

Result TableExists(const TablePath& table_path, bool& out);

private:
Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out,
Expand Down
120 changes: 120 additions & 0 deletions bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,124 @@ Result Admin::CreatePartition(const TablePath& table_path,
return utils::from_ffi_result(ffi_result);
}

Result Admin::DropPartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_not_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);

rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
for (const auto& [key, value] : partition_spec) {
ffi::FfiPartitionKeyValue kv;
kv.key = rust::String(key);
kv.value = rust::String(value);
rust_spec.push_back(std::move(kv));
}

auto ffi_result =
admin_->drop_partition(ffi_path, std::move(rust_spec), ignore_if_not_exists);
return utils::from_ffi_result(ffi_result);
}

Result Admin::CreateDatabase(const std::string& database_name,
const DatabaseDescriptor& descriptor,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_desc = utils::to_ffi_database_descriptor(descriptor);
auto ffi_result =
admin_->create_database(rust::Str(database_name), ffi_desc, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
Comment thread
luoyuxia marked this conversation as resolved.
}

Result Admin::DropDatabase(const std::string& database_name, bool ignore_if_not_exists,
bool cascade) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_result =
admin_->drop_database(rust::Str(database_name), ignore_if_not_exists, cascade);
return utils::from_ffi_result(ffi_result);
Comment thread
luoyuxia marked this conversation as resolved.
}

Result Admin::ListDatabases(std::vector<std::string>& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_result = admin_->list_databases();
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
out.reserve(ffi_result.database_names.size());
for (const auto& name : ffi_result.database_names) {
out.push_back(std::string(name));
}
}
return result;
}

Result Admin::DatabaseExists(const std::string& database_name, bool& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_result = admin_->database_exists(rust::Str(database_name));
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out = ffi_result.value;
}
return result;
}

Result Admin::GetDatabaseInfo(const std::string& database_name, DatabaseInfo& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_result = admin_->get_database_info(rust::Str(database_name));
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out = utils::from_ffi_database_info(ffi_result.database_info);
}
return result;
}

Result Admin::ListTables(const std::string& database_name, std::vector<std::string>& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_result = admin_->list_tables(rust::Str(database_name));
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
out.reserve(ffi_result.table_names.size());
for (const auto& name : ffi_result.table_names) {
out.push_back(std::string(name));
}
}
return result;
}

Result Admin::TableExists(const TablePath& table_path, bool& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->table_exists(ffi_path);
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out = ffi_result.value;
}
return result;
}

} // namespace fluss
25 changes: 25 additions & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,5 +290,30 @@ inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snaps
return snapshot;
}

inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(
const DatabaseDescriptor& desc) {
ffi::FfiDatabaseDescriptor ffi_desc;
ffi_desc.comment = rust::String(desc.comment);
for (const auto& [k, v] : desc.properties) {
ffi::HashMapValue kv;
kv.key = rust::String(k);
kv.value = rust::String(v);
ffi_desc.properties.push_back(std::move(kv));
}
return ffi_desc;
}

inline DatabaseInfo from_ffi_database_info(const ffi::FfiDatabaseInfo& ffi_info) {
DatabaseInfo info;
info.database_name = std::string(ffi_info.database_name);
info.comment = std::string(ffi_info.comment);
info.created_time = ffi_info.created_time;
info.modified_time = ffi_info.modified_time;
for (const auto& prop : ffi_info.properties) {
info.properties[std::string(prop.key)] = std::string(prop.value);
}
return info;
}

} // namespace utils
} // namespace fluss
Loading
Loading