@@ -365,11 +365,12 @@ apply_(_, #checkout{spec = {dequeue, _}},
365365 #? STATE {cfg = # cfg {consumer_strategy = single_active }} = State0 ) ->
366366 {State0 , {error , {unsupported , single_active_consumer }}};
367367apply_ (#{index := Index ,
368- system_time := Ts ,
369- from := From } = Meta , # checkout {spec = {dequeue , Settlement },
370- meta = ConsumerMeta ,
371- consumer_id = ConsumerId },
372- #? STATE {consumers = Consumers } = State00 ) ->
368+ system_time := Ts ,
369+ from := From } = Meta ,
370+ # checkout {spec = {dequeue , Settlement },
371+ meta = ConsumerMeta ,
372+ consumer_id = ConsumerId },
373+ #? STATE {consumers = Consumers } = State00 ) ->
373374 % % dequeue always updates last_active
374375 State0 = State00 #? STATE {last_active = Ts },
375376 % % all dequeue operations result in keeping the queue from expiring
@@ -416,8 +417,8 @@ apply_(#{index := Index,
416417 end
417418 end ;
418419apply_ (#{index := _Idx } = Meta ,
419- # checkout {spec = Spec ,
420- consumer_id = ConsumerId }, State0 )
420+ # checkout {spec = Spec ,
421+ consumer_id = ConsumerId }, State0 )
421422 when Spec == cancel orelse
422423 Spec == remove ->
423424 case consumer_key_from_id (ConsumerId , State0 ) of
@@ -432,9 +433,9 @@ apply_(#{index := _Idx} = Meta,
432433 {State0 , {error , consumer_not_found }, []}
433434 end ;
434435apply_ (#{index := Idx } = Meta ,
435- # checkout {spec = Spec0 ,
436- meta = ConsumerMeta ,
437- consumer_id = {_ , Pid } = ConsumerId }, State0 ) ->
436+ # checkout {spec = Spec0 ,
437+ meta = ConsumerMeta ,
438+ consumer_id = {_ , Pid } = ConsumerId }, State0 ) ->
438439 % % might be better to check machine_version
439440 IsV4 = tuple_size (Spec0 ) == 2 ,
440441 % % normalise spec format
@@ -475,7 +476,7 @@ apply_(#{index := Idx} = Meta,
475476 num_checked_out => map_size (Checked )}},
476477 checkout (Meta , State0 , State2 , [{monitor , process , Pid } | Effs ], Reply );
477478apply_ (#{index := Index }, # purge {},
478- #? STATE {messages_total = Total } = State0 ) ->
479+ #? STATE {messages_total = Total } = State0 ) ->
479480 NumReady = messages_ready (State0 ),
480481 State1 = State0 #? STATE {messages = rabbit_fifo_q :new (),
481482 messages_total = Total - NumReady ,
@@ -492,11 +493,11 @@ apply_(#{index := _Idx}, #garbage_collection{}, State) ->
492493apply_ (Meta , {timeout , expire_msgs }, State ) ->
493494 checkout (Meta , State , State , []);
494495apply_ (#{system_time := Ts } = Meta ,
495- {down , Pid , noconnection },
496- #? STATE {consumers = Cons0 ,
497- cfg = # cfg {consumer_strategy = single_active },
498- waiting_consumers = Waiting0 ,
499- enqueuers = Enqs0 } = State0 ) ->
496+ {down , Pid , noconnection },
497+ #? STATE {consumers = Cons0 ,
498+ cfg = # cfg {consumer_strategy = single_active },
499+ waiting_consumers = Waiting0 ,
500+ enqueuers = Enqs0 } = State0 ) ->
500501 Node = node (Pid ),
501502 % % if the pid refers to an active or cancelled consumer,
502503 % % mark it as suspected and return it to the waiting queue
@@ -539,9 +540,9 @@ apply_(#{system_time := Ts} = Meta,
539540 Effects = [{monitor , node , Node } | Effects1 ],
540541 checkout (Meta , State0 , State #? STATE {enqueuers = Enqs }, Effects );
541542apply_ (#{system_time := Ts } = Meta ,
542- {down , Pid , noconnection },
543- #? STATE {consumers = Cons0 ,
544- enqueuers = Enqs0 } = State0 ) ->
543+ {down , Pid , noconnection },
544+ #? STATE {consumers = Cons0 ,
545+ enqueuers = Enqs0 } = State0 ) ->
545546 % % A node has been disconnected. This doesn't necessarily mean that
546547 % % any processes on this node are down, they _may_ come back so here
547548 % % we just mark them as suspected (effectively deactivated)
@@ -581,7 +582,7 @@ apply_(Meta, {down, Pid, _Info}, State0) ->
581582 {State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 )),
582583 checkout (Meta , State0 , State1 , Effects1 );
583584apply_ (Meta , {nodeup , Node }, #? STATE {consumers = Cons0 ,
584- enqueuers = Enqs0 } = State0 ) ->
585+ enqueuers = Enqs0 } = State0 ) ->
585586 % % A node we are monitoring has come back.
586587 % % If we have suspected any processes of being
587588 % % down we should now re-issue the monitors for them to detect if they're
@@ -629,10 +630,10 @@ apply_(Meta, #purge_nodes{nodes = Nodes}, State0) ->
629630 end , {State0 , []}, Nodes ),
630631 {State , ok , Effects };
631632apply_ (Meta ,
632- # update_config {config = #{} = Conf },
633- #? STATE {cfg = # cfg {dead_letter_handler = OldDLH ,
634- resource = QRes },
635- dlx = DlxState0 } = State0 ) ->
633+ # update_config {config = #{} = Conf },
634+ #? STATE {cfg = # cfg {dead_letter_handler = OldDLH ,
635+ resource = QRes },
636+ dlx = DlxState0 } = State0 ) ->
636637 NewDLH = maps :get (dead_letter_handler , Conf , OldDLH ),
637638 {DlxState , Effects0 } = update_config (OldDLH , NewDLH , QRes ,
638639 DlxState0 ),
@@ -657,27 +658,26 @@ live_indexes(#?STATE{cfg = #cfg{},
657658 messages = Messages ,
658659 consumers = Consumers ,
659660 dlx = #? DLX {discards = Discards }}) ->
661+ MsgsIdxs = rabbit_fifo_q :indexes (Messages ),
660662 DlxIndexes = lqueue :fold (fun (? TUPLE (_ , ? MSG (I , _ )), Acc ) ->
661663 [I | Acc ]
662- end , [], Discards ),
663- RtnIndexes = [I || ? MSG (I , _ ) <- lqueue :to_list (Returns )],
664- CheckedIdxs = maps :fold (
665- fun (_Cid , # consumer {checked_out = Ch }, Acc0 ) ->
666- maps :fold (
667- fun (_MsgId , ? MSG (I , _ ), Acc ) ->
668- [I | Acc ]
669- end , Acc0 , Ch )
670- end , RtnIndexes ++ DlxIndexes , Consumers ),
671-
672- CheckedIdxs ++ rabbit_fifo_q :indexes (Messages ).
664+ end , MsgsIdxs , Discards ),
665+ RtnIndexes = lqueue :fold (fun (? MSG (I , _ ), Acc ) -> [I | Acc ] end ,
666+ DlxIndexes , Returns ),
667+ maps :fold (fun (_Cid , # consumer {checked_out = Ch }, Acc0 ) ->
668+ maps :fold (
669+ fun (_MsgId , ? MSG (I , _ ), Acc ) ->
670+ [I | Acc ]
671+ end , Acc0 , Ch )
672+ end , RtnIndexes , Consumers ).
673673
674674-spec snapshot_installed (Meta , State , OldMeta , OldState ) ->
675675 ra_machine :effects () when
676676 Meta :: ra_snapshot :meta (),
677677 State :: state (),
678678 OldMeta :: ra_snapshot :meta (),
679679 OldState :: state ().
680- snapshot_installed (_Meta , #? MODULE {cfg = # cfg {resource = _QR },
680+ snapshot_installed (_Meta , #? MODULE {cfg = # cfg {},
681681 consumers = Consumers } = State ,
682682 _OldMeta , _OldState ) ->
683683 % % here we need to redliver all pending consumer messages
0 commit comments