File tree Expand file tree Collapse file tree 1 file changed +9
-5
lines changed Expand file tree Collapse file tree 1 file changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -854,12 +854,15 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
854854 . one ( self . get_connection ( ) )
855855 . await ?
856856 {
857- // Yield n messages starting from the found queue index + 1.
857+ // Yield n messages starting from the found queue index + 1. Only return
858+ // messages that have not been skipped (skipped = false) to handle the edge
859+ // case where the last message in a batch is skipped.
860+ let condition = Condition :: all ( )
861+ // We add 1 to the queue index to constrain across block boundaries
862+ . add ( models:: l1_message:: Column :: QueueIndex . gte ( record. queue_index + 1 ) )
863+ . add ( models:: l1_message:: Column :: Skipped . eq ( false ) ) ;
858864 Ok ( models:: l1_message:: Entity :: find ( )
859- . filter (
860- // We add 1 to the queue index to constrain across block boundaries
861- models:: l1_message:: Column :: QueueIndex . gte ( record. queue_index + 1 ) ,
862- )
865+ . filter ( condition)
863866 . order_by_asc ( models:: l1_message:: Column :: QueueIndex )
864867 . limit ( Some ( n as u64 ) )
865868 . all ( self . get_connection ( ) )
@@ -871,6 +874,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
871874 // index starting from the beginning.
872875 else {
873876 Ok ( models:: l1_message:: Entity :: find ( )
877+ . filter ( models:: l1_message:: Column :: Skipped . eq ( false ) )
874878 . order_by_asc ( models:: l1_message:: Column :: QueueIndex )
875879 . limit ( Some ( n as u64 ) )
876880 . all ( self . get_connection ( ) )
You can’t perform that action at this time.
0 commit comments