@@ -379,27 +379,6 @@ def wait_for_slot(self, slot: int) -> int:
379
379
msg = f"Failed to wait for slot number { slot } ."
380
380
raise exceptions .CLIError (msg )
381
381
382
- def poll_new_epoch (self , exp_epoch : int , padding_seconds : int = 0 ) -> None :
383
- """Wait for new epoch(s) by polling current epoch every 3 sec.
384
-
385
- Can be used only for waiting up to 3000 sec + padding seconds.
386
-
387
- Args:
388
- exp_epoch: An epoch number to wait for.
389
- padding_seconds: A number of additional seconds to wait for (optional).
390
- """
391
- for check_no in range (1000 ):
392
- wakeup_epoch = self .g_query .get_epoch ()
393
- if wakeup_epoch != exp_epoch :
394
- time .sleep (3 )
395
- continue
396
- # we are in the expected epoch right from the beginning, we'll skip padding seconds
397
- if check_no == 0 :
398
- break
399
- if padding_seconds :
400
- time .sleep (padding_seconds )
401
- break
402
-
403
382
def wait_for_new_epoch (self , new_epochs : int = 1 , padding_seconds : int = 0 ) -> int :
404
383
"""Wait for new epoch(s).
405
384
@@ -410,40 +389,38 @@ def wait_for_new_epoch(self, new_epochs: int = 1, padding_seconds: int = 0) -> i
410
389
Returns:
411
390
int: The current epoch.
412
391
"""
413
- start_epoch = self .g_query .get_epoch ()
392
+ start_tip = self .g_query .get_tip ()
393
+ start_epoch = int (start_tip ["epoch" ])
414
394
415
395
if new_epochs < 1 :
416
396
return start_epoch
417
397
418
- exp_epoch = start_epoch + new_epochs
419
- LOGGER . debug (
420
- f"Current epoch: { start_epoch } ; Waiting for the beginning of epoch: { exp_epoch } "
398
+ epoch_no = start_epoch + new_epochs
399
+ return clusterlib_helpers . wait_for_epoch (
400
+ clusterlib_obj = self , tip = start_tip , epoch_no = epoch_no , padding_seconds = padding_seconds
421
401
)
422
402
423
- # calculate and wait for the expected slot
424
- boundary_slot = int ((start_epoch + new_epochs ) * self .epoch_length - self .slots_offset )
425
- padding_slots = int (padding_seconds / self .slot_length ) if padding_seconds else 5
426
- exp_slot = boundary_slot + padding_slots
427
- self .wait_for_slot (slot = exp_slot )
428
-
429
- this_epoch = self .g_query .get_epoch ()
430
- if this_epoch != exp_epoch :
431
- LOGGER .error (
432
- f"Waited for epoch number { exp_epoch } and current epoch is "
433
- f"number { this_epoch } , wrong `slots_offset` ({ self .slots_offset } )?"
434
- )
435
- # attempt to get the epoch boundary as precisely as possible failed, now just
436
- # query epoch number and wait
437
- self .poll_new_epoch (exp_epoch = exp_epoch , padding_seconds = padding_seconds )
438
-
439
- # Still not in the correct epoch? Something is wrong.
440
- this_epoch = self .g_query .get_epoch ()
441
- if this_epoch != exp_epoch :
442
- msg = f"Waited for epoch number { exp_epoch } and current epoch is number { this_epoch } ."
443
- raise exceptions .CLIError (msg )
403
+ def wait_for_epoch (
404
+ self , epoch_no : int , padding_seconds : int = 0 , future_is_ok : bool = True
405
+ ) -> int :
406
+ """Wait for epoch no.
407
+
408
+ Args:
409
+ epoch_no: A number of epoch to wait for.
410
+ padding_seconds: A number of additional seconds to wait for (optional).
411
+ future_is_ok: A bool indicating whether current epoch > `epoch_no` is acceptable
412
+ (default: True).
444
413
445
- LOGGER .debug (f"Expected epoch started; epoch number: { this_epoch } " )
446
- return this_epoch
414
+ Returns:
415
+ int: The current epoch.
416
+ """
417
+ return clusterlib_helpers .wait_for_epoch (
418
+ clusterlib_obj = self ,
419
+ tip = self .g_query .get_tip (),
420
+ epoch_no = epoch_no ,
421
+ padding_seconds = padding_seconds ,
422
+ future_is_ok = future_is_ok ,
423
+ )
447
424
448
425
def time_to_epoch_end (self , tip : tp .Optional [dict ] = None ) -> float :
449
426
"""How many seconds to go to start of a new epoch."""
0 commit comments