Skip to content

Commit 31d5df5

Browse files
feat: introduce kv tables support in python (#239)
1 parent 5574d55 commit 31d5df5

10 files changed

Lines changed: 1101 additions & 68 deletions

File tree

bindings/python/example/example.py

Lines changed: 273 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
# under the License.
1717

1818
import asyncio
19-
import time
20-
from datetime import date, time as dt_time, datetime
19+
import traceback
20+
from datetime import date, datetime
21+
from datetime import time as dt_time
2122
from decimal import Decimal
2223

2324
import pandas as pd
@@ -103,11 +104,34 @@ async def main():
103104
pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
104105
pa.array([95.2, 87.2, 92.1], type=pa.float32()),
105106
pa.array([25, 30, 35], type=pa.int32()),
106-
pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)], type=pa.date32()),
107-
pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)], type=pa.time32("ms")),
108-
pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")),
109-
pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")),
110-
pa.array([Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")], type=pa.decimal128(10, 2)),
107+
pa.array(
108+
[date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)],
109+
type=pa.date32(),
110+
),
111+
pa.array(
112+
[dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)],
113+
type=pa.time32("ms"),
114+
),
115+
pa.array(
116+
[
117+
datetime(2024, 1, 15, 10, 30),
118+
datetime(2024, 1, 15, 11, 0),
119+
datetime(2024, 1, 15, 11, 30),
120+
],
121+
type=pa.timestamp("us"),
122+
),
123+
pa.array(
124+
[
125+
datetime(2024, 1, 15, 10, 30),
126+
datetime(2024, 1, 15, 11, 0),
127+
datetime(2024, 1, 15, 11, 30),
128+
],
129+
type=pa.timestamp("us", tz="UTC"),
130+
),
131+
pa.array(
132+
[Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")],
133+
type=pa.decimal128(10, 2),
134+
),
111135
],
112136
schema=schema,
113137
)
@@ -125,9 +149,18 @@ async def main():
125149
pa.array([28, 32], type=pa.int32()),
126150
pa.array([date(1996, 7, 22), date(1992, 12, 1)], type=pa.date32()),
127151
pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], type=pa.time32("ms")),
128-
pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], type=pa.timestamp("us")),
129-
pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], type=pa.timestamp("us", tz="UTC")),
130-
pa.array([Decimal("68000.00"), Decimal("72500.25")], type=pa.decimal128(10, 2)),
152+
pa.array(
153+
[datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)],
154+
type=pa.timestamp("us"),
155+
),
156+
pa.array(
157+
[datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)],
158+
type=pa.timestamp("us", tz="UTC"),
159+
),
160+
pa.array(
161+
[Decimal("68000.00"), Decimal("72500.25")],
162+
type=pa.decimal128(10, 2),
163+
),
131164
],
132165
schema=schema,
133166
)
@@ -138,28 +171,35 @@ async def main():
138171
# Test 3: Append single rows with Date, Time, Timestamp, Decimal
139172
print("\n--- Testing single row append with temporal/decimal types ---")
140173
# Dict input with all types including Date, Time, Timestamp, Decimal
141-
await append_writer.append({
142-
"id": 8,
143-
"name": "Helen",
144-
"score": 93.5,
145-
"age": 26,
146-
"birth_date": date(1998, 4, 10),
147-
"check_in_time": dt_time(11, 30, 45),
148-
"created_at": datetime(2024, 1, 17, 14, 0, 0),
149-
"updated_at": datetime(2024, 1, 17, 14, 0, 0),
150-
"salary": Decimal("88000.00"),
151-
})
174+
await append_writer.append(
175+
{
176+
"id": 8,
177+
"name": "Helen",
178+
"score": 93.5,
179+
"age": 26,
180+
"birth_date": date(1998, 4, 10),
181+
"check_in_time": dt_time(11, 30, 45),
182+
"created_at": datetime(2024, 1, 17, 14, 0, 0),
183+
"updated_at": datetime(2024, 1, 17, 14, 0, 0),
184+
"salary": Decimal("88000.00"),
185+
}
186+
)
152187
print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)")
153188

