From ddd74bb3816539d05ab296ebb23d84320a9e0854 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 8 Feb 2026 16:54:13 +0000 Subject: [PATCH 1/4] [TASK-285] python docs and API reference --- docs/python-api-reference.md | 278 ++++++++++++++++++++++ docs/python-client.md | 449 +++++++++++++++++++++++++++++++++++ 2 files changed, 727 insertions(+) create mode 100644 docs/python-api-reference.md create mode 100644 docs/python-client.md diff --git a/docs/python-api-reference.md b/docs/python-api-reference.md new file mode 100644 index 00000000..cc4cee73 --- /dev/null +++ b/docs/python-api-reference.md @@ -0,0 +1,278 @@ + + +# Python API Reference + +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](python-client.md). + +## `Config` + +| Method / Property | Description | +|---|---| +| `Config(properties: dict = None)` | Create config from a dict of key-value pairs | +| `.bootstrap_server` | Get/set coordinator server address | +| `.request_max_size` | Get/set max request size in bytes | +| `.writer_batch_size` | Get/set write batch size in bytes | + +## `FlussConnection` + +| Method | Description | +|---|---| +| `await FlussConnection.connect(config) -> FlussConnection` | Connect to a Fluss cluster | +| `await conn.get_admin() -> FlussAdmin` | Get admin interface | +| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations | +| `conn.close()` | Close the connection | + +Supports `with` statement (context manager). 
+ +## `FlussAdmin` + +| Method | Description | +|---|---| +| `await create_database(name, ignore_if_exists=False, database_descriptor=None)` | Create a database | +| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database | +| `await list_databases() -> list[str]` | List all databases | +| `await database_exists(name) -> bool` | Check if a database exists | +| `await get_database_info(name) -> DatabaseInfo` | Get database metadata | +| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table | +| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table | +| `await get_table(table_path) -> TableInfo` | Get table metadata | +| `await list_tables(database_name) -> list[str]` | List tables in a database | +| `await table_exists(table_path) -> bool` | Check if a table exists | +| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for buckets | +| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's buckets | +| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition | +| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition | +| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions | +| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot | + +## `FlussTable` + +| Method | Description | +|---|---| +| `new_scan() -> TableScan` | Create a scan builder | +| `await new_append_writer() -> AppendWriter` | Create writer for log tables | +| `new_upsert(columns=None, column_indices=None) -> UpsertWriter` | Create writer for PK tables (optionally partial) | +| `new_lookup() -> Lookuper` | Create lookuper for PK tables | +| `get_table_info() -> TableInfo` | Get table metadata | +| `get_table_path() -> 
TablePath` | Get table path | +| `has_primary_key() -> bool` | Check if table has a primary key | + +## `TableScan` + +| Method | Description | +|---|---| +| `.project(indices) -> TableScan` | Project columns by index | +| `.project_by_name(names) -> TableScan` | Project columns by name | +| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) | +| `await .create_batch_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) | + +## `AppendWriter` + +| Method | Description | +|---|---| +| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) | +| `.write_arrow(table)` | Write a PyArrow Table | +| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch | +| `.write_pandas(df)` | Write a Pandas DataFrame | +| `await .flush()` | Flush all pending writes | + +## `UpsertWriter` + +| Method | Description | +|---|---| +| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) | +| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key | +| `await .flush()` | Flush all pending operations | + +## `WriteResultHandle` + +| Method | Description | +|---|---| +| `await .wait()` | Wait for server acknowledgment of this write | + +## `Lookuper` + +| Method | Description | +|---|---| +| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key | + +## `LogScanner` + +| Method | Description | +|---|---| +| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket | +| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) | +| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket | +| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | +| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | +| `.poll(timeout_ms) -> 
list[ScanRecord]` | Poll individual records (record scanner only) | +| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | +| `.poll_batches(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | +| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | + +## `ScanRecord` + +| Property | Description | +|---|---| +| `.bucket -> TableBucket` | Bucket this record belongs to | +| `.offset -> int` | Record offset in the log | +| `.timestamp -> int` | Record timestamp | +| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | +| `.row -> dict` | Row data as `{column_name: value}` | + +## `RecordBatch` + +| Property | Description | +|---|---| +| `.batch -> pa.RecordBatch` | Arrow RecordBatch data | +| `.bucket -> TableBucket` | Bucket this batch belongs to | +| `.base_offset -> int` | First record offset | +| `.last_offset -> int` | Last record offset | + +## `Schema` + +| Method | Description | +|---|---| +| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema | +| `.get_column_names() -> list[str]` | Get column names | +| `.get_column_types() -> list[str]` | Get column type names | + +## `TableDescriptor` + +| Method | Description | +|---|---| +| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, **properties)` | Create table descriptor | +| `.get_schema() -> Schema` | Get the schema | + +## `TablePath` + +| Method / Property | Description | +|---|---| +| `TablePath(database, table)` | Create a table path | +| `.database_name -> str` | Database name | +| `.table_name -> str` | Table name | + +## `TableInfo` + +| Property / Method | Description | +|---|---| +| `.table_id -> int` | Table ID | +| `.table_path -> TablePath` | Table path | +| `.num_buckets -> int` | 
Number of buckets | +| `.schema_id -> int` | Schema ID | +| `.comment -> str \| None` | Table comment | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_primary_keys() -> list[str]` | Primary key columns | +| `.get_partition_keys() -> list[str]` | Partition columns | +| `.get_bucket_keys() -> list[str]` | Bucket key columns | +| `.has_primary_key() -> bool` | Has primary key? | +| `.is_partitioned() -> bool` | Is partitioned? | +| `.get_schema() -> Schema` | Get table schema | +| `.get_column_names() -> list[str]` | Column names | +| `.get_column_count() -> int` | Number of columns | +| `.get_properties() -> dict` | All table properties | +| `.get_custom_properties() -> dict` | Custom properties only | + +## `PartitionInfo` + +| Property | Description | +|---|---| +| `.partition_id -> int` | Partition ID | +| `.partition_name -> str` | Partition name | + +## `DatabaseDescriptor` + +| Method / Property | Description | +|---|---| +| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor | +| `.comment -> str \| None` | Database comment | +| `.get_custom_properties() -> dict` | Custom properties | + +## `DatabaseInfo` + +| Property / Method | Description | +|---|---| +| `.database_name -> str` | Database name | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor | + +## `LakeSnapshot` + +| Property / Method | Description | +|---|---| +| `.snapshot_id -> int` | Snapshot ID | +| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets | +| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket | +| `.get_table_buckets() -> list[TableBucket]` | Get all buckets | + +## `TableBucket` + +| Method / Property | Description | +|---|---| +| `TableBucket(table_id, bucket)` | Create non-partitioned bucket | +| 
`TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket | +| `.table_id -> int` | Table ID | +| `.bucket_id -> int` | Bucket ID | +| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) | + +## `FlussError` + +| Property | Description | +|---|---| +| `.message -> str` | Error message | + +Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. + +## Constants + +| Constant | Value | Description | +|---|---|---| +| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available offset | +| `fluss.LATEST_OFFSET` | `-1` | Start reading from latest offset (only new records) | +| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` | +| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` | +| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with timestamp | + +## `ChangeType` + +| Value | Short String | Description | +|---|---|---| +| `ChangeType.AppendOnly` (0) | `+A` | Append-only | +| `ChangeType.Insert` (1) | `+I` | Insert | +| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row | +| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row | +| `ChangeType.Delete` (4) | `-D` | Delete | + +## Data Types + +| PyArrow Type | Fluss Type | Python Type | +|---|---|---| +| `pa.boolean()` | Boolean | `bool` | +| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | +| `pa.float32()` / `float64()` | Float / Double | `float` | +| `pa.string()` | String | `str` | +| `pa.binary()` | Bytes | `bytes` | +| `pa.date32()` | Date | `datetime.date` | +| `pa.time32("ms")` | Time | `datetime.time` | +| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | +| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | +| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | diff --git a/docs/python-client.md 
b/docs/python-client.md new file mode 100644 index 00000000..c2a8ae75 --- /dev/null +++ b/docs/python-client.md @@ -0,0 +1,449 @@ + + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. + +## Prerequisites + +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. + +## Installation + +```bash +pip install pyfluss +``` + +To build from source instead, see the [Python bindings README](../bindings/python/README.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
+ +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` + +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` + +The connection also supports context managers: + +```python +with await fluss.FlussConnection.connect(config) as conn: + ... 
+``` + +### Configuration Options + +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | + +## Admin Operations + +```python +admin = await conn.get_admin() +``` + +### Databases + +```python +await admin.create_database("my_database", ignore_if_exists=True) +databases = await admin.list_databases() +exists = await admin.database_exists("my_database") +await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) +``` + +### Tables + +Schemas are defined using PyArrow and wrapped in `fluss.Schema`: + +```python +import pyarrow as pa + +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("amount", pa.int64()), +])) + +table_path = fluss.TablePath("my_database", "my_table") +await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + +table_info = await admin.get_table(table_path) +tables = await admin.list_tables("my_database") +await admin.drop_table(table_path, ignore_if_not_exists=True) +``` + +`TableDescriptor` accepts these optional parameters: + +| Parameter | Description | +|---|---| +| `partition_keys` | Column names to partition by (e.g. 
`["region"]`) | +| `bucket_count` | Number of buckets (parallelism units) for the table | +| `bucket_keys` | Columns used to determine bucket assignment | +| `comment` | Table comment / description | + +### Offsets + +```python +# Latest offsets for buckets +offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") + +# By timestamp +offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) + +# Per-partition offsets +offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") +``` + +## Log Tables + +Log tables are append-only tables without primary keys, suitable for event streaming. + +### Writing + +Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`. + +Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write. 
+ +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() + +# Fire-and-forget: queue writes, flush at the end +writer.append({"id": 1, "name": "Alice", "score": 95.5}) +writer.append([2, "Bob", 87.0]) +await writer.flush() + +# Per-record acknowledgment +handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0}) +await handle.wait() + +# Bulk writes +writer.write_arrow(pa_table) # PyArrow Table +writer.write_arrow_batch(record_batch) # PyArrow RecordBatch +writer.write_pandas(df) # Pandas DataFrame +await writer.flush() +``` + +### Reading + +There are two scanner types: +- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or DataFrames, best for analytics +- **Record scanner** (`create_log_scanner()`) — returns individual records with metadata (offset, timestamp, change type), best for streaming + +And two reading modes: +- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads. +- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data is available within the timeout, then returns. Call in a loop for continuous streaming. 
+ +#### Batch Read (One-Shot) + +```python +num_buckets = (await admin.get_table(table_path)).num_buckets + +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +# Reads everything up to current latest offset, then returns +arrow_table = scanner.to_arrow() +df = scanner.to_pandas() +``` + +#### Continuous Polling + +Use `poll_arrow()` or `poll()` in a loop for streaming consumption: + +```python +# Batch scanner: poll as Arrow Tables +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +while True: + result = scanner.poll_arrow(timeout_ms=5000) + if result.num_rows > 0: + print(result.to_pandas()) + +# Record scanner: poll individual records with metadata +scanner = await table.new_scan().create_log_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +while True: + for record in scanner.poll(timeout_ms=5000): + print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") +``` + +#### Subscribe from Latest Offset + +To only consume new records (skip existing data), use `LATEST_OFFSET`: + +```python +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET) +``` + +### Column Projection + +```python +scanner = await table.new_scan().project([0, 2]).create_batch_scanner() +# or by name +scanner = await table.new_scan().project_by_name(["id", "score"]).create_batch_scanner() +``` + +## Primary Key Tables + +Primary key tables support upsert, delete, and point lookup operations. 
+ +### Creating + +Pass `primary_keys` to `fluss.Schema`: + +```python +schema = fluss.Schema( + pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + ]), + primary_keys=["id"], +) +table_path = fluss.TablePath("fluss", "users") +await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True) +``` + +### Upsert, Delete, Lookup + +```python +table = await conn.get_table(table_path) + +# Upsert (fire-and-forget, flush at the end) +writer = table.new_upsert() +writer.upsert({"id": 1, "name": "Alice", "age": 25}) +writer.upsert({"id": 2, "name": "Bob", "age": 30}) +await writer.flush() + +# Per-record acknowledgment (for read-after-write) +handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35}) +await handle.wait() + +# Delete by primary key +handle = writer.delete({"id": 2}) +await handle.wait() + +# Lookup +lookuper = table.new_lookup() +result = await lookuper.lookup({"id": 1}) +if result: + print(f"Found: name={result['name']}, age={result['age']}") +``` + +### Partial Updates + +Update specific columns while preserving others: + +```python +partial_writer = table.new_upsert(columns=["id", "age"]) +partial_writer.upsert({"id": 1, "age": 27}) # only updates age +await partial_writer.flush() +``` + +## Partitioned Tables + +Partitioned tables distribute data across partitions based on column values. Partitions must be created before writing. 
+ +### Creating and Managing Partitions + +```python +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), +])) + +table_path = fluss.TablePath("fluss", "partitioned_events") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1), + ignore_if_exists=True, +) + +# Create partitions +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) +await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True) + +# List partitions +partition_infos = await admin.list_partition_infos(table_path) +``` + +### Writing + +Same as non-partitioned tables — include partition column values in each row: + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() +writer.append({"id": 1, "region": "US", "value": 100}) +writer.append({"id": 2, "region": "EU", "value": 200}) +await writer.flush() +``` + +### Reading + +Use `subscribe_partition()` or `subscribe_partition_buckets()` instead of `subscribe()`: + +```python +scanner = await table.new_scan().create_batch_scanner() + +# Subscribe to individual partitions +for p in partition_infos: + scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +# Or batch-subscribe +scanner.subscribe_partition_buckets({ + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos +}) + +print(scanner.to_pandas()) +``` + +### Partitioned Primary Key Tables + +Partition columns must be part of the primary key. Partitions must be created before upserting. 
+ +```python +schema = fluss.Schema( + pa.schema([ + pa.field("user_id", pa.int32()), + pa.field("region", pa.string()), + pa.field("score", pa.int64()), + ]), + primary_keys=["user_id", "region"], +) + +table_path = fluss.TablePath("fluss", "partitioned_users") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"]), + ignore_if_exists=True, +) + +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) + +table = await conn.get_table(table_path) +writer = table.new_upsert() +writer.upsert({"user_id": 1, "region": "US", "score": 1234}) +await writer.flush() + +# Lookup includes partition columns +lookuper = table.new_lookup() +result = await lookuper.lookup({"user_id": 1, "region": "US"}) +``` + +## Error Handling + +The client raises `fluss.FlussError` for Fluss-specific errors (connection failures, table not found, invalid operations, etc.): + +```python +try: + await admin.create_table(table_path, table_descriptor) +except fluss.FlussError as e: + print(f"Fluss error: {e.message}") +``` + +Common error scenarios: +- **Connection refused** — Fluss cluster is not running or wrong address in `bootstrap.servers` +- **Table not found** — table doesn't exist or wrong database/table name +- **Partition not found** — writing to a partitioned table before creating partitions +- **Schema mismatch** — row data doesn't match the table schema + +## Data Types + +The Python client uses PyArrow types for schema definitions: + +| PyArrow Type | Fluss Type | Python Type | +|---|---|---| +| `pa.boolean()` | Boolean | `bool` | +| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | +| `pa.float32()` / `float64()` | Float / Double | `float` | +| `pa.string()` | String | `str` | +| `pa.binary()` | Bytes | `bytes` | +| `pa.date32()` | Date | `datetime.date` | +| `pa.time32("ms")` | Time | `datetime.time` | +| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | +| 
`pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | +| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | + +All Python native types (`date`, `time`, `datetime`, `Decimal`) work when appending rows via dicts. + +For a complete list of classes, methods, and properties, see the [Python API Reference](python-api-reference.md). From fa4d032b7317979da39751d144698795e9e9ff10 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 9 Feb 2026 10:20:59 +0000 Subject: [PATCH 2/4] address comments --- bindings/python/fluss/__init__.pyi | 3 ++- docs/python-api-reference.md | 2 +- docs/python-client.md | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index cc7053e4..adbfc2fe 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -610,7 +610,8 @@ class TableDescriptor: comment: Optional[str] = None, log_format: Optional[str] = None, kv_format: Optional[str] = None, - **properties: str, + properties: Optional[Dict[str, str]] = None, + custom_properties: Optional[Dict[str, str]] = None, ) -> None: ... def get_schema(self) -> Schema: ... diff --git a/docs/python-api-reference.md b/docs/python-api-reference.md index cc4cee73..54379f8a 100644 --- a/docs/python-api-reference.md +++ b/docs/python-api-reference.md @@ -158,7 +158,7 @@ Supports `with` statement (context manager). 
| Method | Description | |---|---| -| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, **properties)` | Create table descriptor | +| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create table descriptor | | `.get_schema() -> Schema` | Get the schema | ## `TablePath` diff --git a/docs/python-client.md b/docs/python-client.md index c2a8ae75..c328fbd5 100644 --- a/docs/python-client.md +++ b/docs/python-client.md @@ -154,6 +154,10 @@ await admin.drop_table(table_path, ignore_if_not_exists=True) | `bucket_count` | Number of buckets (parallelism units) for the table | | `bucket_keys` | Columns used to determine bucket assignment | | `comment` | Table comment / description | +| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` | +| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` | +| `properties` | Table configuration properties as a dict (e.g. 
`{"table.replication.factor": "1"}`) | +| `custom_properties` | User-defined properties as a dict | ### Offsets From 6e31cb46f73d5a8a448b3016da4f2d99b6cd6818 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 9 Feb 2026 12:38:55 +0000 Subject: [PATCH 3/4] bookkeeping --- bindings/python/DEVELOPMENT.md | 155 ++++++ bindings/python/README.md | 494 ++++++++++++++---- .../python}/python-api-reference.md | 2 +- docs/python-client.md | 453 ---------------- 4 files changed, 552 insertions(+), 552 deletions(-) create mode 100644 bindings/python/DEVELOPMENT.md rename {docs => bindings/python}/python-api-reference.md (99%) delete mode 100644 docs/python-client.md diff --git a/bindings/python/DEVELOPMENT.md b/bindings/python/DEVELOPMENT.md new file mode 100644 index 00000000..b0970397 --- /dev/null +++ b/bindings/python/DEVELOPMENT.md @@ -0,0 +1,155 @@ + + +# Apache Fluss™ Python Bindings + +Python bindings for Fluss using PyO3 and Maturin. + +## API Overview + +### Basic Usage + +TODO: Add basic usage examples here + +### Core Classes + +#### `Config` + +Configuration for Fluss connection parameters + +#### `FlussConnection` + +Main interface for connecting to Fluss cluster + +#### `FlussAdmin` + +Administrative operations for managing tables (create, delete, etc.) + +#### `FlussTable` + +Represents a Fluss table, providing read and write operations + +#### `TableWriter` + +Used for writing data to tables, supports PyArrow and Pandas + +#### `LogScanner` + +Used for scanning table log data + + +# todo: we may move the following part to DEVELOPMENT.md +## Development + +## Requirements + +- Python 3.9+ +- Rust 1.70+ +- [uv](https://docs.astral.sh/uv/) package manager +- Linux or macOS + +> **⚠️ Before you start:** +> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. +> The Python bindings require a working Fluss Rust backend and compatible environment. 
+ +### Install Development Dependencies + +```bash +cd bindings/python +uv sync --all-extras +``` + +### Build Development Version + +```bash +source .venv/bin/activate +uv run maturin develop +``` + +### Build Release Version + +```bash +uv run maturin build --release +``` + +### Code Formatting and Linting + +```bash +uv run ruff format python/ +uv run ruff check python/ +``` + +### Type Checking + +```bash +uv run mypy python/ +``` + +### Run Examples + +```bash +uv run python example/example.py +``` + +### Build API Docs + +```bash +uv run pdoc fluss +``` + +### Release + +```bash +# Build wheel +uv run maturin build --release + +# Publish to PyPI +uv run maturin publish +``` + +## Project Structure +``` +bindings/python/ +├── Cargo.toml # Rust dependency configuration +├── pyproject.toml # Python project configuration +├── README.md # Python client guide +├── src/ # Rust source code +│ ├── lib.rs # Main entry module +│ ├── config.rs # Configuration related +│ ├── connection.rs # Connection management +│ ├── admin.rs # Admin operations +│ ├── table.rs # Table operations +│ ├── types.rs # Data types +│ └── error.rs # Error handling +├── fluss/ # Python package source +│ ├── __init__.py # Python package entry +│ ├── __init__.pyi # Stub file +│ └── py.typed # Type declarations +└── example/ # Example code + └── example.py +``` + +## TODO + +- [ ] Add basic usage examples in API Overview (code snippets for Config, FlussConnection, FlussAdmin, FlussTable). +- [ ] Add a "Verifying a release" subsection with install-from-TestPyPI/PyPI and smoke-test steps. + +## License + +Apache 2.0 License diff --git a/bindings/python/README.md b/bindings/python/README.md index b0970397..825aad01 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -1,155 +1,453 @@ -# Apache Fluss™ Python Bindings +# Python Client Guide -Python bindings for Fluss using PyO3 and Maturin. 
+This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. -## API Overview +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. -### Basic Usage +## Key Concepts -TODO: Add basic usage examples here +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. -### Core Classes +## Prerequisites -#### `Config` +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../../README.md#quick-start) for how to start a local cluster. -Configuration for Fluss connection parameters +## Installation -#### `FlussConnection` +```bash +pip install pyfluss +``` -Main interface for connecting to Fluss cluster +To build from source instead, see the [Development Guide](DEVELOPMENT.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
+ +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` -#### `FlussAdmin` +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` -Administrative operations for managing tables (create, delete, etc.) +The connection also supports context managers: -#### `FlussTable` +```python +with await fluss.FlussConnection.connect(config) as conn: + ... 
+``` -Represents a Fluss table, providing read and write operations +### Configuration Options -#### `TableWriter` +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | -Used for writing data to tables, supports PyArrow and Pandas +## Admin Operations -#### `LogScanner` +```python +admin = await conn.get_admin() +``` -Used for scanning table log data +### Databases +```python +await admin.create_database("my_database", ignore_if_exists=True) +databases = await admin.list_databases() +exists = await admin.database_exists("my_database") +await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) +``` -# todo: we may move the following part to DEVELOPMENT.md -## Development +### Tables -## Requirements +Schemas are defined using PyArrow and wrapped in `fluss.Schema`: -- Python 3.9+ -- Rust 1.70+ -- [uv](https://docs.astral.sh/uv/) package manager -- Linux or MacOS +```python +import pyarrow as pa -> **⚠️ Before you start:** -> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. -> The Python bindings require a working Fluss Rust backend and compatible environment. 
+schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("amount", pa.int64()), +])) -### Install Development Dependencies +table_path = fluss.TablePath("my_database", "my_table") +await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) -```bash -cd bindings/python -uv sync --all-extras +table_info = await admin.get_table(table_path) +tables = await admin.list_tables("my_database") +await admin.drop_table(table_path, ignore_if_not_exists=True) ``` -### Build Development Version +`TableDescriptor` accepts these optional parameters: -```bash -source .venv/bin/activate -uv run maturin develop +| Parameter | Description | +|---|---| +| `partition_keys` | Column names to partition by (e.g. `["region"]`) | +| `bucket_count` | Number of buckets (parallelism units) for the table | +| `bucket_keys` | Columns used to determine bucket assignment | +| `comment` | Table comment / description | +| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` | +| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` | +| `properties` | Table configuration properties as a dict (e.g. `{"table.replication.factor": "1"}`) | +| `custom_properties` | User-defined properties as a dict | + +### Offsets + +```python +# Latest offsets for buckets +offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") + +# By timestamp +offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) + +# Per-partition offsets +offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") ``` -### Build Release Version +## Log Tables -```bash -uv run maturin build --release +Log tables are append-only tables without primary keys, suitable for event streaming. + +### Writing + +Rows can be appended as dicts, lists, or tuples. 
For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`. + +Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write. + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() + +# Fire-and-forget: queue writes, flush at the end +writer.append({"id": 1, "name": "Alice", "score": 95.5}) +writer.append([2, "Bob", 87.0]) +await writer.flush() + +# Per-record acknowledgment +handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0}) +await handle.wait() + +# Bulk writes +writer.write_arrow(pa_table) # PyArrow Table +writer.write_arrow_batch(record_batch) # PyArrow RecordBatch +writer.write_pandas(df) # Pandas DataFrame +await writer.flush() ``` -### Code Formatting and Linting +### Reading -```bash -uv run ruff format python/ -uv run ruff check python/ +There are two scanner types: +- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or DataFrames, best for analytics +- **Record scanner** (`create_log_scanner()`) — returns individual records with metadata (offset, timestamp, change type), best for streaming + +And two reading modes: +- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads. +- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data arrives within the timeout. Call in a loop for continuous streaming. 
+ +#### Batch Read (One-Shot) + +```python +num_buckets = (await admin.get_table(table_path)).num_buckets + +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +# Reads everything up to current latest offset, then returns +arrow_table = scanner.to_arrow() +df = scanner.to_pandas() ``` -### Type Checking +#### Continuous Polling -```bash -uv run mypy python/ +Use `poll_arrow()` or `poll()` in a loop for streaming consumption: + +```python +# Batch scanner: poll as Arrow Tables +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +while True: + result = scanner.poll_arrow(timeout_ms=5000) + if result.num_rows > 0: + print(result.to_pandas()) + +# Record scanner: poll individual records with metadata +scanner = await table.new_scan().create_log_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +while True: + for record in scanner.poll(timeout_ms=5000): + print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") ``` -### Run Examples +#### Subscribe from Latest Offset -```bash -uv run python example/example.py +To only consume new records (skip existing data), use `LATEST_OFFSET`: + +```python +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET) ``` -### Build API docs: +### Column Projection -```bash -uv run pdoc fluss +```python +scanner = await table.new_scan().project([0, 2]).create_batch_scanner() +# or by name +scanner = await table.new_scan().project_by_name(["id", "score"]).create_batch_scanner() ``` -### Release +## Primary Key Tables -```bash -# Build wheel -uv run maturin build --release +Primary key tables support upsert, delete, and point lookup operations. 
-# Publish to PyPI -uv run maturin publish +### Creating + +Pass `primary_keys` to `fluss.Schema`: + +```python +schema = fluss.Schema( + pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + ]), + primary_keys=["id"], +) +table_path = fluss.TablePath("fluss", "users") +await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True) ``` -## Project Structure +### Upsert, Delete, Lookup + +```python +table = await conn.get_table(table_path) + +# Upsert (fire-and-forget, flush at the end) +writer = table.new_upsert() +writer.upsert({"id": 1, "name": "Alice", "age": 25}) +writer.upsert({"id": 2, "name": "Bob", "age": 30}) +await writer.flush() + +# Per-record acknowledgment (for read-after-write) +handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35}) +await handle.wait() + +# Delete by primary key +handle = writer.delete({"id": 2}) +await handle.wait() + +# Lookup +lookuper = table.new_lookup() +result = await lookuper.lookup({"id": 1}) +if result: + print(f"Found: name={result['name']}, age={result['age']}") ``` -bindings/python/ -├── Cargo.toml # Rust dependency configuration -├── pyproject.toml # Python project configuration -├── README.md # This file -├── src/ # Rust source code -│ ├── lib.rs # Main entry module -│ ├── config.rs # Configuration related -│ ├── connection.rs # Connection management -│ ├── admin.rs # Admin operations -│ ├── table.rs # Table operations -│ ├── types.rs # Data types -│ └── error.rs # Error handling -├── fluss/ # Python package source -│ ├── __init__.py # Python package entry -│ ├── __init__.pyi # Stub file -│ └── py.typed # Type declarations -└── example/ # Example code - └── example.py + +### Partial Updates + +Update specific columns while preserving others: + +```python +partial_writer = table.new_upsert(columns=["id", "age"]) +partial_writer.upsert({"id": 1, "age": 27}) # only updates age +await partial_writer.flush() ``` -## 
TODO +## Partitioned Tables + +Partitioned tables distribute data across partitions based on column values. Partitions must be created before writing. + +### Creating and Managing Partitions + +```python +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), +])) + +table_path = fluss.TablePath("fluss", "partitioned_events") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1), + ignore_if_exists=True, +) + +# Create partitions +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) +await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True) + +# List partitions +partition_infos = await admin.list_partition_infos(table_path) +``` + +### Writing + +Same as non-partitioned tables — include partition column values in each row: + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() +writer.append({"id": 1, "region": "US", "value": 100}) +writer.append({"id": 2, "region": "EU", "value": 200}) +await writer.flush() +``` + +### Reading + +Use `subscribe_partition()` or `subscribe_partition_buckets()` instead of `subscribe()`: + +```python +scanner = await table.new_scan().create_batch_scanner() + +# Subscribe to individual partitions +for p in partition_infos: + scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +# Or batch-subscribe +scanner.subscribe_partition_buckets({ + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos +}) + +print(scanner.to_pandas()) +``` + +### Partitioned Primary Key Tables + +Partition columns must be part of the primary key. Partitions must be created before upserting. 
+ +```python +schema = fluss.Schema( + pa.schema([ + pa.field("user_id", pa.int32()), + pa.field("region", pa.string()), + pa.field("score", pa.int64()), + ]), + primary_keys=["user_id", "region"], +) + +table_path = fluss.TablePath("fluss", "partitioned_users") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"]), + ignore_if_exists=True, +) + +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) + +table = await conn.get_table(table_path) +writer = table.new_upsert() +writer.upsert({"user_id": 1, "region": "US", "score": 1234}) +await writer.flush() + +# Lookup includes partition columns +lookuper = table.new_lookup() +result = await lookuper.lookup({"user_id": 1, "region": "US"}) +``` + +## Error Handling + +The client raises `fluss.FlussError` for Fluss-specific errors (connection failures, table not found, invalid operations, etc.): + +```python +try: + await admin.create_table(table_path, table_descriptor) +except fluss.FlussError as e: + print(f"Fluss error: {e.message}") +``` + +Common error scenarios: +- **Connection refused** — Fluss cluster is not running or wrong address in `bootstrap.servers` +- **Table not found** — table doesn't exist or wrong database/table name +- **Partition not found** — writing to a partitioned table before creating partitions +- **Schema mismatch** — row data doesn't match the table schema + +## Data Types + +The Python client uses PyArrow types for schema definitions: -- [ ] Add basic usage examples in API Overview (code snippets for Config, FlussConnection, FlussAdmin, FlussTable). -- [ ] Add a "Verifying a release" subsection with install-from-TestPyPI/PyPI and smoke-test steps. 
+| PyArrow Type | Fluss Type | Python Type | +|---|---|---| +| `pa.boolean()` | Boolean | `bool` | +| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | +| `pa.float32()` / `float64()` | Float / Double | `float` | +| `pa.string()` | String | `str` | +| `pa.binary()` | Bytes | `bytes` | +| `pa.date32()` | Date | `datetime.date` | +| `pa.time32("ms")` | Time | `datetime.time` | +| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | +| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | +| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | -## License +All Python native types (`date`, `time`, `datetime`, `Decimal`) work when appending rows via dicts. -Apache 2.0 License +For a complete list of classes, methods, and properties, see the [Python API Reference](python-api-reference.md). diff --git a/docs/python-api-reference.md b/bindings/python/python-api-reference.md similarity index 99% rename from docs/python-api-reference.md rename to bindings/python/python-api-reference.md index 54379f8a..c3f8922c 100644 --- a/docs/python-api-reference.md +++ b/bindings/python/python-api-reference.md @@ -18,7 +18,7 @@ # Python API Reference -Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](python-client.md). +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [README](README.md). ## `Config` diff --git a/docs/python-client.md b/docs/python-client.md deleted file mode 100644 index c328fbd5..00000000 --- a/docs/python-client.md +++ /dev/null @@ -1,453 +0,0 @@ - - -# Python Client Guide - -This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. 
- -The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. - -## Key Concepts - -- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. -- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. -- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. -- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. -- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. - -## Prerequisites - -You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. - -## Installation - -```bash -pip install pyfluss -``` - -To build from source instead, see the [Python bindings README](../bindings/python/README.md). - -## Quick Start - -A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
- -```python -import asyncio -import pyarrow as pa -import fluss - -async def main(): - # Connect - config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) - conn = await fluss.FlussConnection.connect(config) - admin = await conn.get_admin() - - # Create a log table - schema = fluss.Schema(pa.schema([ - pa.field("id", pa.int32()), - pa.field("name", pa.string()), - pa.field("score", pa.float32()), - ])) - table_path = fluss.TablePath("fluss", "quick_start") - await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) - - # Write - table = await conn.get_table(table_path) - writer = await table.new_append_writer() - writer.append({"id": 1, "name": "Alice", "score": 95.5}) - writer.append({"id": 2, "name": "Bob", "score": 87.0}) - await writer.flush() - - # Read - num_buckets = (await admin.get_table(table_path)).num_buckets - scanner = await table.new_scan().create_batch_scanner() - scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - print(scanner.to_pandas()) - - # Cleanup - await admin.drop_table(table_path, ignore_if_not_exists=True) - conn.close() - -asyncio.run(main()) -``` - -## Connection Setup - -```python -config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) -conn = await fluss.FlussConnection.connect(config) -``` - -The connection also supports context managers: - -```python -with await fluss.FlussConnection.connect(config) as conn: - ... 
-``` - -### Configuration Options - -| Key | Description | Default | -|-----|-------------|---------| -| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | -| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | -| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | -| `writer.retries` | Number of retries on failure | `2147483647` | -| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | - -## Admin Operations - -```python -admin = await conn.get_admin() -``` - -### Databases - -```python -await admin.create_database("my_database", ignore_if_exists=True) -databases = await admin.list_databases() -exists = await admin.database_exists("my_database") -await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) -``` - -### Tables - -Schemas are defined using PyArrow and wrapped in `fluss.Schema`: - -```python -import pyarrow as pa - -schema = fluss.Schema(pa.schema([ - pa.field("id", pa.int32()), - pa.field("name", pa.string()), - pa.field("amount", pa.int64()), -])) - -table_path = fluss.TablePath("my_database", "my_table") -await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) - -table_info = await admin.get_table(table_path) -tables = await admin.list_tables("my_database") -await admin.drop_table(table_path, ignore_if_not_exists=True) -``` - -`TableDescriptor` accepts these optional parameters: - -| Parameter | Description | -|---|---| -| `partition_keys` | Column names to partition by (e.g. `["region"]`) | -| `bucket_count` | Number of buckets (parallelism units) for the table | -| `bucket_keys` | Columns used to determine bucket assignment | -| `comment` | Table comment / description | -| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` | -| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` | -| `properties` | Table configuration properties as a dict (e.g. 
`{"table.replication.factor": "1"}`) | -| `custom_properties` | User-defined properties as a dict | - -### Offsets - -```python -# Latest offsets for buckets -offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") - -# By timestamp -offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) - -# Per-partition offsets -offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") -``` - -## Log Tables - -Log tables are append-only tables without primary keys, suitable for event streaming. - -### Writing - -Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`. - -Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write. 
- -```python -table = await conn.get_table(table_path) -writer = await table.new_append_writer() - -# Fire-and-forget: queue writes, flush at the end -writer.append({"id": 1, "name": "Alice", "score": 95.5}) -writer.append([2, "Bob", 87.0]) -await writer.flush() - -# Per-record acknowledgment -handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0}) -await handle.wait() - -# Bulk writes -writer.write_arrow(pa_table) # PyArrow Table -writer.write_arrow_batch(record_batch) # PyArrow RecordBatch -writer.write_pandas(df) # Pandas DataFrame -await writer.flush() -``` - -### Reading - -There are two scanner types: -- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or DataFrames, best for analytics -- **Record scanner** (`create_log_scanner()`) — returns individual records with metadata (offset, timestamp, change type), best for streaming - -And two reading modes: -- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads. -- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data is available within the timeout, then returns. Call in a loop for continuous streaming. 
- -#### Batch Read (One-Shot) - -```python -num_buckets = (await admin.get_table(table_path)).num_buckets - -scanner = await table.new_scan().create_batch_scanner() -scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - -# Reads everything up to current latest offset, then returns -arrow_table = scanner.to_arrow() -df = scanner.to_pandas() -``` - -#### Continuous Polling - -Use `poll_arrow()` or `poll()` in a loop for streaming consumption: - -```python -# Batch scanner: poll as Arrow Tables -scanner = await table.new_scan().create_batch_scanner() -scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) - -while True: - result = scanner.poll_arrow(timeout_ms=5000) - if result.num_rows > 0: - print(result.to_pandas()) - -# Record scanner: poll individual records with metadata -scanner = await table.new_scan().create_log_scanner() -scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - -while True: - for record in scanner.poll(timeout_ms=5000): - print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") -``` - -#### Subscribe from Latest Offset - -To only consume new records (skip existing data), use `LATEST_OFFSET`: - -```python -scanner = await table.new_scan().create_batch_scanner() -scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET) -``` - -### Column Projection - -```python -scanner = await table.new_scan().project([0, 2]).create_batch_scanner() -# or by name -scanner = await table.new_scan().project_by_name(["id", "score"]).create_batch_scanner() -``` - -## Primary Key Tables - -Primary key tables support upsert, delete, and point lookup operations. 
- -### Creating - -Pass `primary_keys` to `fluss.Schema`: - -```python -schema = fluss.Schema( - pa.schema([ - pa.field("id", pa.int32()), - pa.field("name", pa.string()), - pa.field("age", pa.int64()), - ]), - primary_keys=["id"], -) -table_path = fluss.TablePath("fluss", "users") -await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True) -``` - -### Upsert, Delete, Lookup - -```python -table = await conn.get_table(table_path) - -# Upsert (fire-and-forget, flush at the end) -writer = table.new_upsert() -writer.upsert({"id": 1, "name": "Alice", "age": 25}) -writer.upsert({"id": 2, "name": "Bob", "age": 30}) -await writer.flush() - -# Per-record acknowledgment (for read-after-write) -handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35}) -await handle.wait() - -# Delete by primary key -handle = writer.delete({"id": 2}) -await handle.wait() - -# Lookup -lookuper = table.new_lookup() -result = await lookuper.lookup({"id": 1}) -if result: - print(f"Found: name={result['name']}, age={result['age']}") -``` - -### Partial Updates - -Update specific columns while preserving others: - -```python -partial_writer = table.new_upsert(columns=["id", "age"]) -partial_writer.upsert({"id": 1, "age": 27}) # only updates age -await partial_writer.flush() -``` - -## Partitioned Tables - -Partitioned tables distribute data across partitions based on column values. Partitions must be created before writing. 
- -### Creating and Managing Partitions - -```python -schema = fluss.Schema(pa.schema([ - pa.field("id", pa.int32()), - pa.field("region", pa.string()), - pa.field("value", pa.int64()), -])) - -table_path = fluss.TablePath("fluss", "partitioned_events") -await admin.create_table( - table_path, - fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1), - ignore_if_exists=True, -) - -# Create partitions -await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) -await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True) - -# List partitions -partition_infos = await admin.list_partition_infos(table_path) -``` - -### Writing - -Same as non-partitioned tables — include partition column values in each row: - -```python -table = await conn.get_table(table_path) -writer = await table.new_append_writer() -writer.append({"id": 1, "region": "US", "value": 100}) -writer.append({"id": 2, "region": "EU", "value": 200}) -await writer.flush() -``` - -### Reading - -Use `subscribe_partition()` or `subscribe_partition_buckets()` instead of `subscribe()`: - -```python -scanner = await table.new_scan().create_batch_scanner() - -# Subscribe to individual partitions -for p in partition_infos: - scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) - -# Or batch-subscribe -scanner.subscribe_partition_buckets({ - (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos -}) - -print(scanner.to_pandas()) -``` - -### Partitioned Primary Key Tables - -Partition columns must be part of the primary key. Partitions must be created before upserting. 
- -```python -schema = fluss.Schema( - pa.schema([ - pa.field("user_id", pa.int32()), - pa.field("region", pa.string()), - pa.field("score", pa.int64()), - ]), - primary_keys=["user_id", "region"], -) - -table_path = fluss.TablePath("fluss", "partitioned_users") -await admin.create_table( - table_path, - fluss.TableDescriptor(schema, partition_keys=["region"]), - ignore_if_exists=True, -) - -await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) - -table = await conn.get_table(table_path) -writer = table.new_upsert() -writer.upsert({"user_id": 1, "region": "US", "score": 1234}) -await writer.flush() - -# Lookup includes partition columns -lookuper = table.new_lookup() -result = await lookuper.lookup({"user_id": 1, "region": "US"}) -``` - -## Error Handling - -The client raises `fluss.FlussError` for Fluss-specific errors (connection failures, table not found, invalid operations, etc.): - -```python -try: - await admin.create_table(table_path, table_descriptor) -except fluss.FlussError as e: - print(f"Fluss error: {e.message}") -``` - -Common error scenarios: -- **Connection refused** — Fluss cluster is not running or wrong address in `bootstrap.servers` -- **Table not found** — table doesn't exist or wrong database/table name -- **Partition not found** — writing to a partitioned table before creating partitions -- **Schema mismatch** — row data doesn't match the table schema - -## Data Types - -The Python client uses PyArrow types for schema definitions: - -| PyArrow Type | Fluss Type | Python Type | -|---|---|---| -| `pa.boolean()` | Boolean | `bool` | -| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | -| `pa.float32()` / `float64()` | Float / Double | `float` | -| `pa.string()` | String | `str` | -| `pa.binary()` | Bytes | `bytes` | -| `pa.date32()` | Date | `datetime.date` | -| `pa.time32("ms")` | Time | `datetime.time` | -| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | -| 
`pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | -| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | - -All Python native types (`date`, `time`, `datetime`, `Decimal`) work when appending rows via dicts. - -For a complete list of classes, methods, and properties, see the [Python API Reference](python-api-reference.md). From 5c8c101da6ec3966454031caf4cd77aa5bbddd7c Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 9 Feb 2026 15:26:25 +0000 Subject: [PATCH 4/4] cleanup --- ...thon-api-reference.md => API_REFERENCE.md} | 2 +- bindings/python/DEVELOPMENT.md | 97 ++++++------------- bindings/python/README.md | 4 +- 3 files changed, 31 insertions(+), 72 deletions(-) rename bindings/python/{python-api-reference.md => API_REFERENCE.md} (99%) diff --git a/bindings/python/python-api-reference.md b/bindings/python/API_REFERENCE.md similarity index 99% rename from bindings/python/python-api-reference.md rename to bindings/python/API_REFERENCE.md index c3f8922c..258b26e6 100644 --- a/bindings/python/python-api-reference.md +++ b/bindings/python/API_REFERENCE.md @@ -18,7 +18,7 @@ # Python API Reference -Complete API reference for the Fluss Python client. For a usage guide with examples, see the [README](README.md). +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](README.md). ## `Config` diff --git a/bindings/python/DEVELOPMENT.md b/bindings/python/DEVELOPMENT.md index b0970397..e316f5e8 100644 --- a/bindings/python/DEVELOPMENT.md +++ b/bindings/python/DEVELOPMENT.md @@ -17,45 +17,7 @@ ~ under the License. --> -# Apache Fluss™ Python Bindings - -Python bindings for Fluss using PyO3 and Maturin. 
- -## API Overview - -### Basic Usage - -TODO: Add basic usage examples here - -### Core Classes - -#### `Config` - -Configuration for Fluss connection parameters - -#### `FlussConnection` - -Main interface for connecting to Fluss cluster - -#### `FlussAdmin` - -Administrative operations for managing tables (create, delete, etc.) - -#### `FlussTable` - -Represents a Fluss table, providing read and write operations - -#### `TableWriter` - -Used for writing data to tables, supports PyArrow and Pandas - -#### `LogScanner` - -Used for scanning table log data - - -# todo: we may move the following part to DEVELOPMENT.md -## Development +# Development ## Requirements @@ -64,56 +26,56 @@ Used for scanning table log data - [uv](https://docs.astral.sh/uv/) package manager - Linux or MacOS -> **⚠️ Before you start:** -> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. +> **Before you start:** +> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. > The Python bindings require a working Fluss Rust backend and compatible environment. 
-### Install Development Dependencies +## Install Development Dependencies ```bash cd bindings/python uv sync --all-extras ``` -### Build Development Version +## Build Development Version ```bash source .venv/bin/activate uv run maturin develop ``` -### Build Release Version +## Build Release Version ```bash uv run maturin build --release ``` -### Code Formatting and Linting +## Code Formatting and Linting ```bash uv run ruff format python/ uv run ruff check python/ ``` -### Type Checking +## Type Checking ```bash uv run mypy python/ ``` -### Run Examples +## Run Examples ```bash uv run python example/example.py ``` -### Build API docs: +## Build API Docs ```bash uv run pdoc fluss ``` -### Release +## Release ```bash # Build wheel @@ -124,32 +86,29 @@ uv run maturin publish ``` ## Project Structure + ``` bindings/python/ ├── Cargo.toml # Rust dependency configuration -├── pyproject.toml # Python project configuration -├── README.md # This file -├── src/ # Rust source code -│ ├── lib.rs # Main entry module -│ ├── config.rs # Configuration related -│ ├── connection.rs # Connection management -│ ├── admin.rs # Admin operations -│ ├── table.rs # Table operations -│ ├── types.rs # Data types -│ └── error.rs # Error handling -├── fluss/ # Python package source -│ ├── __init__.py # Python package entry -│ ├── __init__.pyi # Stub file -│ └── py.typed # Type declarations -└── example/ # Example code +├── pyproject.toml # Python project configuration +├── README.md # User guide +├── DEVELOPMENT.md # This file +├── API_REFERENCE.md # API reference +├── src/ # Rust source code (PyO3 bindings) +│ ├── lib.rs +│ ├── config.rs +│ ├── connection.rs +│ ├── admin.rs +│ ├── table.rs +│ └── error.rs +├── fluss/ # Python package +│ ├── __init__.py +│ ├── __init__.pyi # Type stubs +│ └── py.typed +└── example/ └── example.py ``` -## TODO - -- [ ] Add basic usage examples in API Overview (code snippets for Config, FlussConnection, FlussAdmin, FlussTable). 
-- [ ] Add a "Verifying a release" subsection with install-from-TestPyPI/PyPI and smoke-test steps. - ## License Apache 2.0 License diff --git a/bindings/python/README.md b/bindings/python/README.md index 825aad01..a31c990a 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -16,7 +16,7 @@ limitations under the License. --> -# Python Client Guide +# Fluss Python Client This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. @@ -450,4 +450,4 @@ The Python client uses PyArrow types for schema definitions: All Python native types (`date`, `time`, `datetime`, `Decimal`) work when appending rows via dicts. -For a complete list of classes, methods, and properties, see the [Python API Reference](python-api-reference.md). +For a complete list of classes, methods, and properties, see the [API Reference](API_REFERENCE.md).