Skip to content

Commit f243661

Browse files
Merge pull request #14886 from rabbitmq/mergify/bp/v4.2.x/pr-14873
Shovel bugfix: handle cluster alarms in AMQP10 and local shovels (backport #14873)
2 parents 6c761a5 + 143a2ea commit f243661

File tree

4 files changed

+142
-9
lines changed

4 files changed

+142
-9
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
-include_lib("kernel/include/logger.hrl").
4343

4444
-define(LINK_CREDIT_TIMEOUT, 20_000).
45+
-define(AWAIT_SEND_MSG_TIMEOUT, 1_000).
4546

4647
-type state() :: rabbit_shovel_behaviour:state().
4748
-type uri() :: rabbit_shovel_behaviour:uri().
@@ -374,7 +375,11 @@ send_msg(Link, Msg) ->
374375
send_msg(Link, Msg)
375376
after ?LINK_CREDIT_TIMEOUT ->
376377
{stop, credited_timeout}
377-
end
378+
end;
379+
{error, remote_incoming_window_exceeded} ->
380+
%% We could be blocked because of an alarm
381+
timer:sleep(?AWAIT_SEND_MSG_TIMEOUT),
382+
send_msg(Link, Msg)
378383
end.
379384

380385
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
-export([
2626
boot_step/0,
27+
conserve_resources/3,
2728
parse/2,
2829
connect_source/1,
2930
connect_dest/1,
@@ -76,6 +77,12 @@ boot_step() ->
7677
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
7778
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
7879

80+
-spec conserve_resources(pid(),
81+
rabbit_alarm:resource_alarm_source(),
82+
rabbit_alarm:resource_alert()) -> ok.
83+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
84+
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).
85+
7986
parse(_Name, {source, Source}) ->
8087
Queue = parse_parameter(queue, fun parse_binary/1,
8188
proplists:get_value(queue, Source)),
@@ -234,14 +241,17 @@ init_dest(#{name := Name,
234241
dest := #{add_forward_headers := AFH} = Dst} = State) ->
235242
rabbit_global_counters:publisher_created(?PROTOCOL),
236243
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
244+
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
245+
Alarms = sets:from_list(Alarms0),
237246
case AFH of
238247
true ->
239248
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
240249
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
241250
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
242-
State#{dest => Dst#{cached_forward_headers => Props}};
251+
State#{dest => Dst#{cached_forward_headers => Props,
252+
alarms => Alarms}};
243253
false ->
244-
State
254+
State#{dest => Dst#{alarms => Alarms}}
245255
end.
246256

247257
source_uri(_State) ->
@@ -359,6 +369,19 @@ handle_dest({{'DOWN', #resource{kind = queue,
359369
{eol, QState1, _QRef} ->
360370
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
361371
end;
372+
handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) ->
373+
Alarms = case Conserve of
374+
true -> sets:add_element(Alarm, Alarms0);
375+
false -> sets:del_element(Alarm, Alarms0)
376+
end,
377+
State = State0#{dest => Dest#{alarms => Alarms}},
378+
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
379+
{false, true} ->
380+
%% All alarms cleared
381+
forward_pending_delivery(State);
382+
{_, _} ->
383+
State
384+
end;
362385
handle_dest(_Msg, State) ->
363386
State.
364387

@@ -374,7 +397,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) ->
374397
%% come back. So drop subsequent messages on the floor to be
375398
%% requeued later
376399
State;
377-
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
400+
forward(Tag, Msg, State) ->
401+
case is_blocked(State) of
402+
true ->
403+
PendingEntry = {Tag, Msg},
404+
add_pending_delivery(PendingEntry, State);
405+
false ->
406+
do_forward(Tag, Msg, State)
407+
end.
408+
409+
do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
378410
ack_mode := AckMode} = State0) ->
379411
{Options, #{dest := #{current := Current1} = Dest1} = State} =
380412
case AckMode of
@@ -437,10 +469,15 @@ add_routing(Msg0, Dest) ->
437469
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
438470
end.
439471

440-
status(_) ->
441-
running.
472+
status(State) ->
473+
case is_blocked(State) of
474+
true -> blocked;
475+
false -> running
476+
end.
442477

443-
pending_count(_State) ->
478+
pending_count(#{dest := #{pending_delivery := Pending}}) ->
479+
queue:len(Pending);
480+
pending_count(_) ->
444481
0.
445482

446483
%% Internal
@@ -903,3 +940,35 @@ messages_delivered(QName, S0) ->
903940
_ ->
904941
ok
905942
end.
943+
944+
is_blocked(#{dest := #{alarms := Alarms}}) ->
945+
not sets:is_empty(Alarms);
946+
is_blocked(_) ->
947+
false.
948+
949+
add_pending_delivery(Elem, State = #{dest := Dest}) ->
950+
Pending = maps:get(pending_delivery, Dest, queue:new()),
951+
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
952+
953+
pop_pending_delivery(State = #{dest := Dest}) ->
954+
Pending = maps:get(pending_delivery, Dest, queue:new()),
955+
case queue:out(Pending) of
956+
{empty, _} ->
957+
empty;
958+
{{value, Elem}, Pending2} ->
959+
{Elem, State#{dest => Dest#{pending_delivery => Pending2}}}
960+
end.
961+
962+
forward_pending_delivery(State) ->
963+
case pop_pending_delivery(State) of
964+
empty ->
965+
State;
966+
{{Tag, Mc}, S} ->
967+
S2 = do_forward(Tag, Mc, S),
968+
case is_blocked(S2) of
969+
true ->
970+
S2;
971+
false ->
972+
forward_pending_delivery(S2)
973+
end
974+
end.

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ groups() ->
3636
local_to_local_delete_dest_queue,
3737
local_to_local_stream_credit_flow_no_ack,
3838
local_to_local_simple_uri,
39-
local_to_local_counters
39+
local_to_local_counters,
40+
local_to_local_alarms
4041
]}
4142
].
4243

@@ -247,6 +248,41 @@ local_to_local_counters(Config) ->
247248
get_global_counters(Config), 30_000)
248249
end).
249250

251+
local_to_local_alarms(Config) ->
252+
Src = ?config(srcq, Config),
253+
Dest = ?config(destq, Config),
254+
ShovelArgs = [{<<"src-protocol">>, <<"local">>},
255+
{<<"src-queue">>, Src},
256+
{<<"dest-protocol">>, <<"local">>},
257+
{<<"dest-queue">>, Dest}],
258+
with_amqp10_session(
259+
Config,
260+
fun (Sess) ->
261+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
262+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
263+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
264+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
265+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
266+
amqp10_expect_empty(Sess, Dest),
267+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
268+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
269+
amqp10_expect_count(Sess, Dest, 1000),
270+
271+
shovel_test_utils:clear_param(Config, ?PARAM),
272+
273+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
274+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
275+
rabbit_ct_broker_helpers:set_alarm(Config, 0, memory),
276+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
277+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
278+
amqp10_expect_empty(Sess, Dest),
279+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
280+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
281+
amqp10_expect_empty(Sess, Dest),
282+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory),
283+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
284+
amqp10_expect_count(Sess, Dest, 1000)
285+
end).
250286
%%----------------------------------------------------------------------------
251287
declare_queue(Config, VHost, QName) ->
252288
declare_queue(Config, VHost, QName, []).
@@ -303,3 +339,11 @@ get_global_counters(Config) ->
303339
get_global_counters0(Config, Key) ->
304340
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
305341
maps:get(Key, Overview).
342+
343+
get_blocked_status(Config) ->
344+
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of
345+
[{_, _, {Status, PropList}, _, _}] ->
346+
{Status, proplists:get_value(blocked_status, PropList)};
347+
_ ->
348+
empty
349+
end.

deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ tests() ->
108108
application_properties,
109109
delete_src_queue,
110110
shovel_status,
111-
change_definition
111+
change_definition,
112+
disk_alarm
112113
].
113114

114115
%% -------------------------------------------------------------------
@@ -594,6 +595,20 @@ change_definition(Config) ->
594595
amqp10_expect_empty(Sess, Dest2)
595596
end).
596597

598+
disk_alarm(Config) ->
599+
Src = ?config(srcq, Config),
600+
Dest = ?config(destq, Config),
601+
with_amqp10_session(Config,
602+
fun (Sess) ->
603+
ShovelArgs = ?config(shovel_args, Config),
604+
amqp10_publish(Sess, Src, <<"hello">>, 10),
605+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
606+
set_param(Config, ?PARAM, ShovelArgs),
607+
amqp10_expect_empty(Sess, Dest),
608+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
609+
amqp10_expect_count(Sess, Dest, 10)
610+
end).
611+
597612
%%----------------------------------------------------------------------------
598613
maybe_skip_local_protocol(Config) ->
599614
[Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)