Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,112 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()

# =====================================================
# Demo: Partitioned KV Table (Upsert, Lookup, Delete)
# =====================================================
print("\n" + "=" * 60)
print("--- Testing Partitioned KV Table ---")
print("=" * 60)

partitioned_kv_fields = [
pa.field("region", pa.string()), # partition key + part of PK
pa.field("user_id", pa.int32()), # part of PK
pa.field("name", pa.string()),
pa.field("score", pa.int64()),
]
partitioned_kv_schema = pa.schema(partitioned_kv_fields)
fluss_partitioned_kv_schema = fluss.Schema(
partitioned_kv_schema, primary_keys=["region", "user_id"]
)

partitioned_kv_descriptor = fluss.TableDescriptor(
fluss_partitioned_kv_schema,
partition_keys=["region"],
)

partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")

try:
await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False)
Comment thread
fresh-borzoni marked this conversation as resolved.
print(f"Created partitioned KV table: {partitioned_kv_path}")

# Create partitions
await admin.create_partition(partitioned_kv_path, {"region": "US"})
await admin.create_partition(partitioned_kv_path, {"region": "EU"})
await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
Comment thread
fresh-borzoni marked this conversation as resolved.
print("Created partitions: US, EU, APAC")

partitioned_kv_table = await conn.get_table(partitioned_kv_path)
upsert_writer = partitioned_kv_table.new_upsert()

# Upsert rows across partitions
test_data = [
("US", 1, "Gustave", 100),
("US", 2, "Lune", 200),
("EU", 1, "Sciel", 150),
("EU", 2, "Maelle", 250),
("APAC", 1, "Noco", 300),
]
for region, user_id, name, score in test_data:
upsert_writer.upsert({
"region": region, "user_id": user_id,
"name": name, "score": score,
})
await upsert_writer.flush()
print(f"Upserted {len(test_data)} rows across 3 partitions")

# Lookup all rows across partitions
print("\n--- Lookup across partitions ---")
lookuper = partitioned_kv_table.new_lookup()
for region, user_id, name, score in test_data:
result = await lookuper.lookup({"region": region, "user_id": user_id})
assert result is not None, f"Expected to find region={region} user_id={user_id}"
assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
Comment thread
fresh-borzoni marked this conversation as resolved.
print(f"All {len(test_data)} rows verified across partitions")

# Update within a partition
print("\n--- Update within partition ---")
handle = upsert_writer.upsert({
"region": "US", "user_id": 1,
"name": "Gustave Updated", "score": 999,
})
await handle.wait()
result = await lookuper.lookup({"region": "US", "user_id": 1})
Comment thread
fresh-borzoni marked this conversation as resolved.
assert result is not None, "Expected to find region=US user_id=1 after update"
assert result["name"] == "Gustave Updated"
assert result["score"] == 999
print(f"Update verified: US/1 name={result['name']} score={result['score']}")

# Lookup in non-existent partition
print("\n--- Lookup in non-existent partition ---")
result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
assert result is None, "Expected UNKNOWN partition lookup to return None"
print("UNKNOWN partition lookup: not found (expected)")
Comment thread
fresh-borzoni marked this conversation as resolved.

# Delete within a partition
print("\n--- Delete within partition ---")
handle = upsert_writer.delete({"region": "EU", "user_id": 1})
await handle.wait()
result = await lookuper.lookup({"region": "EU", "user_id": 1})
assert result is None, "Expected EU/1 to be deleted"
print("Delete verified: EU/1 not found")

# Verify sibling record still exists
result = await lookuper.lookup({"region": "EU", "user_id": 2})
assert result is not None, "Expected EU/2 to still exist"
assert result["name"] == "Maelle"
print(f"EU/2 still exists: name={result['name']}")

# Cleanup
await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
print(f"\nDropped partitioned KV table: {partitioned_kv_path}")

except Exception as e:
print(f"Error with partitioned KV table: {e}")
traceback.print_exc()

# Close connection
conn.close()
print("\nConnection closed")
Expand Down
Loading