@@ -776,11 +776,14 @@ def __init__(
776776 self .max_batches_per_minute = max_batches_per_minute
777777 if max_batches_per_minute is not None :
778778 from collections import deque
779+
779780 self .batch_timestamps = deque ()
780781 else :
781782 self .batch_timestamps = None
782783
783- def _process_batch_and_update_state (self , batch , last_msg_id , trigger_reason = "unknown" ):
784+ def _process_batch_and_update_state (
785+ self , batch , last_msg_id , trigger_reason = "unknown"
786+ ):
784787 """Process a batch of messages and update connector state.
785788
786789 This method handles:
@@ -831,7 +834,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
831834 "trigger_reason" : trigger_reason ,
832835 "elapsed_time" : elapsed ,
833836 "timestamp" : time .time (),
834- }
837+ },
835838 }
836839
837840 self .callback (batch_data )
@@ -843,7 +846,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
843846 {
844847 "batch_size" : len (batch ),
845848 "trigger" : trigger_reason ,
846- }
849+ },
847850 )
848851 self .exit_event .set ()
849852 return False
@@ -895,10 +898,9 @@ def _wait_for_rate_limit(self, q):
895898 "max_batches_per_minute" : self .max_batches_per_minute ,
896899 "current_batch_count" : len (self .batch_timestamps ),
897900 "wait_seconds" : round (wait_time , 2 ),
898- }
901+ },
899902 )
900903
901-
902904 chunk_size = 30.0
903905 total_slept = 0.0
904906
@@ -2338,7 +2340,7 @@ def listen_stream_batch(
23382340 if max_batches_per_minute > 10000 :
23392341 self .connector_logger .warning (
23402342 "Very high max_batches_per_minute configured" ,
2341- {"max_batches_per_minute" : max_batches_per_minute }
2343+ {"max_batches_per_minute" : max_batches_per_minute },
23422344 )
23432345
23442346 params = self ._resolve_stream_parameters (
@@ -2356,9 +2358,7 @@ def listen_stream_batch(
23562358 if max_batches_per_minute is not None :
23572359 self .connector_logger .info (
23582360 "Batch rate limiting enabled" ,
2359- {
2360- "max_batches_per_minute" : max_batches_per_minute
2361- }
2361+ {"max_batches_per_minute" : max_batches_per_minute },
23622362 )
23632363
23642364 self ._listen_stream_batch_thread = ListenStreamBatch (
0 commit comments