From 0460218b3681f220c0fdfeb8bb1edfa2a6b28c7c Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Sat, 24 Jan 2026 05:54:57 +0000 Subject: [PATCH 1/7] add Rust client documentation --- package-lock.json | 6 + website/docs/apis/rust-client.md | 243 +++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 package-lock.json create mode 100644 website/docs/apis/rust-client.md diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000000..3624c8081c --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "fluss", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md new file mode 100644 index 0000000000..308d2fbd3b --- /dev/null +++ b/website/docs/apis/rust-client.md @@ -0,0 +1,243 @@ +--- +title: "Rust Client" +sidebar_position: 3 +--- + +# Fluss Rust Client + +## Overview +The Fluss Rust Client provides an interface for interacting with Fluss clusters. It supports asynchronous operations for managing resources and handling data. + +The client provides two main APIs: +* **Admin API**: For managing databases, tables, partitions, and retrieving metadata. +* **Table API**: For reading from and writing to Fluss tables. + +## Installation +To use the Fluss Rust client, add the following dependency to your `Cargo.toml` file: + +```toml +[dependencies] +fluss-client = "0.6.0" # Replace with the latest version +tokio = { version = "1", features = ["full"] } +``` + +## Initialization + +The `Connection` object is the entry point for interacting with Fluss. It is created using `Connection::create()` and requires a `Configuration` object. + +The `Connection` object is thread-safe and can be shared across multiple tasks. It is recommended to create a single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances. + +```rust +use fluss_client::Connection; +use fluss_client::config::Configuration; +use fluss_client::admin::Admin; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create configuration + let mut conf = Configuration::default(); + conf.set_string("bootstrap.servers", "localhost:9123"); + + // Create connection + let connection = Connection::create(conf).await?; + + // Obtain Admin instance + let admin = connection.get_admin(); + let databases = admin.list_databases().await?; + println!("Databases: {:?}", databases); + + Ok(()) +} +``` + +### SASL Authentication +If your Fluss cluster uses SASL authentication, configure the security properties: + +```rust + let mut conf = Configuration::default(); + conf.set_string("bootstrap.servers", "localhost:9123"); + conf.set_string("client.security.protocol", "sasl"); + conf.set_string("client.security.sasl.mechanism", "PLAIN"); + conf.set_string("client.security.sasl.username", "alice"); + conf.set_string("client.security.sasl.password", "alice-secret"); + + let connection = Connection::create(conf).await?; +``` + +## Async Operations +All operations in the Fluss Rust client are asynchronous and return a `Future`. You should use the `.await` syntax to wait for the result of an operation. The client is designed to work with the `tokio` runtime. + +## Admin API + +The `Admin` API allows you to manage databases and tables. + +### Creating a Database + +```rust +use fluss_client::admin::DatabaseDescriptor; + +// Create database descriptor +let descriptor = DatabaseDescriptor::builder() + .comment("This is a test database") + .add_custom_property("owner", "data-team") + .build(); + +// Create database (ignore_if_exists=true) +admin.create_database("my_db", descriptor, true).await?; +println!("Database created successfully"); +``` + +### Creating a Table + +```rust +use fluss_client::metadata::{Schema, TableDescriptor}; +use fluss_client::types::DataTypes; + +// Define schema +let schema = Schema::builder() + .column("id", DataTypes::STRING()) + .column("age", DataTypes::INT()) + .column("created_at", DataTypes::TIMESTAMP()) + .column("is_active", DataTypes::BOOLEAN()) + .primary_key(vec!["id"]) + .build(); + +// Create table descriptor +let table_descriptor = TableDescriptor::builder() + .schema(schema) + .distributed_by(1, vec!["id"]) + .build(); + +// Create table +let table_path = "my_db.user_table"; +admin.create_table(table_path, table_descriptor, false).await?; + +// Get table info +let table_info = admin.get_table_info(table_path).await?; +println!("Table Info: {:?}", table_info); +``` + +## Table API + +### Writers + +To write data, first obtain a `Table` instance. Fluss supports `UpsertWriter` for Primary Key tables and `AppendWriter` for Log tables. + +```rust +let table = connection.get_table("my_db.user_table").await?; +``` + +#### Writing to a Primary Key Table + +```rust +use fluss_client::row::Row; +use fluss_client::types::{Timestamp, TimestampNtz}; +use std::time::SystemTime; + +// Create writer +let mut writer = table.new_upsert().create_writer().await?; + +// Prepare data +// Note: Data must be passed as a Row object matching the schema +let row1 = Row::new() + .set("id", "1") + .set("age", 20) + .set("created_at", TimestampNtz::from(SystemTime::now())) + .set("is_active", true); + +let row2 = Row::new() + .set("id", "2") + .set("age", 22) + .set("created_at", TimestampNtz::from(SystemTime::now())) + .set("is_active", true); + +// Upsert data +writer.upsert(row1).await?; +writer.upsert(row2).await?; + +// Flush to ensure data is sent +writer.flush().await?; +``` + +#### Writing to a Log Table + +```rust +// Create append writer +let mut writer = table.new_append().create_writer().await?; + +// Append data +let row = Row::new() + .set("user_id", "user_log_1") + .set("event", "login_event"); + +writer.append(row).await?; +writer.flush().await?; +``` + +### Scanner + +To read data, create a `LogScanner` and subscribe to buckets. + +```rust +use fluss_client::scanner::ScanRecord; +use std::time::Duration; + +// Create scanner +let mut scanner = table.new_scan().create_log_scanner().await?; + +// Subscribe to all buckets from the beginning +let num_buckets = table.get_table_info().num_buckets(); +for i in 0..num_buckets { + scanner.subscribe_from_beginning(i); +} + +// Poll for records +loop { + let scan_records = scanner.poll(Duration::from_millis(1000)).await?; + for bucket in scan_records.buckets() { + let records = scan_records.records(bucket); + for record in records { + let row = record.get_row(); + println!("Received row: {:?}", row); + } + } +} +``` + +### Lookup + +You can perform key-based lookups on Primary Key tables. + +```rust +// Create lookuper +let lookuper = table.new_lookup().create_lookuper().await?; + +// Lookup by key +// Key must be passed as a Row with PK columns +let key = Row::new().set("id", "1"); +let result_row = lookuper.lookup(key).await?; + +match result_row { + Some(row) => println!("Found row: {:?}", row), + None => println!("Row not found"), +} +``` + +## Type Mapping + +The Rust client maps Fluss types to Rust types as follows: + +| Fluss Type | Rust Type | +|---|---| +| INT | i32 | +| BIGINT | i64 | +| STRING | String | +| BOOLEAN | bool | +| FLOAT | f32 | +| DOUBLE | f64 | +| DECIMAL | rust_decimal::Decimal | +| DATE | chrono::NaiveDate | +| TIME | chrono::NaiveTime | +| TIMESTAMP | chrono::NaiveDateTime | +| TIMESTAMP_LTZ | `chrono::DateTime` | +| BINARY / BYTES | `Vec` | From 2d072607f2c9da589219d25164e2789d4f847a17 Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Sat, 24 Jan 2026 16:00:04 +0000 Subject: [PATCH 2/7] address Rust reviewer feedback on installation and logical flow --- website/docs/apis/rust-client.md | 74 +++++++++----------------------- 1 file changed, 21 insertions(+), 53 deletions(-) diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md index 308d2fbd3b..982cf2931c 100644 --- a/website/docs/apis/rust-client.md +++ b/website/docs/apis/rust-client.md @@ -6,19 +6,20 @@ sidebar_position: 3 # Fluss Rust Client ## Overview -The Fluss Rust Client provides an interface for interacting with Fluss clusters. It supports asynchronous operations for managing resources and handling data. +The Fluss Rust Client provides a high-performance, asynchronous interface for interacting with Fluss clusters. Built on top of the Tokio runtime, it serves as the core engine for other language bindings. The client provides two main APIs: -* **Admin API**: For managing databases, tables, partitions, and retrieving metadata. -* **Table API**: For reading from and writing to Fluss tables. +* **Admin API**: For managing databases, tables, and retrieving metadata. +* **Table API**: For high-performance data reading and writing. ## Installation -To use the Fluss Rust client, add the following dependency to your `Cargo.toml` file: +The Fluss Rust client is currently available via the official Git repository. Add it to your `Cargo.toml`: ```toml [dependencies] -fluss-client = "0.6.0" # Replace with the latest version -tokio = { version = "1", features = ["full"] } +# Install directly from the official Apache repository +fluss-client = { git = "https://github.com/apache/fluss-rust.git" } +tokio = { version = "1.0", features = ["full"] } ``` ## Initialization @@ -28,42 +29,24 @@ The `Connection` object is the entry point for interacting with Fluss. It is cre The `Connection` object is thread-safe and can be shared across multiple tasks. It is recommended to create a single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances. ```rust -use fluss_client::Connection; +use fluss_client::connection::Connection; use fluss_client::config::Configuration; -use fluss_client::admin::Admin; #[tokio::main] async fn main() -> Result<(), Box> { - // Create configuration - let mut conf = Configuration::default(); + let mut conf = Configuration::new(); + // Adjust according to where your Fluss cluster is running conf.set_string("bootstrap.servers", "localhost:9123"); - // Create connection - let connection = Connection::create(conf).await?; - + let connection = Connection::new(conf).await?; + // Obtain Admin instance - let admin = connection.get_admin(); - let databases = admin.list_databases().await?; - println!("Databases: {:?}", databases); - + let admin = connection.admin().await; + Ok(()) } ``` -### SASL Authentication -If your Fluss cluster uses SASL authentication, configure the security properties: - -```rust - let mut conf = Configuration::default(); - conf.set_string("bootstrap.servers", "localhost:9123"); - conf.set_string("client.security.protocol", "sasl"); - conf.set_string("client.security.sasl.mechanism", "PLAIN"); - conf.set_string("client.security.sasl.username", "alice"); - conf.set_string("client.security.sasl.password", "alice-secret"); - - let connection = Connection::create(conf).await?; -``` - ## Async Operations All operations in the Fluss Rust client are asynchronous and return a `Future`. You should use the `.await` syntax to wait for the result of an operation. The client is designed to work with the `tokio` runtime. @@ -76,45 +59,30 @@ The `Admin` API allows you to manage databases and tables. ```rust use fluss_client::admin::DatabaseDescriptor; -// Create database descriptor -let descriptor = DatabaseDescriptor::builder() - .comment("This is a test database") - .add_custom_property("owner", "data-team") - .build(); +let mut descriptor = DatabaseDescriptor::new(); +descriptor.set_comment("Test database"); -// Create database (ignore_if_exists=true) admin.create_database("my_db", descriptor, true).await?; -println!("Database created successfully"); ``` ### Creating a Table ```rust use fluss_client::metadata::{Schema, TableDescriptor}; -use fluss_client::types::DataTypes; +use fluss_client::types::DataType; -// Define schema let schema = Schema::builder() - .column("id", DataTypes::STRING()) - .column("age", DataTypes::INT()) - .column("created_at", DataTypes::TIMESTAMP()) - .column("is_active", DataTypes::BOOLEAN()) + .column("id", DataType::String) + .column("value", DataType::Int) .primary_key(vec!["id"]) .build(); -// Create table descriptor let table_descriptor = TableDescriptor::builder() .schema(schema) .distributed_by(1, vec!["id"]) .build(); -// Create table -let table_path = "my_db.user_table"; -admin.create_table(table_path, table_descriptor, false).await?; - -// Get table info -let table_info = admin.get_table_info(table_path).await?; -println!("Table Info: {:?}", table_info); +admin.create_table("my_db.my_table", table_descriptor, false).await?; ``` ## Table API @@ -124,7 +92,7 @@ println!("Table Info: {:?}", table_info); To write data, first obtain a `Table` instance. Fluss supports `UpsertWriter` for Primary Key tables and `AppendWriter` for Log tables. ```rust -let table = connection.get_table("my_db.user_table").await?; +let table = connection.get_table("my_db.my_table").await?; ``` #### Writing to a Primary Key Table From aea7dbd6ae60fe2e68d51b8271923c610832057d Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Sat, 31 Jan 2026 15:32:25 +0000 Subject: [PATCH 3/7] align rust client sidebar position and clarify installation --- website/docs/apis/rust-client.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md index 982cf2931c..c3ab3acea7 100644 --- a/website/docs/apis/rust-client.md +++ b/website/docs/apis/rust-client.md @@ -1,6 +1,6 @@ --- title: "Rust Client" -sidebar_position: 3 +sidebar_position: 4 --- # Fluss Rust Client @@ -13,7 +13,7 @@ The client provides two main APIs: * **Table API**: For high-performance data reading and writing. ## Installation -The Fluss Rust client is currently available via the official Git repository. Add it to your `Cargo.toml`: +The Fluss Rust client is currently available via the official Git repository. To use it, add the following to your `Cargo.toml`: ```toml [dependencies] From 400f8cebeaa3813a8fdc3b4540eab7ec8c2d512d Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Tue, 10 Feb 2026 06:37:52 +0000 Subject: [PATCH 4/7] align rust client with recent changes --- website/docs/apis/rust-client.md | 224 +++++++++++++------------------ 1 file changed, 95 insertions(+), 129 deletions(-) diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md index c3ab3acea7..3b5a5c0954 100644 --- a/website/docs/apis/rust-client.md +++ b/website/docs/apis/rust-client.md @@ -6,167 +6,139 @@ sidebar_position: 4 # Fluss Rust Client ## Overview -The Fluss Rust Client provides a high-performance, asynchronous interface for interacting with Fluss clusters. Built on top of the Tokio runtime, it serves as the core engine for other language bindings. +The Fluss Rust Client is a high-performance, asynchronous library powered by the `tokio` runtime. It provides a native interface for interacting with Fluss clusters with minimal overhead. The client provides two main APIs: -* **Admin API**: For managing databases, tables, and retrieving metadata. -* **Table API**: For high-performance data reading and writing. +* **Admin API**: For managing databases, tables, and partitions. +* **Table API**: For high-throughput reading (Scanners/Lookups) and writing (Writers). ## Installation -The Fluss Rust client is currently available via the official Git repository. To use it, add the following to your `Cargo.toml`: +The Fluss Rust client is published to [crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. Note that the library name for imports is `fluss`. + +Add the following to your `Cargo.toml`: ```toml [dependencies] -# Install directly from the official Apache repository -fluss-client = { git = "https://github.com/apache/fluss-rust.git" } -tokio = { version = "1.0", features = ["full"] } +fluss-rs = "0.1" +tokio = { version = "1", features = ["full"] } ``` -## Initialization +:::tip +This page provides a lightweight introduction to the Fluss Rust Client. -The `Connection` object is the entry point for interacting with Fluss. It is created using `Connection::create()` and requires a `Configuration` object. +For a complete and up-to-date Rust client reference, including advanced APIs and examples, see the +[official Rust client documentation](https://github.com/apache/fluss-rust/blob/main/docs/rust-client.md). +::: -The `Connection` object is thread-safe and can be shared across multiple tasks. It is recommended to create a single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances. + +## Initialization + +The FlussConnection is the entry point for interacting with a cluster. It is thread-safe and should be reused across your application ```rust -use fluss_client::connection::Connection; -use fluss_client::config::Configuration; +use fluss::client::FlussConnection; +use fluss::config::Config; +use fluss::error::Result; #[tokio::main] -async fn main() -> Result<(), Box> { - let mut conf = Configuration::new(); - // Adjust according to where your Fluss cluster is running - conf.set_string("bootstrap.servers", "localhost:9123"); +async fn main() -> Result<()> { + let mut config = Config::default(); + config.bootstrap_server = "127.0.0.1:9123".to_string(); - let connection = Connection::new(conf).await?; + let conn = FlussConnection::new(config).await?; - // Obtain Admin instance - let admin = connection.admin().await; + // Obtain Admin interface + let admin = conn.get_admin().await?; Ok(()) } ``` -## Async Operations -All operations in the Fluss Rust client are asynchronous and return a `Future`. You should use the `.await` syntax to wait for the result of an operation. The client is designed to work with the `tokio` runtime. - ## Admin API -The `Admin` API allows you to manage databases and tables. - -### Creating a Database - -```rust -use fluss_client::admin::DatabaseDescriptor; - -let mut descriptor = DatabaseDescriptor::new(); -descriptor.set_comment("Test database"); - -admin.create_database("my_db", descriptor, true).await?; -``` +The Admin interface allows you to manage the lifecycle of your data. ### Creating a Table ```rust -use fluss_client::metadata::{Schema, TableDescriptor}; -use fluss_client::types::DataType; +use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; let schema = Schema::builder() - .column("id", DataType::String) - .column("value", DataType::Int) - .primary_key(vec!["id"]) - .build(); - -let table_descriptor = TableDescriptor::builder() - .schema(schema) - .distributed_by(1, vec!["id"]) - .build(); - -admin.create_table("my_db.my_table", table_descriptor, false).await?; +.column("id", DataTypes::int()) +.column("name", DataTypes::string()) +.column("score", DataTypes::float()) +.primary_key(vec!["id"]) +.build()?; + +let descriptor = TableDescriptor::builder() +.schema(schema) +.bucket_count(3) +.build()?; + +let table_path = TablePath::new("my_db", "my_table"); +admin.create_table(&table_path, &descriptor, true).await?; ``` ## Table API ### Writers -To write data, first obtain a `Table` instance. Fluss supports `UpsertWriter` for Primary Key tables and `AppendWriter` for Log tables. +Fluss writers use a fire-and-forget pattern. Use flush() to ensure batch persistence or await the append call for per-record acknowledgment ```rust -let table = connection.get_table("my_db.my_table").await?; -``` +use fluss::row::GenericRow; -#### Writing to a Primary Key Table +let table = conn.get_table(&table_path).await?; +let writer = table.new_append()?.create_writer()?; -```rust -use fluss_client::row::Row; -use fluss_client::types::{Timestamp, TimestampNtz}; -use std::time::SystemTime; - -// Create writer -let mut writer = table.new_upsert().create_writer().await?; - -// Prepare data -// Note: Data must be passed as a Row object matching the schema -let row1 = Row::new() - .set("id", "1") - .set("age", 20) - .set("created_at", TimestampNtz::from(SystemTime::now())) - .set("is_active", true); - -let row2 = Row::new() - .set("id", "2") - .set("age", 22) - .set("created_at", TimestampNtz::from(SystemTime::now())) - .set("is_active", true); - -// Upsert data -writer.upsert(row1).await?; -writer.upsert(row2).await?; - -// Flush to ensure data is sent +// Prepare a row with 3 fields +let mut row = GenericRow::new(3); +row.set_field(0, 1); // id (int) +row.set_field(1, "Alice"); // name (string) +row.set_field(2, 95.5f32); // score (float) + +// Queue the write +writer.append(&row)?; + +// Ensure it is sent to the server writer.flush().await?; ``` -#### Writing to a Log Table +#### Writing to a Primary Key Table ```rust -// Create append writer -let mut writer = table.new_append().create_writer().await?; +let table = conn.get_table(&table_path).await?; +let upsert_writer = table.new_upsert()?.create_writer()?; -// Append data -let row = Row::new() - .set("user_id", "user_log_1") - .set("event", "login_event"); - -writer.append(row).await?; -writer.flush().await?; +let mut row = GenericRow::new(3); +row.set_field(0, 1); // id (PK) +row.set_field(1, "Bob"); +row.set_field(2, 88.0f32); + +// Upsert logic: inserts if new, updates if ID exists +upsert_writer.upsert(&row)?; +upsert_writer.flush().await?; ``` ### Scanner -To read data, create a `LogScanner` and subscribe to buckets. +To read data, create a LogScanner. The poll method returns ScanRecords, which requires nested iteration. ```rust -use fluss_client::scanner::ScanRecord; use std::time::Duration; +use fluss::client::EARLIEST_OFFSET; -// Create scanner -let mut scanner = table.new_scan().create_log_scanner().await?; - -// Subscribe to all buckets from the beginning -let num_buckets = table.get_table_info().num_buckets(); -for i in 0..num_buckets { - scanner.subscribe_from_beginning(i); -} +let mut scanner = table.new_scan().create_log_scanner()?; +scanner.subscribe(0, EARLIEST_OFFSET).await?; -// Poll for records loop { - let scan_records = scanner.poll(Duration::from_millis(1000)).await?; - for bucket in scan_records.buckets() { - let records = scan_records.records(bucket); - for record in records { - let row = record.get_row(); - println!("Received row: {:?}", row); + let records = scanner.poll(Duration::from_secs(5)).await?; + + // Iterate through bucket groups + for (_bucket, bucket_records) in records.records_by_buckets() { + for record in bucket_records { + let row = record.row(); + println!("Received row: {:?} at offset {}", row, record.offset()); } } } @@ -174,20 +146,18 @@ loop { ### Lookup -You can perform key-based lookups on Primary Key tables. +Perform point lookups on Primary Key tables using a key row. ```rust -// Create lookuper -let lookuper = table.new_lookup().create_lookuper().await?; +let mut lookuper = table.new_lookup()?.create_lookuper()?; -// Lookup by key -// Key must be passed as a Row with PK columns -let key = Row::new().set("id", "1"); -let result_row = lookuper.lookup(key).await?; +// Key row must only contain Primary Key columns +let mut key = GenericRow::new(1); +key.set_field(0, 1); -match result_row { - Some(row) => println!("Found row: {:?}", row), - None => println!("Row not found"), +let result = lookuper.lookup(&key).await?; +if let Some(row) = result.get_single_row()? { + println!("Found: id={}, name={}", row.get_int(0), row.get_string(1)); } ``` @@ -195,17 +165,13 @@ match result_row { The Rust client maps Fluss types to Rust types as follows: -| Fluss Type | Rust Type | -|---|---| -| INT | i32 | -| BIGINT | i64 | -| STRING | String | -| BOOLEAN | bool | -| FLOAT | f32 | -| DOUBLE | f64 | -| DECIMAL | rust_decimal::Decimal | -| DATE | chrono::NaiveDate | -| TIME | chrono::NaiveTime | -| TIMESTAMP | chrono::NaiveDateTime | -| TIMESTAMP_LTZ | `chrono::DateTime` | -| BINARY / BYTES | `Vec` | +| Fluss Type | Rust Type | Method Example | +|-----------|-----------|----------------| +| `INT` | `i32` | `row.get_int(idx)` | +| `BIGINT` | `i64` | `row.get_long(idx)` | +| `STRING` | `&str` | `row.get_string(idx)` | +| `BOOLEAN` | `bool` | `row.get_boolean(idx)` | +| `FLOAT` | `f32` | `row.get_float(idx)` | +| `DOUBLE` | `f64` | `row.get_double(idx)` | +| `BYTES` | `&[u8]` | `row.get_bytes(idx)` | + From e1a7887bf21e52731326b55e036957d0ec1902cc Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Mon, 9 Mar 2026 05:57:23 +0000 Subject: [PATCH 5/7] pointing to fluss-rust website. --- website/docs/apis/rust-client.md | 158 ++++--------------------------- 1 file changed, 16 insertions(+), 142 deletions(-) diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md index 3b5a5c0954..b7e147339b 100644 --- a/website/docs/apis/rust-client.md +++ b/website/docs/apis/rust-client.md @@ -5,36 +5,28 @@ sidebar_position: 4 # Fluss Rust Client -## Overview -The Fluss Rust Client is a high-performance, asynchronous library powered by the `tokio` runtime. It provides a native interface for interacting with Fluss clusters with minimal overhead. +The Fluss Rust Client is a high-performance, asynchronous library powered by the +[Tokio](https://tokio.rs/) runtime. It provides a native interface for interacting +with Fluss clusters with minimal overhead. The client provides two main APIs: -* **Admin API**: For managing databases, tables, and partitions. -* **Table API**: For high-throughput reading (Scanners/Lookups) and writing (Writers). + +- **Admin API**: For managing databases, tables, and partitions. +- **Table API**: For high-throughput reading (Scanners/Lookups) and writing (Writers). ## Installation -The Fluss Rust client is published to [crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. Note that the library name for imports is `fluss`. -Add the following to your `Cargo.toml`: +The Fluss Rust client is published to [crates.io](https://crates.io/crates/fluss-rs) +as `fluss-rs`. The crate's library name is `fluss`, so you import it with `use fluss::...`. +Add the following to your `Cargo.toml`: ```toml [dependencies] fluss-rs = "0.1" tokio = { version = "1", features = ["full"] } ``` -:::tip -This page provides a lightweight introduction to the Fluss Rust Client. - -For a complete and up-to-date Rust client reference, including advanced APIs and examples, see the -[official Rust client documentation](https://github.com/apache/fluss-rust/blob/main/docs/rust-client.md). -::: - - -## Initialization - -The FlussConnection is the entry point for interacting with a cluster. It is thread-safe and should be reused across your application - +## Quick Example ```rust use fluss::client::FlussConnection; use fluss::config::Config; @@ -43,135 +35,17 @@ use fluss::error::Result; #[tokio::main] async fn main() -> Result<()> { let mut config = Config::default(); - config.bootstrap_server = "127.0.0.1:9123".to_string(); + config.bootstrap_servers = "127.0.0.1:9123".to_string(); let conn = FlussConnection::new(config).await?; - - // Obtain Admin interface let admin = conn.get_admin().await?; - - Ok(()) -} -``` - -## Admin API - -The Admin interface allows you to manage the lifecycle of your data. - -### Creating a Table - -```rust -use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; - -let schema = Schema::builder() -.column("id", DataTypes::int()) -.column("name", DataTypes::string()) -.column("score", DataTypes::float()) -.primary_key(vec!["id"]) -.build()?; - -let descriptor = TableDescriptor::builder() -.schema(schema) -.bucket_count(3) -.build()?; - -let table_path = TablePath::new("my_db", "my_table"); -admin.create_table(&table_path, &descriptor, true).await?; -``` - -## Table API - -### Writers - -Fluss writers use a fire-and-forget pattern. Use flush() to ensure batch persistence or await the append call for per-record acknowledgment - -```rust -use fluss::row::GenericRow; - -let table = conn.get_table(&table_path).await?; -let writer = table.new_append()?.create_writer()?; -// Prepare a row with 3 fields -let mut row = GenericRow::new(3); -row.set_field(0, 1); // id (int) -row.set_field(1, "Alice"); // name (string) -row.set_field(2, 95.5f32); // score (float) - -// Queue the write -writer.append(&row)?; - -// Ensure it is sent to the server -writer.flush().await?; -``` - -#### Writing to a Primary Key Table - -```rust -let table = conn.get_table(&table_path).await?; -let upsert_writer = table.new_upsert()?.create_writer()?; - -let mut row = GenericRow::new(3); -row.set_field(0, 1); // id (PK) -row.set_field(1, "Bob"); -row.set_field(2, 88.0f32); - -// Upsert logic: inserts if new, updates if ID exists -upsert_writer.upsert(&row)?; -upsert_writer.flush().await?; -``` - -### Scanner - -To read data, create a LogScanner. The poll method returns ScanRecords, which requires nested iteration. - -```rust -use std::time::Duration; -use fluss::client::EARLIEST_OFFSET; - -let mut scanner = table.new_scan().create_log_scanner()?; -scanner.subscribe(0, EARLIEST_OFFSET).await?; - -loop { - let records = scanner.poll(Duration::from_secs(5)).await?; - - // Iterate through bucket groups - for (_bucket, bucket_records) in records.records_by_buckets() { - for record in bucket_records { - let row = record.row(); - println!("Received row: {:?} at offset {}", row, record.offset()); - } - } -} -``` - -### Lookup - -Perform point lookups on Primary Key tables using a key row. - -```rust -let mut lookuper = table.new_lookup()?.create_lookuper()?; - -// Key row must only contain Primary Key columns -let mut key = GenericRow::new(1); -key.set_field(0, 1); - -let result = lookuper.lookup(&key).await?; -if let Some(row) = result.get_single_row()? { - println!("Found: id={}, name={}", row.get_int(0), row.get_string(1)); + Ok(()) } ``` -## Type Mapping - -The Rust client maps Fluss types to Rust types as follows: - -| Fluss Type | Rust Type | Method Example | -|-----------|-----------|----------------| -| `INT` | `i32` | `row.get_int(idx)` | -| `BIGINT` | `i64` | `row.get_long(idx)` | -| `STRING` | `&str` | `row.get_string(idx)` | -| `BOOLEAN` | `bool` | `row.get_boolean(idx)` | -| `FLOAT` | `f32` | `row.get_float(idx)` | -| `DOUBLE` | `f64` | `row.get_double(idx)` | -| `BYTES` | `&[u8]` | `row.get_bytes(idx)` | +## Full Documentation +For the complete Rust client reference including all configuration options, +API methods, data types, error handling, and worked examples — see the +**[Fluss Rust Client documentation](https://clients.fluss.apache.org/user-guide/rust/installation)**. \ No newline at end of file From 910971b3d6462b9973fce3a5fec73db8a6f8a938 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 22 Mar 2026 19:55:46 +0000 Subject: [PATCH 6/7] Admin API, Table API link and Examples links. --- website/docs/apis/rust-client.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/website/docs/apis/rust-client.md b/website/docs/apis/rust-client.md index b7e147339b..910510c6e3 100644 --- a/website/docs/apis/rust-client.md +++ b/website/docs/apis/rust-client.md @@ -11,8 +11,8 @@ with Fluss clusters with minimal overhead. The client provides two main APIs: -- **Admin API**: For managing databases, tables, and partitions. -- **Table API**: For high-throughput reading (Scanners/Lookups) and writing (Writers). +- **[Admin API](https://clients.fluss.apache.org/user-guide/rust/api-reference#flussadmin)**: For managing databases, tables, and partitions. +- **[Table API](https://clients.fluss.apache.org/user-guide/rust/api-reference/#flusstablea)**: For reading and writing to Log and Primary Key tables ## Installation @@ -44,6 +44,8 @@ async fn main() -> Result<()> { } ``` +For more examples, see [Fluss Rust Client documentation](https://clients.fluss.apache.org/user-guide/rust/example/). + ## Full Documentation For the complete Rust client reference including all configuration options, From 682faf07f5c5a3372f8335c7759a68ab59f98c0f Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 22 Mar 2026 20:11:50 +0000 Subject: [PATCH 7/7] Remove package-lock.json --- package-lock.json | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 package-lock.json diff --git a/package-lock.json b/package-lock.json deleted file mode 100644 index 3624c8081c..0000000000 --- a/package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "fluss", - "lockfileVersion": 3, - "requires": true, - "packages": {} -}