Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

%% Function references should not be stored on the metadata store.
Expand Down Expand Up @@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
status(_) ->
running.

pending_count(#{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
queue:len(Pending).

add_pending(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

-import(rabbit_misc, [pget/2, pget/3]).
Expand Down Expand Up @@ -317,6 +318,10 @@ status(_) ->
%% Destination not yet connected
ignore.

pending_count(#{dest := Dest}) ->
Pending = maps:get(pending, Dest, []),
length(Pending).

-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
state() | {stop, any()}.
forward(_Tag, _Mc,
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
ack/3,
nack/3,
forward/3,
status/1
status/1,
pending_count/1
]).

-export([
Expand Down Expand Up @@ -443,6 +444,9 @@ add_routing(Msg0, Dest) ->
status(_) ->
running.

pending_count(_State) ->
0.

%% Internal

parse_parameter(_, _, none) ->
Expand Down
11 changes: 6 additions & 5 deletions deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
-callback forward(Tag :: tag(), Msg :: mc:state(), state()) ->
state() | {stop, any()}.
-callback status(state()) -> rabbit_shovel_status:shovel_status().
-callback pending_count(state()) -> non_neg_integer().

-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config().
Expand Down Expand Up @@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) ->
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.

-spec metrics(state()) -> rabbit_shovel_status:metrics().
metrics(_State = #{source := Source,
dest := Dest}) ->
metrics(#{source := Source,
dest := #{module := Mod}} = State) ->
#{remaining => maps:get(remaining, Source, unlimited),
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => maps:get(pending, Dest, 0),
forwarded => maps:get(forwarded, Dest, 0)}.
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => Mod:pending_count(State),
forwarded => maps:get(forwarded, maps:get(dest, State), 0)}.


%% Common functions
Expand Down
144 changes: 144 additions & 0 deletions deps/rabbitmq_shovel/test/pending_count_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(pending_count_SUITE).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("../include/rabbit_shovel.hrl").

%%%===================================================================
%%% Common Test callbacks
%%%===================================================================

all() ->
[
{group, pending_count_tests}
].

groups() ->
[
{pending_count_tests, [], [
amqp091_pending_count_empty_queue,
amqp091_pending_count_with_messages,
amqp091_pending_count_after_drain,
amqp10_pending_count_empty_list,
amqp10_pending_count_with_messages,
amqp10_pending_count_after_clear,
local_pending_count_empty_queue,
local_pending_count_after_settle,
behaviour_metrics_includes_pending,
behaviour_pending_count_delegation
]}
].

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_group(_Group, Config) ->
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(_TestCase, _Config) ->
meck:unload(),
ok.

%%%===================================================================
%%% Test cases
%%%===================================================================

%% Test AMQP 0.9.1 pending_count functionality
amqp091_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
State = #{dest => #{pending => PendingQueue}},
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_after_drain(_Config) ->
%% Test that pending_count returns 0 after messages are drained
EmptyQueue = queue:new(),
State = #{dest => #{pending => EmptyQueue}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

%% Test AMQP 1.0 pending_count functionality
amqp10_pending_count_empty_list(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingList = [{1, msg1}, {2, msg2}],
State = #{dest => #{pending => PendingList}},
?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_after_clear(_Config) ->
%% Test that pending_count returns 0 after pending list is cleared
State = #{dest => #{pending => []}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

%% Test Local shovel pending_count functionality
local_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when unacked message queue is empty
EmptyQueue = lqueue:new(),
State = #{source => #{current => #{unacked_message_q => EmptyQueue}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).


local_pending_count_after_settle(_Config) ->
%% Test that pending_count returns 0 when state doesn't contain unacked queue
State = #{source => #{current => #{}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).

%% Test behaviour module integration
behaviour_metrics_includes_pending(_Config) ->
%% Mock the destination module's pending_count and status functions
meck:new(rabbit_amqp091_shovel, [passthrough]),
meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end),
meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end),

State = #{source => #{remaining => 10, remaining_unacked => 3},
dest => #{module => rabbit_amqp091_shovel, forwarded => 7}},

{_Status, Metrics} = rabbit_shovel_behaviour:status(State),

?assertMatch(#{remaining := 10,
remaining_unacked := 3,
pending := 5,
forwarded := 7}, Metrics),

?assert(meck:validate(rabbit_amqp091_shovel)).

behaviour_pending_count_delegation(_Config) ->
%% Test that the behaviour module correctly delegates to the specific implementation
meck:new(rabbit_amqp10_shovel, [passthrough]),
meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end),
meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end),

State = #{dest => #{module => rabbit_amqp10_shovel}},

%% This would be called indirectly through status/1
{_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{},
dest => maps:get(dest, State)}),

?assertEqual(3, maps:get(pending, Metrics)),
?assert(meck:validate(rabbit_amqp10_shovel)).
Loading