154189
# List input with all types
155-
await append_writer.append([
156-
9, "Ivan", 90.0, 31,
157-
date(1993, 8, 25),
158-
dt_time(16, 45, 0),
159-
datetime(2024, 1, 17, 15, 30, 0),
160-
datetime(2024, 1, 17, 15, 30, 0),
161-
Decimal("91500.50"),
162-
])
190+
await append_writer.append(
191+
[
192+
9,
193+
"Ivan",
194+
90.0,
195+
31,
196+
date(1993, 8, 25),
197+
dt_time(16, 45, 0),
198+
datetime(2024, 1, 17, 15, 30, 0),
199+
datetime(2024, 1, 17, 15, 30, 0),
200+
Decimal("91500.50"),
201+
]
202+
)
163203
print("Successfully appended row (list with Date, Time, Timestamp, Decimal)")
164204

165205
# Test 4: Write Pandas DataFrame
@@ -172,8 +212,14 @@ async def main():
172212
"age": [29, 27],
173213
"birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
174214
"check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
175-
"created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 18, 8, 30)],
176-
"updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 18, 8, 30)],
215+
"created_at": [
216+
datetime(2024, 1, 18, 8, 0),
217+
datetime(2024, 1, 18, 8, 30),
218+
],
219+
"updated_at": [
220+
datetime(2024, 1, 18, 8, 0),
221+
datetime(2024, 1, 18, 8, 30),
222+
],
177223
"salary": [Decimal("79000.00"), Decimal("85500.75")],
178224
}
179225
)
@@ -249,6 +295,199 @@ async def main():
249295
except Exception as e:
250296
print(f"Error during scanning: {e}")
251297

