From b76c0a6b7cb01fe6f529dbdc5bef2f170fb94255 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 8 Feb 2026 14:26:50 +0000 Subject: [PATCH 1/3] [TASK-289] Partitioned KV tables python example --- bindings/python/example/example.py | 105 +++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 732b7dff..03b94fab 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -759,6 +759,111 @@ async def main(): print(f"Error with partitioned table: {e}") traceback.print_exc() + # ===================================================== + # Demo: Partitioned KV Table (Upsert, Lookup, Delete) + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Partitioned KV Table ---") + print("=" * 60) + + part_kv_fields = [ + pa.field("region", pa.string()), # partition key + part of PK + pa.field("user_id", pa.int32()), # part of PK + pa.field("name", pa.string()), + pa.field("score", pa.int64()), + ] + part_kv_schema = pa.schema(part_kv_fields) + fluss_part_kv_schema = fluss.Schema( + part_kv_schema, primary_keys=["region", "user_id"] + ) + + part_kv_descriptor = fluss.TableDescriptor( + fluss_part_kv_schema, + partition_keys=["region"], + ) + + part_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") + + try: + await admin.drop_table(part_kv_path, ignore_if_not_exists=True) + await admin.create_table(part_kv_path, part_kv_descriptor, False) + print(f"Created partitioned KV table: {part_kv_path}") + + # Create partitions + await admin.create_partition(part_kv_path, {"region": "US"}) + await admin.create_partition(part_kv_path, {"region": "EU"}) + await admin.create_partition(part_kv_path, {"region": "APAC"}) + print("Created partitions: US, EU, APAC") + + part_kv_table = await conn.get_table(part_kv_path) + upsert_writer = part_kv_table.new_upsert() + + # Upsert rows across partitions + test_data = [ + ("US", 1, "Gustave", 100), + ("US", 2, "Lune", 200), + ("EU", 1, "Sciel", 150), + ("EU", 2, "Maelle", 250), + ("APAC", 1, "Noco", 300), + ] + for region, user_id, name, score in test_data: + upsert_writer.upsert({ + "region": region, "user_id": user_id, + "name": name, "score": score, + }) + await upsert_writer.flush() + print(f"Upserted {len(test_data)} rows across 3 partitions") + + # Lookup all rows across partitions + print("\n--- Lookup across partitions ---") + lookuper = part_kv_table.new_lookup() + for region, user_id, name, score in test_data: + result = await lookuper.lookup({"region": region, "user_id": user_id}) + assert result is not None, f"Expected to find region={region} user_id={user_id}" + assert result["name"] == name, f"Name mismatch: {result['name']} != {name}" + assert result["score"] == score, f"Score mismatch: {result['score']} != {score}" + print(f"All {len(test_data)} rows verified across partitions") + + # Update within a partition + print("\n--- Update within partition ---") + handle = upsert_writer.upsert({ + "region": "US", "user_id": 1, + "name": "Gustave Updated", "score": 999, + }) + await handle.wait() + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result["name"] == "Gustave Updated" + assert result["score"] == 999 + print(f"Update verified: US/1 name={result['name']} score={result['score']}") + + # Lookup in non-existent partition + print("\n--- Lookup in non-existent partition ---") + result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1}) + assert result is None, "Expected UNKNOWN partition lookup to return None" + print("UNKNOWN partition lookup: not found (expected)") + + # Delete within a partition + print("\n--- Delete within partition ---") + handle = upsert_writer.delete({"region": "EU", "user_id": 1}) + await handle.wait() + result = await lookuper.lookup({"region": "EU", "user_id": 1}) + assert result is None, "Expected EU/1 to be deleted" + print("Delete verified: EU/1 not found") + + # Verify sibling record still exists + result = await lookuper.lookup({"region": "EU", "user_id": 2}) + assert result is not None, "Expected EU/2 to still exist" + assert result["name"] == "Maelle" + print(f"EU/2 still exists: name={result['name']}") + + # Cleanup + await admin.drop_table(part_kv_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned KV table: {part_kv_path}") + + except Exception as e: + print(f"Error with partitioned KV table: {e}") + traceback.print_exc() + # Close connection conn.close() print("\nConnection closed") From e03dc5c87343567696502f39935d24f0884eaded Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 9 Feb 2026 00:04:35 +0000 Subject: [PATCH 2/3] change name --- bindings/python/example/example.py | 36 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 03b94fab..586c2a83 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -766,37 +766,37 @@ async def main(): print("--- Testing Partitioned KV Table ---") print("=" * 60) - part_kv_fields = [ + partitioned_kv_fields = [ pa.field("region", pa.string()), # partition key + part of PK pa.field("user_id", pa.int32()), # part of PK pa.field("name", pa.string()), pa.field("score", pa.int64()), ] - part_kv_schema = pa.schema(part_kv_fields) - fluss_part_kv_schema = fluss.Schema( - part_kv_schema, primary_keys=["region", "user_id"] + partitioned_kv_schema = pa.schema(partitioned_kv_fields) + fluss_partitioned_kv_schema = fluss.Schema( + partitioned_kv_schema, primary_keys=["region", "user_id"] ) - part_kv_descriptor = fluss.TableDescriptor( - fluss_part_kv_schema, + partitioned_kv_descriptor = fluss.TableDescriptor( + fluss_partitioned_kv_schema, partition_keys=["region"], ) - part_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") + partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") try: - await admin.drop_table(part_kv_path, ignore_if_not_exists=True) - await admin.create_table(part_kv_path, part_kv_descriptor, False) - print(f"Created partitioned KV table: {part_kv_path}") + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False) + print(f"Created partitioned KV table: {partitioned_kv_path}") # Create partitions - await admin.create_partition(part_kv_path, {"region": "US"}) - await admin.create_partition(part_kv_path, {"region": "EU"}) - await admin.create_partition(part_kv_path, {"region": "APAC"}) + await admin.create_partition(partitioned_kv_path, {"region": "US"}) + await admin.create_partition(partitioned_kv_path, {"region": "EU"}) + await admin.create_partition(partitioned_kv_path, {"region": "APAC"}) print("Created partitions: US, EU, APAC") - part_kv_table = await conn.get_table(part_kv_path) - upsert_writer = part_kv_table.new_upsert() + partitioned_kv_table = await conn.get_table(partitioned_kv_path) + upsert_writer = partitioned_kv_table.new_upsert() # Upsert rows across partitions test_data = [ @@ -816,7 +816,7 @@ async def main(): # Lookup all rows across partitions print("\n--- Lookup across partitions ---") - lookuper = part_kv_table.new_lookup() + lookuper = partitioned_kv_table.new_lookup() for region, user_id, name, score in test_data: result = await lookuper.lookup({"region": region, "user_id": user_id}) assert result is not None, f"Expected to find region={region} user_id={user_id}" @@ -857,8 +857,8 @@ async def main(): print(f"EU/2 still exists: name={result['name']}") # Cleanup - await admin.drop_table(part_kv_path, ignore_if_not_exists=True) - print(f"\nDropped partitioned KV table: {part_kv_path}") + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned KV table: {partitioned_kv_path}") except Exception as e: print(f"Error with partitioned KV table: {e}") From da6b2b879bf5408eaf2150d40e5d16139a71b0c7 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Mon, 9 Feb 2026 10:29:51 +0000 Subject: [PATCH 3/3] address comments --- bindings/python/example/example.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 586c2a83..6dff1b2a 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -832,6 +832,7 @@ async def main(): }) await handle.wait() result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None, "Expected to find region=US user_id=1 after update" assert result["name"] == "Gustave Updated" assert result["score"] == 999 print(f"Update verified: US/1 name={result['name']} score={result['score']}")