55import pandas as pd
66import pyarrow as pa
77import pyarrow .json as pj
8- import waiting
98from pyiceberg .catalog import load_catalog
109
1110from base_postgresql_test import BasePostgresqlTest
@@ -89,29 +88,42 @@ def test_iceberg_handler(self):
8988 dbz_props = self .debezium_engine_props (unwrap_messages = True )
9089 engine = DebeziumJsonEngine (properties = dbz_props , handler = handler )
9190
92- t = threading .Thread (target = self ._apply_source_db_changes )
93- t .start ()
9491 Utils .run_engine_async (engine = engine , timeout_sec = 77 , blocking = False )
9592
9693 test_ns = (dest_ns1_database ,)
9794 print (catalog .list_namespaces ())
98- waiting .wait (predicate = lambda : test_ns in catalog .list_namespaces (), timeout_seconds = 7.5 )
95+ self ._wait_for_condition (
96+ predicate = lambda : test_ns in catalog .list_namespaces (),
97+ failure_message = f"Namespace { test_ns } did not appear in the catalog"
98+ )
9999
100100 test_tbl = ('my_warehouse' , 'dbz_cdc_data' , 'testc_inventory_customers' )
101101 test_tbl_ns = (dest_ns1_database , dest_ns2_schema ,)
102- waiting .wait (predicate = lambda : test_tbl in catalog .list_tables (test_tbl_ns ), timeout_seconds = 10.5 )
102+ self ._wait_for_condition (
103+ predicate = lambda : test_tbl in catalog .list_tables (test_tbl_ns ),
104+ failure_message = f"Table { test_tbl } did not appear in the tables"
105+ )
103106
104107 test_tbl_data = ('my_warehouse' , 'dbz_cdc_data' , 'testc_inventory_customers' )
105- waiting .
wait (
predicate = lambda :
"[email protected] " in str (
self .
red_table (
catalog ,
test_tbl_data )),
106- timeout_seconds = 10.5 )
107- waiting .wait (predicate = lambda : self .red_table (catalog , test_tbl_data ).num_rows >= 4 , timeout_seconds = 10.5 )
108+ self ._wait_for_condition (
109+ predicate = lambda :
"[email protected] " in str (
self .
red_table (
catalog ,
test_tbl_data )),
110+ failure_message = f"Expected row not consumed!"
111+ )
112+ self ._wait_for_condition (
113+ predicate = lambda : self .red_table (catalog , test_tbl_data ).num_rows >= 4 ,
114+ failure_message = f"Rows { 4 } did not consumed"
115+ )
108116
109117 data = self .red_table (catalog , test_tbl_data )
110118 self .pprint_table (data = data )
111119 # =================================================================
112120 ## ==== PART 2 CONSUME CHANGES FROM BINLOG ========================
113121 # =================================================================
114- waiting .wait (predicate = lambda : self .red_table (catalog , test_tbl_data ).num_rows >= 7 , timeout_seconds = 77 )
122+ self ._apply_source_db_changes ()
123+ self ._wait_for_condition (
124+ predicate = lambda : self .red_table (catalog , test_tbl_data ).num_rows >= 7 ,
125+ failure_message = f"Rows { 7 } did not consumed"
126+ )
115127 data = self .red_table (catalog , test_tbl_data )
116128 self .pprint_table (data = data )
117129
@@ -125,3 +137,18 @@ def pprint_table(self, data):
125137 print ("--- Iceberg Table Content ---" )
126138 print (data .to_pandas ())
127139 print ("---------------------------\n " )
140+
141+ def _wait_for_condition (self , predicate , failure_message : str , retries : int = 10 , delay_seconds : int = 2 ):
142+ attempts = 0
143+ while attempts < retries :
144+ print (f"Attempt { attempts + 1 } /{ retries } : Checking condition..." )
145+ if predicate ():
146+ print ("Condition met." )
147+ return
148+
149+ attempts += 1
150+ # Avoid sleeping after the last attempt
151+ if attempts < retries :
152+ time .sleep (delay_seconds )
153+
154+ raise TimeoutError (f"{ failure_message } after { retries } attempts ({ retries * delay_seconds } seconds)." )
0 commit comments