@@ -788,7 +788,9 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
788788 - Invoking user callback with batch
789789 - Updating connector state with last processed message ID
790790 - Handling state reset scenarios
791- - ALWAYS updating state, even if callback fails
791+
792+ Note: If callback fails, state will NOT be updated and exception will propagate.
793+ This matches ListenStream behavior where failed messages are retried on restart.
792794
793795 Args:
794796 batch: List of SSE message objects to process
@@ -832,28 +834,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
832834 }
833835 }
834836
835- # Invoke user's callback function with exception handling
836- callback_failed = False
837- callback_error = None
838- try :
839- self .callback (batch_data )
840- except Exception as ex :
841- callback_failed = True
842- callback_error = ex
843- self .helper .connector_logger .error (
844- "Batch callback failed - continuing with next batch" ,
845- {
846- "error" : str (ex ),
847- "error_type" : type (ex ).__name__ ,
848- "batch_size" : len (batch ),
849- "trigger" : trigger_reason ,
850- "last_msg_id" : last_msg_id ,
851- },
852- )
853- self .helper .metric .inc ("error_count" )
854-
855- # Update state with last processed message EVEN IF CALLBACK FAILED
856- # This ensures the connector progresses through the stream
837+ self .callback (batch_data )
857838 state = self .helper .get_state ()
858839 if state is None :
859840 # State was reset from UI during processing
@@ -862,7 +843,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
862843 {
863844 "batch_size" : len (batch ),
864845 "trigger" : trigger_reason ,
865- "callback_failed" : callback_failed ,
866846 }
867847 )
868848 self .exit_event .set ()
@@ -872,16 +852,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
872852 state ["start_from" ] = str (last_msg_id )
873853 self .helper .set_state (state )
874854
875- if callback_failed :
876- self .helper .connector_logger .warning (
877- "State updated despite callback failure - batch will NOT be retried" ,
878- {
879- "batch_size" : len (batch ),
880- "last_msg_id" : last_msg_id ,
881- "error" : str (callback_error ),
882- }
883- )
884-
885855 return True
886856
887857 def _wait_for_rate_limit (self , q ):
@@ -1049,6 +1019,12 @@ def run(self) -> None:
10491019 state = self .helper .get_state ()
10501020 if state is None :
10511021 self .exit_event .set ()
1022+ else :
1023+ # Only update state if batch is empty to prevent message loss
1024+ # If batch has unprocessed messages, state will be updated after batch processing
1025+ if len (batch ) == 0 :
1026+ state ["start_from" ] = str (msg .id )
1027+ self .helper .set_state (state )
10521028 last_msg_id = msg .id
10531029 else :
10541030 batch .append (msg )
0 commit comments