diff --git a/bindings/python/API_REFERENCE.md b/bindings/python/API_REFERENCE.md new file mode 100644 index 00000000..258b26e6 --- /dev/null +++ b/bindings/python/API_REFERENCE.md @@ -0,0 +1,278 @@ + + +# Python API Reference + +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](README.md). + +## `Config` + +| Method / Property | Description | +|---|---| +| `Config(properties: dict = None)` | Create config from a dict of key-value pairs | +| `.bootstrap_server` | Get/set coordinator server address | +| `.request_max_size` | Get/set max request size in bytes | +| `.writer_batch_size` | Get/set write batch size in bytes | + +## `FlussConnection` + +| Method | Description | +|---|---| +| `await FlussConnection.connect(config) -> FlussConnection` | Connect to a Fluss cluster | +| `await conn.get_admin() -> FlussAdmin` | Get admin interface | +| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations | +| `conn.close()` | Close the connection | + +Supports `with` statement (context manager). + +## `FlussAdmin` + +| Method | Description | +|---|---| +| `await create_database(name, ignore_if_exists=False, database_descriptor=None)` | Create a database | +| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database | +| `await list_databases() -> list[str]` | List all databases | +| `await database_exists(name) -> bool` | Check if a database exists | +| `await get_database_info(name) -> DatabaseInfo` | Get database metadata | +| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table | +| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table | +| `await get_table(table_path) -> TableInfo` | Get table metadata | +| `await list_tables(database_name) -> list[str]` | List tables in a database | +| `await table_exists(table_path) -> bool` | Check if a table exists | +| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for buckets | +| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's buckets | +| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition | +| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition | +| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions | +| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot | + +## `FlussTable` + +| Method | Description | +|---|---| +| `new_scan() -> TableScan` | Create a scan builder | +| `await new_append_writer() -> AppendWriter` | Create writer for log tables | +| `new_upsert(columns=None, column_indices=None) -> UpsertWriter` | Create writer for PK tables (optionally partial) | +| `new_lookup() -> Lookuper` | Create lookuper for PK tables | +| `get_table_info() -> TableInfo` | Get table metadata | +| `get_table_path() -> TablePath` | Get table path | +| `has_primary_key() -> bool` | Check if table has a primary key | + +## `TableScan` + +| Method | Description | +|---|---| +| `.project(indices) -> TableScan` | Project columns by index | +| `.project_by_name(names) -> TableScan` | Project columns by name | +| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) | +| `await .create_batch_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) | + +## `AppendWriter` + +| Method | Description | +|---|---| +| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) | +| `.write_arrow(table)` | Write a PyArrow Table | +| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch | +| `.write_pandas(df)` | Write a Pandas DataFrame | +| `await .flush()` | Flush all pending writes | + +## `UpsertWriter` + +| Method | Description | +|---|---| +| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) | +| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key | +| `await .flush()` | Flush all pending operations | + +## `WriteResultHandle` + +| Method | Description | +|---|---| +| `await .wait()` | Wait for server acknowledgment of this write | + +## `Lookuper` + +| Method | Description | +|---|---| +| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key | + +## `LogScanner` + +| Method | Description | +|---|---| +| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket | +| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) | +| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket | +| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | +| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | +| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record scanner only) | +| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | +| `.poll_batches(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | +| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | + +## `ScanRecord` + +| Property | Description | +|---|---| +| `.bucket -> TableBucket` | Bucket this record belongs to | +| `.offset -> int` | Record offset in the log | +| `.timestamp -> int` | Record timestamp | +| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | +| `.row -> dict` | Row data as `{column_name: value}` | + +## `RecordBatch` + +| Property | Description | +|---|---| +| `.batch -> pa.RecordBatch` | Arrow RecordBatch data | +| `.bucket -> TableBucket` | Bucket this batch belongs to | +| `.base_offset -> int` | First record offset | +| `.last_offset -> int` | Last record offset | + +## `Schema` + +| Method | Description | +|---|---| +| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema | +| `.get_column_names() -> list[str]` | Get column names | +| `.get_column_types() -> list[str]` | Get column type names | + +## `TableDescriptor` + +| Method | Description | +|---|---| +| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create table descriptor | +| `.get_schema() -> Schema` | Get the schema | + +## `TablePath` + +| Method / Property | Description | +|---|---| +| `TablePath(database, table)` | Create a table path | +| `.database_name -> str` | Database name | +| `.table_name -> str` | Table name | + +## `TableInfo` + +| Property / Method | Description | +|---|---| +| `.table_id -> int` | Table ID | +| `.table_path -> TablePath` | Table path | +| `.num_buckets -> int` | Number of buckets | +| `.schema_id -> int` | Schema ID | +| `.comment -> str \| None` | Table comment | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_primary_keys() -> list[str]` | Primary key columns | +| `.get_partition_keys() -> list[str]` | Partition columns | +| `.get_bucket_keys() -> list[str]` | Bucket key columns | +| `.has_primary_key() -> bool` | Has primary key? | +| `.is_partitioned() -> bool` | Is partitioned? | +| `.get_schema() -> Schema` | Get table schema | +| `.get_column_names() -> list[str]` | Column names | +| `.get_column_count() -> int` | Number of columns | +| `.get_properties() -> dict` | All table properties | +| `.get_custom_properties() -> dict` | Custom properties only | + +## `PartitionInfo` + +| Property | Description | +|---|---| +| `.partition_id -> int` | Partition ID | +| `.partition_name -> str` | Partition name | + +## `DatabaseDescriptor` + +| Method / Property | Description | +|---|---| +| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor | +| `.comment -> str \| None` | Database comment | +| `.get_custom_properties() -> dict` | Custom properties | + +## `DatabaseInfo` + +| Property / Method | Description | +|---|---| +| `.database_name -> str` | Database name | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor | + +## `LakeSnapshot` + +| Property / Method | Description | +|---|---| +| `.snapshot_id -> int` | Snapshot ID | +| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets | +| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket | +| `.get_table_buckets() -> list[TableBucket]` | Get all buckets | + +## `TableBucket` + +| Method / Property | Description | +|---|---| +| `TableBucket(table_id, bucket)` | Create non-partitioned bucket | +| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket | +| `.table_id -> int` | Table ID | +| `.bucket_id -> int` | Bucket ID | +| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) | + +## `FlussError` + +| Property | Description | +|---|---| +| `.message -> str` | Error message | + +Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. + +## Constants + +| Constant | Value | Description | +|---|---|---| +| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available offset | +| `fluss.LATEST_OFFSET` | `-1` | Start reading from latest offset (only new records) | +| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` | +| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` | +| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with timestamp | + +## `ChangeType` + +| Value | Short String | Description | +|---|---|---| +| `ChangeType.AppendOnly` (0) | `+A` | Append-only | +| `ChangeType.Insert` (1) | `+I` | Insert | +| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row | +| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row | +| `ChangeType.Delete` (4) | `-D` | Delete | + +## Data Types + +| PyArrow Type | Fluss Type | Python Type | +|---|---|---| +| `pa.boolean()` | Boolean | `bool` | +| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | +| `pa.float32()` / `float64()` | Float / Double | `float` | +| `pa.string()` | String | `str` | +| `pa.binary()` | Bytes | `bytes` | +| `pa.date32()` | Date | `datetime.date` | +| `pa.time32("ms")` | Time | `datetime.time` | +| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | +| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | +| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | diff --git a/bindings/python/DEVELOPMENT.md b/bindings/python/DEVELOPMENT.md new file mode 100644 index 00000000..e316f5e8 --- /dev/null +++ b/bindings/python/DEVELOPMENT.md @@ -0,0 +1,114 @@ + + +# Development + +## Requirements + +- Python 3.9+ +- Rust 1.70+ +- [uv](https://docs.astral.sh/uv/) package manager +- Linux or MacOS + +> **Before you start:** +> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. +> The Python bindings require a working Fluss Rust backend and compatible environment. + +## Install Development Dependencies + +```bash +cd bindings/python +uv sync --all-extras +``` + +## Build Development Version + +```bash +source .venv/bin/activate +uv run maturin develop +``` + +## Build Release Version + +```bash +uv run maturin build --release +``` + +## Code Formatting and Linting + +```bash +uv run ruff format python/ +uv run ruff check python/ +``` + +## Type Checking + +```bash +uv run mypy python/ +``` + +## Run Examples + +```bash +uv run python example/example.py +``` + +## Build API Docs + +```bash +uv run pdoc fluss +``` + +## Release + +```bash +# Build wheel +uv run maturin build --release + +# Publish to PyPI +uv run maturin publish +``` + +## Project Structure + +``` +bindings/python/ +├── Cargo.toml # Rust dependency configuration +├── pyproject.toml # Python project configuration +├── README.md # User guide +├── DEVELOPMENT.md # This file +├── API_REFERENCE.md # API reference +├── src/ # Rust source code (PyO3 bindings) +│ ├── lib.rs +│ ├── config.rs +│ ├── connection.rs +│ ├── admin.rs +│ ├── table.rs +│ └── error.rs +├── fluss/ # Python package +│ ├── __init__.py +│ ├── __init__.pyi # Type stubs +│ └── py.typed +└── example/ + └── example.py +``` + +## License + +Apache 2.0 License diff --git a/bindings/python/README.md b/bindings/python/README.md index b0970397..a31c990a 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -1,155 +1,453 @@ -# Apache Fluss™ Python Bindings +# Fluss Python Client -Python bindings for Fluss using PyO3 and Maturin. +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. -## API Overview +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. -### Basic Usage +## Key Concepts -TODO: Add basic usage examples here +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. -### Core Classes +## Prerequisites -#### `Config` +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../../README.md#quick-start) for how to start a local cluster. -Configuration for Fluss connection parameters +## Installation -#### `FlussConnection` +```bash +pip install pyfluss +``` -Main interface for connecting to Fluss cluster +To build from source instead, see the [Development Guide](DEVELOPMENT.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. + +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` -#### `FlussAdmin` +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` -Administrative operations for managing tables (create, delete, etc.) +The connection also supports context managers: -#### `FlussTable` +```python +with await fluss.FlussConnection.connect(config) as conn: + ... +``` -Represents a Fluss table, providing read and write operations +### Configuration Options -#### `TableWriter` +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | -Used for writing data to tables, supports PyArrow and Pandas +## Admin Operations -#### `LogScanner` +```python +admin = await conn.get_admin() +``` -Used for scanning table log data +### Databases +```python +await admin.create_database("my_database", ignore_if_exists=True) +databases = await admin.list_databases() +exists = await admin.database_exists("my_database") +await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) +``` -# todo: we may move the following part to DEVELOPMENT.md -## Development +### Tables -## Requirements +Schemas are defined using PyArrow and wrapped in `fluss.Schema`: -- Python 3.9+ -- Rust 1.70+ -- [uv](https://docs.astral.sh/uv/) package manager -- Linux or MacOS +```python +import pyarrow as pa -> **⚠️ Before you start:** -> Please make sure you can successfully build and run the [Fluss Rust client](../../crates/fluss/README.md) on your machine. -> The Python bindings require a working Fluss Rust backend and compatible environment. +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("amount", pa.int64()), +])) -### Install Development Dependencies +table_path = fluss.TablePath("my_database", "my_table") +await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) -```bash -cd bindings/python -uv sync --all-extras +table_info = await admin.get_table(table_path) +tables = await admin.list_tables("my_database") +await admin.drop_table(table_path, ignore_if_not_exists=True) ``` -### Build Development Version +`TableDescriptor` accepts these optional parameters: -```bash -source .venv/bin/activate -uv run maturin develop +| Parameter | Description | +|---|---| +| `partition_keys` | Column names to partition by (e.g. `["region"]`) | +| `bucket_count` | Number of buckets (parallelism units) for the table | +| `bucket_keys` | Columns used to determine bucket assignment | +| `comment` | Table comment / description | +| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` | +| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` | +| `properties` | Table configuration properties as a dict (e.g. `{"table.replication.factor": "1"}`) | +| `custom_properties` | User-defined properties as a dict | + +### Offsets + +```python +# Latest offsets for buckets +offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") + +# By timestamp +offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) + +# Per-partition offsets +offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") ``` -### Build Release Version +## Log Tables -```bash -uv run maturin build --release +Log tables are append-only tables without primary keys, suitable for event streaming. + +### Writing + +Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`. + +Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write. + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() + +# Fire-and-forget: queue writes, flush at the end +writer.append({"id": 1, "name": "Alice", "score": 95.5}) +writer.append([2, "Bob", 87.0]) +await writer.flush() + +# Per-record acknowledgment +handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0}) +await handle.wait() + +# Bulk writes +writer.write_arrow(pa_table) # PyArrow Table +writer.write_arrow_batch(record_batch) # PyArrow RecordBatch +writer.write_pandas(df) # Pandas DataFrame +await writer.flush() ``` -### Code Formatting and Linting +### Reading -```bash -uv run ruff format python/ -uv run ruff check python/ +There are two scanner types: +- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or DataFrames, best for analytics +- **Record scanner** (`create_log_scanner()`) — returns individual records with metadata (offset, timestamp, change type), best for streaming + +And two reading modes: +- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads. +- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data is available within the timeout, then returns. Call in a loop for continuous streaming. + +#### Batch Read (One-Shot) + +```python +num_buckets = (await admin.get_table(table_path)).num_buckets + +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +# Reads everything up to current latest offset, then returns +arrow_table = scanner.to_arrow() +df = scanner.to_pandas() ``` -### Type Checking +#### Continuous Polling -```bash -uv run mypy python/ +Use `poll_arrow()` or `poll()` in a loop for streaming consumption: + +```python +# Batch scanner: poll as Arrow Tables +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +while True: + result = scanner.poll_arrow(timeout_ms=5000) + if result.num_rows > 0: + print(result.to_pandas()) + +# Record scanner: poll individual records with metadata +scanner = await table.new_scan().create_log_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +while True: + for record in scanner.poll(timeout_ms=5000): + print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") ``` -### Run Examples +#### Subscribe from Latest Offset -```bash -uv run python example/example.py +To only consume new records (skip existing data), use `LATEST_OFFSET`: + +```python +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET) ``` -### Build API docs: +### Column Projection -```bash -uv run pdoc fluss +```python +scanner = await table.new_scan().project([0, 2]).create_batch_scanner() +# or by name +scanner = await table.new_scan().project_by_name(["id", "score"]).create_batch_scanner() ``` -### Release +## Primary Key Tables -```bash -# Build wheel -uv run maturin build --release +Primary key tables support upsert, delete, and point lookup operations. -# Publish to PyPI -uv run maturin publish +### Creating + +Pass `primary_keys` to `fluss.Schema`: + +```python +schema = fluss.Schema( + pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + ]), + primary_keys=["id"], +) +table_path = fluss.TablePath("fluss", "users") +await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True) ``` -## Project Structure +### Upsert, Delete, Lookup + +```python +table = await conn.get_table(table_path) + +# Upsert (fire-and-forget, flush at the end) +writer = table.new_upsert() +writer.upsert({"id": 1, "name": "Alice", "age": 25}) +writer.upsert({"id": 2, "name": "Bob", "age": 30}) +await writer.flush() + +# Per-record acknowledgment (for read-after-write) +handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35}) +await handle.wait() + +# Delete by primary key +handle = writer.delete({"id": 2}) +await handle.wait() + +# Lookup +lookuper = table.new_lookup() +result = await lookuper.lookup({"id": 1}) +if result: + print(f"Found: name={result['name']}, age={result['age']}") ``` -bindings/python/ -├── Cargo.toml # Rust dependency configuration -├── pyproject.toml # Python project configuration -├── README.md # This file -├── src/ # Rust source code -│ ├── lib.rs # Main entry module -│ ├── config.rs # Configuration related -│ ├── connection.rs # Connection management -│ ├── admin.rs # Admin operations -│ ├── table.rs # Table operations -│ ├── types.rs # Data types -│ └── error.rs # Error handling -├── fluss/ # Python package source -│ ├── __init__.py # Python package entry -│ ├── __init__.pyi # Stub file -│ └── py.typed # Type declarations -└── example/ # Example code - └── example.py + +### Partial Updates + +Update specific columns while preserving others: + +```python +partial_writer = table.new_upsert(columns=["id", "age"]) +partial_writer.upsert({"id": 1, "age": 27}) # only updates age +await partial_writer.flush() ``` -## TODO +## Partitioned Tables + +Partitioned tables distribute data across partitions based on column values. Partitions must be created before writing. + +### Creating and Managing Partitions + +```python +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), +])) + +table_path = fluss.TablePath("fluss", "partitioned_events") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1), + ignore_if_exists=True, +) + +# Create partitions +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) +await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True) + +# List partitions +partition_infos = await admin.list_partition_infos(table_path) +``` + +### Writing + +Same as non-partitioned tables — include partition column values in each row: + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() +writer.append({"id": 1, "region": "US", "value": 100}) +writer.append({"id": 2, "region": "EU", "value": 200}) +await writer.flush() +``` + +### Reading + +Use `subscribe_partition()` or `subscribe_partition_buckets()` instead of `subscribe()`: + +```python +scanner = await table.new_scan().create_batch_scanner() + +# Subscribe to individual partitions +for p in partition_infos: + scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +# Or batch-subscribe +scanner.subscribe_partition_buckets({ + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos +}) + +print(scanner.to_pandas()) +``` + +### Partitioned Primary Key Tables + +Partition columns must be part of the primary key. Partitions must be created before upserting. + +```python +schema = fluss.Schema( + pa.schema([ + pa.field("user_id", pa.int32()), + pa.field("region", pa.string()), + pa.field("score", pa.int64()), + ]), + primary_keys=["user_id", "region"], +) + +table_path = fluss.TablePath("fluss", "partitioned_users") +await admin.create_table( + table_path, + fluss.TableDescriptor(schema, partition_keys=["region"]), + ignore_if_exists=True, +) + +await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True) + +table = await conn.get_table(table_path) +writer = table.new_upsert() +writer.upsert({"user_id": 1, "region": "US", "score": 1234}) +await writer.flush() + +# Lookup includes partition columns +lookuper = table.new_lookup() +result = await lookuper.lookup({"user_id": 1, "region": "US"}) +``` + +## Error Handling + +The client raises `fluss.FlussError` for Fluss-specific errors (connection failures, table not found, invalid operations, etc.): + +```python +try: + await admin.create_table(table_path, table_descriptor) +except fluss.FlussError as e: + print(f"Fluss error: {e.message}") +``` + +Common error scenarios: +- **Connection refused** — Fluss cluster is not running or wrong address in `bootstrap.servers` +- **Table not found** — table doesn't exist or wrong database/table name +- **Partition not found** — writing to a partitioned table before creating partitions +- **Schema mismatch** — row data doesn't match the table schema + +## Data Types + +The Python client uses PyArrow types for schema definitions: -- [ ] Add basic usage examples in API Overview (code snippets for Config, FlussConnection, FlussAdmin, FlussTable). -- [ ] Add a "Verifying a release" subsection with install-from-TestPyPI/PyPI and smoke-test steps. +| PyArrow Type | Fluss Type | Python Type | +|---|---|---| +| `pa.boolean()` | Boolean | `bool` | +| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / BigInt | `int` | +| `pa.float32()` / `float64()` | Float / Double | `float` | +| `pa.string()` | String | `str` | +| `pa.binary()` | Bytes | `bytes` | +| `pa.date32()` | Date | `datetime.date` | +| `pa.time32("ms")` | Time | `datetime.time` | +| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` | +| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` | +| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` | -## License +All Python native types (`date`, `time`, `datetime`, `Decimal`) work when appending rows via dicts. -Apache 2.0 License +For a complete list of classes, methods, and properties, see the [API Reference](API_REFERENCE.md). diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index cc7053e4..adbfc2fe 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -610,7 +610,8 @@ class TableDescriptor: comment: Optional[str] = None, log_format: Optional[str] = None, kv_format: Optional[str] = None, - **properties: str, + properties: Optional[Dict[str, str]] = None, + custom_properties: Optional[Dict[str, str]] = None, ) -> None: ... def get_schema(self) -> Schema: ...