298+
# =====================================================
299+
# Demo: Primary Key Table with Lookup and Upsert
300+
# =====================================================
301+
print("\n" + "=" * 60)
302+
print("--- Testing Primary Key Table (Lookup & Upsert) ---")
303+
print("=" * 60)
304+
305+
# Create a primary key table for lookup/upsert tests
306+
# Include temporal and decimal types to test full conversion
307+
pk_table_fields = [
308+
pa.field("user_id", pa.int32()),
309+
pa.field("name", pa.string()),
310+
pa.field("email", pa.string()),
311+
pa.field("age", pa.int32()),
312+
pa.field("birth_date", pa.date32()),
313+
pa.field("login_time", pa.time32("ms")),
314+
pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ)
315+
pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ
316+
pa.field("balance", pa.decimal128(10, 2)),
317+
]
318+
pk_schema = pa.schema(pk_table_fields)
319+
fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
320+
321+
# Create table descriptor
322+
pk_table_descriptor = fluss.TableDescriptor(
323+
fluss_pk_schema,
324+
bucket_count=3,
325+
)
326+
327+
pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
328+
329+
try:
330+
await admin.create_table(pk_table_path, pk_table_descriptor, True)
331+
print(f"Created PK table: {pk_table_path}")
332+
except Exception as e:
333+
print(f"PK Table creation failed (may already exist): {e}")
334+
335+
# Get the PK table
336+
pk_table = await conn.get_table(pk_table_path)
337+
print(f"Got PK table: {pk_table}")
338+
print(f"Has primary key: {pk_table.has_primary_key()}")
339+
340+
# --- Test Upsert ---
341+
print("\n--- Testing Upsert ---")
342+
try:
343+
upsert_writer = pk_table.new_upsert()
344+
print(f"Created upsert writer: {upsert_writer}")
345+
346+
await upsert_writer.upsert(
347+
{
348+
"user_id": 1,
349+
"name": "Alice",
350+
"email": "alice@example.com",
351+
"age": 25,
352+
"birth_date": date(1999, 5, 15),
353+
"login_time": dt_time(9, 30, 45, 123000), # 09:30:45.123
354+
"created_at": datetime(
355+
2024, 1, 15, 10, 30, 45, 123456
356+
), # with microseconds
357+
"updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456),
358+
"balance": Decimal("1234.56"),
359+
}
360+
)
361+
print("Upserted user_id=1 (Alice)")
362+
363+
await upsert_writer.upsert(
364+
{
365+
"user_id": 2,
366+
"name": "Bob",
367+
"email": "bob@example.com",
368+
"age": 30,
369+
"birth_date": date(1994, 3, 20),
370+
"login_time": dt_time(14, 15, 30, 500000), # 14:15:30.500
371+
"created_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
372+
"updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
373+
"balance": Decimal("5678.91"),
374+
}
375+
)
376+
print("Upserted user_id=2 (Bob)")
377+
378+
await upsert_writer.upsert(
379+
{
380+
"user_id": 3,
381+
"name": "Charlie",
382+
"email": "charlie@example.com",
383+
"age": 35,
384+
"birth_date": date(1989, 11, 8),
385+
"login_time": dt_time(16, 45, 59, 999000), # 16:45:59.999
386+
"created_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
387+
"updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
388+
"balance": Decimal("9876.54"),
389+
}
390+
)
391+
print("Upserted user_id=3 (Charlie)")
392+
393+
# Update an existing row (same PK, different values)
394+
await upsert_writer.upsert(
395+
{
396+
"user_id": 1,
397+
"name": "Alice Updated",
398+
"email": "alice.new@example.com",
399+
"age": 26,
400+
"birth_date": date(1999, 5, 15),
401+
"login_time": dt_time(10, 11, 12, 345000), # 10:11:12.345
402+
"created_at": datetime(2024, 1, 15, 10, 30, 45, 123456), # unchanged
403+
"updated_at": datetime(
404+
2024, 1, 20, 15, 45, 30, 678901
405+
), # new update time
406+
"balance": Decimal("2345.67"),
407+
}
408+
)
409+
print("Updated user_id=1 (Alice -> Alice Updated)")
410+
411+
# Explicit flush to ensure all upserts are acknowledged
412+
await upsert_writer.flush()
413+
print("Flushed all upserts")
414+
415+
except Exception as e:
416+
print(f"Error during upsert: {e}")
417+
traceback.print_exc()
418+
419+
# --- Test Lookup ---
420+
print("\n--- Testing Lookup ---")
421+
try:
422+
lookuper = pk_table.new_lookup()
423+
print(f"Created lookuper: {lookuper}")
424+
425+
result = await lookuper.lookup({"user_id": 1})
426+
if result:
427+
print("Lookup user_id=1: Found!")
428+
print(f" name: {result['name']}")
429+
print(f" email: {result['email']}")
430+
print(f" age: {result['age']}")
431+
print(
432+
f" birth_date: {result['birth_date']} (type: {type(result['birth_date']).__name__})"
433+
)
434+
print(
435+
f" login_time: {result['login_time']} (type: {type(result['login_time']).__name__})"
436+
)
437+
print(
438+
f" created_at: {result['created_at']} (type: {type(result['created_at']).__name__})"
439+
)
440+
print(
441+
f" updated_at: {result['updated_at']} (type: {type(result['updated_at']).__name__})"
442+
)
443+
print(
444+
f" balance: {result['balance']} (type: {type(result['balance']).__name__})"
445+
)
446+
else:
447+
print("Lookup user_id=1: Not found")
448+
449+
# Lookup another row
450+
result = await lookuper.lookup({"user_id": 2})
451+
if result:
452+
print(f"Lookup user_id=2: Found! -> {result}")
453+
else:
454+
print("Lookup user_id=2: Not found")
455+
456+
# Lookup non-existent row
457+
result = await lookuper.lookup({"user_id": 999})
458+
if result:
459+
print(f"Lookup user_id=999: Found! -> {result}")
460+
else:
461+
print("Lookup user_id=999: Not found (as expected)")
462+
463+
except Exception as e:
464+
print(f"Error during lookup: {e}")
465+
traceback.print_exc()
466+
467+
# --- Test Delete ---
468+
print("\n--- Testing Delete ---")
469+
try:
470+
upsert_writer = pk_table.new_upsert()
471+
472+
# Delete only needs PK columns - much simpler API!
473+
await upsert_writer.delete({"user_id": 3})
474+
print("Deleted user_id=3")
475+
476+
# Explicit flush to ensure delete is acknowledged
477+
await upsert_writer.flush()
478+
print("Flushed delete")
479+
480+
lookuper = pk_table.new_lookup()
481+
result = await lookuper.lookup({"user_id": 3})
482+
if result:
483+
print(f"Lookup user_id=3 after delete: Still found! -> {result}")
484+
else:
485+
print("Lookup user_id=3 after delete: Not found (deletion confirmed)")
486+
487+
except Exception as e:
488+
print(f"Error during delete: {e}")
489+
traceback.print_exc()
490+
252491
# Demo: Column projection
253492
print("\n--- Testing Column Projection ---")
254493
try:
@@ -258,7 +497,9 @@ async def main():
258497
scanner_index.subscribe(None, None)
259498
df_projected = scanner_index.to_pandas()
260499
print(df_projected.head())
261-
print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}")
500+
print(
501+
f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}"
502+
)
262503

263504
# Project specific columns by name (Pythonic!)
264505
print("\n2. Projection by name ['name', 'score'] (Pythonic):")

0 commit comments

Comments
 (0)