
Commit 0a881ff

Add Iceberg v2 consumer for consuming flattened data; the schema is inferred by Arrow

1 parent 8a6c2db commit 0a881ff

File tree

7 files changed: +128 -102 lines changed


pydbzengine/handlers/iceberg.py

Lines changed: 8 additions & 1 deletion

@@ -143,7 +143,8 @@ def load_table(self, table_identifier):
         table = self.catalog.create_table(identifier=table_identifier,
                                           schema=self._target_schema,
                                           partition_spec=self.DEBEZIUM_TABLE_PARTITION_SPEC)
-        self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
+        self.log.info(
+            f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
         return table

     @property
@@ -236,6 +237,7 @@ def _handle_table_changes(self, destination: str, records: List[ChangeEvent]):
             records=records
         )

+        self._handle_schema_changes(table=table, arrow_schema=enriched_arrow_data.schema)
         table.append(enriched_arrow_data)
         self.log.info(f"Appended {len(enriched_arrow_data)} records to table {'.'.join(table.name())}")

@@ -401,3 +403,8 @@ def _get_identifier_fields(self, sample_event: ChangeEvent, table_identifier: tu

         self.log.info(f"Found potential primary key fields {key_field_names} for table {table_name_str}")
         return key_field_names
+
+    def _handle_schema_changes(self, table: "Table", arrow_schema: "pa.Schema"):
+        with table.update_schema() as update:
+            update.union_by_name(new_schema=arrow_schema)
+        self.log.info(f"Schema for table {'.'.join(table.name())} has been updated.")

pydbzengine/helper.py

Lines changed: 5 additions & 4 deletions

@@ -20,7 +20,7 @@ def timeout_handler(signum, frame):
 class Utils:

     @staticmethod
-    def run_engine_async(engine, timeout_sec=22):
+    def run_engine_async(engine, timeout_sec=22, blocking=True):
         """
         Runs an engine asynchronously with a timeout.

@@ -37,11 +37,12 @@ def run_engine_async(engine, timeout_sec=22):
         signal.alarm(timeout_sec)

         try:
-            thread = threading.Thread(target=engine.run)
+            thread = threading.Thread(target=engine.run, daemon=True)
             thread.start()

-            # Wait for the thread to complete (or the timeout to occur).
-            thread.join()  # This will block until the thread finishes or the signal is received.
+            if blocking:
+                # Wait for the thread to complete (or the timeout to occur).
+                thread.join()  # This will block until the thread finishes or the signal is received.

         except TimeoutError:
             # Handle the timeout exception.
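The new blocking flag works because the engine thread is now a daemon thread: when the caller skips the join, the engine keeps consuming in the background and dies with the interpreter instead of holding the process open, while the pre-existing SIGALRM timeout (unchanged by this commit) still interrupts long runs. A self-contained sketch of that pattern with a generic worker, not the pydbzengine helper itself:

import threading
import time

def run_async(target, blocking=True, timeout_sec=5):
    # daemon=True: the thread will not keep the process alive on its own,
    # which is what makes the fire-and-forget mode safe for tests.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    if blocking:
        thread.join(timeout=timeout_sec)  # wait here only in blocking mode
    return thread

def worker():
    for i in range(3):
        time.sleep(0.5)
        print("worker tick", i)

t = run_async(worker, blocking=False)
print("caller continues immediately; worker alive:", t.is_alive())
t.join()  # a test would instead poll its assertions while the worker runs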

tests/base_postgresql_test.py

Lines changed: 35 additions & 26 deletions

@@ -11,35 +11,44 @@ class BasePostgresqlTest(unittest.TestCase):
     OFFSET_FILE = CURRENT_DIR.joinpath('postgresql-offsets.dat')
     SOURCEPGDB = DbPostgresql()

-    def debezium_engine_props(self, unwrap_messages=True):
+    def debezium_engine_props_dict(self, unwrap_messages=True) -> dict:
         current_dir = Path(__file__).parent
         offset_file_path = current_dir.joinpath('postgresql-offsets.dat')

-        props = Properties()
-        props.setProperty("name", "engine")
-        props.setProperty("snapshot.mode", "always")
-        props.setProperty("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip())
-        props.setProperty("database.port", str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT)))
-        props.setProperty("database.user", self.SOURCEPGDB.POSTGRES_USER)
-        props.setProperty("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD)
-        props.setProperty("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME)
-        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
-        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
-        props.setProperty("offset.storage.file.filename", offset_file_path.as_posix())
-        props.setProperty("poll.interval.ms", "10000")
-        props.setProperty("converter.schemas.enable", "false")
-        props.setProperty("offset.flush.interval.ms", "1000")
-        props.setProperty("topic.prefix", "testc")
-        props.setProperty("schema.whitelist", "inventory")
-        props.setProperty("database.whitelist", "inventory")
-        props.setProperty("table.whitelist", "inventory.products")
-        props.setProperty("replica.identity.autoset.values", "inventory.*:FULL")
+        conf: dict = {}
+        conf.setdefault("name", "engine")
+        conf.setdefault("snapshot.mode", "always")
+        conf.setdefault("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip())
+        conf.setdefault("database.port",
+                        str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT)))
+        conf.setdefault("database.user", self.SOURCEPGDB.POSTGRES_USER)
+        conf.setdefault("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD)
+        conf.setdefault("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME)
+        conf.setdefault("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
+        conf.setdefault("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
+        conf.setdefault("offset.storage.file.filename", offset_file_path.as_posix())
+        conf.setdefault("poll.interval.ms", "10000")
+        conf.setdefault("converter.schemas.enable", "false")
+        conf.setdefault("offset.flush.interval.ms", "1000")
+        conf.setdefault("topic.prefix", "testc")
+        conf.setdefault("schema.whitelist", "inventory")
+        conf.setdefault("database.whitelist", "inventory")
+        conf.setdefault("table.whitelist", "inventory.products")
+        conf.setdefault("replica.identity.autoset.values", "inventory.*:FULL")

         if unwrap_messages:
-            props.setProperty("transforms", "unwrap")
-            props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
-            props.setProperty("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
-            props.setProperty("transforms.unwrap.delete.handling.mode", "rewrite")
+            conf.setdefault("transforms", "unwrap")
+            conf.setdefault("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
+            conf.setdefault("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
+            conf.setdefault("transforms.unwrap.delete.handling.mode", "rewrite")
+
+        return conf
+
+    def debezium_engine_props(self, unwrap_messages=True):
+        props = Properties()
+        conf = self.debezium_engine_props_dict(unwrap_messages=unwrap_messages)
+        for k, v in conf.items():
+            props.setProperty(k, v)
         return props

     def clean_offset_file(self):
@@ -54,5 +63,5 @@ def tearDown(self):
         self.SOURCEPGDB.stop()
         self.clean_offset_file()

-    def execute_on_source_db(self, sql:str):
-        self.SOURCEPGDB.execute_sql(sql=sql)
+    def execute_on_source_db(self, sql: str):
+        self.SOURCEPGDB.execute_sql(sql=sql)
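Keeping the connector configuration as a plain dict before it is copied into Properties lets an individual test override a key or two and only then materialize the Java object Debezium expects. A hedged sketch, assuming the usual pydbzengine imports and a hypothetical subclass of BasePostgresqlTest; the override value is illustrative only.

from pydbzengine import Properties  # assumption: Properties is exported by pydbzengine

class MyHandlerTest(BasePostgresqlTest):

    def debezium_props_fast_poll(self):
        # Start from the shared defaults, tweak what this test needs, then
        # convert to java.util.Properties exactly like debezium_engine_props does.
        conf = self.debezium_engine_props_dict(unwrap_messages=True)
        conf["poll.interval.ms"] = "500"  # illustrative override for a faster feedback loop
        props = Properties()
        for k, v in conf.items():
            props.setProperty(k, v)
        return props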

tests/catalog_rest.py

Lines changed: 3 additions & 0 deletions

@@ -46,3 +46,6 @@ def list_namespaces(self):
         catalog = self.get_catalog()
         namespaces = catalog.list_namespaces()
         print("Namespaces:", namespaces)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop()
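With __exit__ defined, the REST catalog container can be scoped to a with-block so it is stopped even when a test body raises. A hedged usage sketch, assuming the class also defines __enter__ returning self (not shown in this diff) and using the start/get_uri methods the tests already call; the MinIO endpoint is hypothetical.

# Sketch only: cleanup happens in __exit__, which calls stop().
with CatalogRestContainer() as rest_catalog:
    rest_catalog.start(s3_endpoint="http://localhost:9000")  # hypothetical MinIO endpoint
    print(rest_catalog.get_uri())
# leaving the block stops the container, mirroring the explicit RESTCATALOG.stop() in tearDown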

tests/db_postgresql.py

Lines changed: 3 additions & 1 deletion

@@ -39,7 +39,9 @@ def stop(self):
         pass

     def get_connection(self) -> Connection:
-        engine = sqlalchemy.create_engine(self.CONTAINER.get_connection_url())
+        url = self.CONTAINER.get_connection_url()
+        print(url)
+        engine = sqlalchemy.create_engine(url)
         return engine.connect()

     def __exit__(self, exc_type, exc_value, traceback):

tests/test_iceberg_handler.py

Lines changed: 1 addition & 0 deletions

@@ -26,6 +26,7 @@ def setUp(self):
     def tearDown(self):
         self.SOURCEPGDB.stop()
         self.S3MiNIO.stop()
+        self.RESTCATALOG.stop()
         self.clean_offset_file()

     @unittest.skip

tests/test_iceberg_handlerv2.py

Lines changed: 73 additions & 70 deletions

@@ -1,9 +1,12 @@
-import unittest
+import io
+import threading
+import time

 import pandas as pd
+import pyarrow as pa
+import pyarrow.json as pj
+import waiting
 from pyiceberg.catalog import load_catalog
-from pyiceberg.schema import Schema
-from pyiceberg.types import LongType, NestedField, StringType

 from base_postgresql_test import BasePostgresqlTest
 from catalog_rest import CatalogRestContainer
@@ -24,99 +27,99 @@ def setUp(self):
         self.S3MiNIO.start()
         self.RESTCATALOG.start(s3_endpoint=self.S3MiNIO.endpoint())
         # Set pandas options to display all rows and columns, and prevent truncation of cell content
-        pd.set_option('display.max_rows', None) # Show all rows
-        pd.set_option('display.max_columns', None) # Show all columns
-        pd.set_option('display.width', None) # Auto-detect terminal width
-        pd.set_option('display.max_colwidth', None) # Do not truncate cell contents
+        pd.set_option('display.max_rows', None)  # Show all rows
+        pd.set_option('display.max_columns', None)  # Show all columns
+        pd.set_option('display.width', None)  # Auto-detect terminal width
+        pd.set_option('display.max_colwidth', None)  # Do not truncate cell contents

     def tearDown(self):
         self.SOURCEPGDB.stop()
         self.S3MiNIO.stop()
+        self.RESTCATALOG.stop()
         self.clean_offset_file()

-    @unittest.skip
-    def test_iceberg_catalog(self):
-        conf = {
-            "uri": self.RESTCATALOG.get_uri(),
-            # "s3.path-style.access": "true",
-            "warehouse": "warehouse",
-            "s3.endpoint": self.S3MiNIO.endpoint(),
-            "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID,
-            "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY,
-        }
-        print(conf)
-        catalog = load_catalog(
-            name="rest",
-            **conf
-        )
-        catalog.create_namespace('my_warehouse')
-        debezium_event_schema = Schema(
-            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
-            NestedField(field_id=2, name="data", field_type=StringType(), required=False),
-        )
-        table = catalog.create_table(identifier=("my_warehouse", "test_table",), schema=debezium_event_schema)
-        print(f"Created iceberg table {table.refs()}")
+    def test_read_json_lines_example(self):
+        json_data = """
+        {"id": 1, "name": "Alice", "age": 30}
+        {"id": 2, "name": "Bob", "age": 24}
+        {"id": 3, "name": "Charlie", "age": 35}
+        """.strip()  # .strip() removes leading/trailing whitespace/newlines
+        json_buffer = io.BytesIO(json_data.encode('utf-8'))
+        json_buffer.seek(0)
+        # =============================
+        table_inferred = pj.read_json(json_buffer)
+        print("\nInferred Schema:")
+        print(table_inferred.schema)
+        # =============================
+        explicit_schema = pa.schema([
+            pa.field('id', pa.int64()),  # Integer type for 'id'
+            pa.field('name', pa.string()),  # String type for 'name'
+        ])
+        json_buffer.seek(0)
+        po = pj.ParseOptions(explicit_schema=explicit_schema)
+        table_explicit = pj.read_json(json_buffer, parse_options=po)
+        print("\nExplicit Schema:")
+        print(table_explicit.schema)
+
+    def _apply_source_db_changes(self):
+        time.sleep(12)
+        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;")
+        # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;")
+        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;")
+        self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;")
+        self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;")
+        self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;")
+        self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;")

     def test_iceberg_handler(self):
-        dest_ns1_database="my_warehouse"
-        dest_ns2_schema="dbz_cdc_data"
-        conf = {
+        dest_ns1_database = "my_warehouse"
+        dest_ns2_schema = "dbz_cdc_data"
+        catalog_conf = {
             "uri": self.RESTCATALOG.get_uri(),
-            # "s3.path-style.access": "true",
             "warehouse": "warehouse",
             "s3.endpoint": self.S3MiNIO.endpoint(),
             "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID,
             "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY,
         }
-        catalog = load_catalog(name="rest",**conf)
-
+        catalog = load_catalog(name="rest", **catalog_conf)
         handler = IcebergChangeHandlerV2(catalog=catalog,
                                          destination_namespace=(dest_ns1_database, dest_ns2_schema,),
                                          event_flattening_enabled=True
                                          )
-
         dbz_props = self.debezium_engine_props(unwrap_messages=True)
         engine = DebeziumJsonEngine(properties=dbz_props, handler=handler)
-        with self.assertLogs(IcebergChangeHandlerV2.LOGGER_NAME, level='INFO') as cm:
-            # run async then interrupt after timeout time to test the result!
-            Utils.run_engine_async(engine=engine, timeout_sec=44)

-        # for t in cm.output:
-        #     print(t)
-        self.assertRegex(text=str(cm.output), expected_regex='.*Created iceberg table.*')
-        self.assertRegex(text=str(cm.output), expected_regex='.*Appended.*records to table.*')
+        t = threading.Thread(target=self._apply_source_db_changes)
+        t.start()
+        Utils.run_engine_async(engine=engine, timeout_sec=77, blocking=False)

-        # catalog.create_namespace(dest_ns1_database)
-        namespaces = catalog.list_namespaces()
-        self.assertIn((dest_ns1_database,) , namespaces, msg="Namespace not found in catalog")
+        test_ns = (dest_ns1_database,)
+        print(catalog.list_namespaces())
+        waiting.wait(predicate=lambda: test_ns in catalog.list_namespaces(), timeout_seconds=7.5)

-        tables = catalog.list_tables((dest_ns1_database, dest_ns2_schema,))
-        print(tables)
-        self.assertIn(('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'), tables, msg="Namespace not found in catalog")
+        test_tbl = ('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers')
+        test_tbl_ns = (dest_ns1_database, dest_ns2_schema,)
+        waiting.wait(predicate=lambda: test_tbl in catalog.list_tables(test_tbl_ns), timeout_seconds=10.5)

-        tbl = catalog.load_table(identifier=('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'))
-        data = tbl.scan().to_arrow()
-        self.assertIn("[email protected]", str(data))
-        self.assertIn("[email protected]", str(data))
-        self.assertEqual(data.num_rows, 4)
+        test_tbl_data = ('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers')
+        waiting.wait(predicate=lambda: "[email protected]" in str(self.red_table(catalog, test_tbl_data)),
+                     timeout_seconds=10.5)
+        waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 4, timeout_seconds=10.5)
+
+        data = self.red_table(catalog, test_tbl_data)
         self.pprint_table(data=data)
-        #=================================================================
-        ## ==== PART 2 CONSUME CHANGES FROM BINLOG =======================
-        #=================================================================
-        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;")
-        # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;")
-        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;")
-        self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;")
-        self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;")
-        self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;")
-        self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;")
-        # run
-        Utils.run_engine_async(engine=engine, timeout_sec=44)
-        # test
-        # @TODO test that new field is received and added to iceberg!
-        data = tbl.scan().to_arrow()
+        # =================================================================
+        ## ==== PART 2 CONSUME CHANGES FROM BINLOG ========================
+        # =================================================================
+        waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 7, timeout_seconds=77)
+        data = self.red_table(catalog, test_tbl_data)
         self.pprint_table(data=data)
-        self.assertEqual(data.num_rows, 4)
+
+    def red_table(self, catalog, table_identifier) -> "pa.Table":
+        tbl = catalog.load_table(identifier=table_identifier)
+        data = tbl.scan().to_arrow()
+        self.pprint_table(data)
+        return data

     def pprint_table(self, data):
         print("--- Iceberg Table Content ---")
