diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 732b7dff..6dff1b2a 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -759,6 +759,112 @@ async def main(): print(f"Error with partitioned table: {e}") traceback.print_exc() + # ===================================================== + # Demo: Partitioned KV Table (Upsert, Lookup, Delete) + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Partitioned KV Table ---") + print("=" * 60) + + partitioned_kv_fields = [ + pa.field("region", pa.string()), # partition key + part of PK + pa.field("user_id", pa.int32()), # part of PK + pa.field("name", pa.string()), + pa.field("score", pa.int64()), + ] + partitioned_kv_schema = pa.schema(partitioned_kv_fields) + fluss_partitioned_kv_schema = fluss.Schema( + partitioned_kv_schema, primary_keys=["region", "user_id"] + ) + + partitioned_kv_descriptor = fluss.TableDescriptor( + fluss_partitioned_kv_schema, + partition_keys=["region"], + ) + + partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") + + try: + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False) + print(f"Created partitioned KV table: {partitioned_kv_path}") + + # Create partitions + await admin.create_partition(partitioned_kv_path, {"region": "US"}) + await admin.create_partition(partitioned_kv_path, {"region": "EU"}) + await admin.create_partition(partitioned_kv_path, {"region": "APAC"}) + print("Created partitions: US, EU, APAC") + + partitioned_kv_table = await conn.get_table(partitioned_kv_path) + upsert_writer = partitioned_kv_table.new_upsert() + + # Upsert rows across partitions + test_data = [ + ("US", 1, "Gustave", 100), + ("US", 2, "Lune", 200), + ("EU", 1, "Sciel", 150), + ("EU", 2, "Maelle", 250), + ("APAC", 1, "Noco", 300), + ] + for region, user_id, name, score in test_data: + upsert_writer.upsert({ + "region": region, "user_id": user_id, + "name": name, "score": score, + }) + await upsert_writer.flush() + print(f"Upserted {len(test_data)} rows across 3 partitions") + + # Lookup all rows across partitions + print("\n--- Lookup across partitions ---") + lookuper = partitioned_kv_table.new_lookup() + for region, user_id, name, score in test_data: + result = await lookuper.lookup({"region": region, "user_id": user_id}) + assert result is not None, f"Expected to find region={region} user_id={user_id}" + assert result["name"] == name, f"Name mismatch: {result['name']} != {name}" + assert result["score"] == score, f"Score mismatch: {result['score']} != {score}" + print(f"All {len(test_data)} rows verified across partitions") + + # Update within a partition + print("\n--- Update within partition ---") + handle = upsert_writer.upsert({ + "region": "US", "user_id": 1, + "name": "Gustave Updated", "score": 999, + }) + await handle.wait() + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None, "Expected to find region=US user_id=1 after update" + assert result["name"] == "Gustave Updated" + assert result["score"] == 999 + print(f"Update verified: US/1 name={result['name']} score={result['score']}") + + # Lookup in non-existent partition + print("\n--- Lookup in non-existent partition ---") + result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1}) + assert result is None, "Expected UNKNOWN partition lookup to return None" + print("UNKNOWN partition lookup: not found (expected)") + + # Delete within a partition + print("\n--- Delete within partition ---") + handle = upsert_writer.delete({"region": "EU", "user_id": 1}) + await handle.wait() + result = await lookuper.lookup({"region": "EU", "user_id": 1}) + assert result is None, "Expected EU/1 to be deleted" + print("Delete verified: EU/1 not found") + + # Verify sibling record still exists + result = await lookuper.lookup({"region": "EU", "user_id": 2}) + assert result is not None, "Expected EU/2 to still exist" + assert result["name"] == "Maelle" + print(f"EU/2 still exists: name={result['name']}") + + # Cleanup + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned KV table: {partitioned_kv_path}") + + except Exception as e: + print(f"Error with partitioned KV table: {e}") + traceback.print_exc() + # Close connection conn.close() print("\nConnection closed")