Skip to content

Commit 143a2ea

Browse files
dcorbacho and mergify[bot]
authored and committed
Shovel local bugfix: handle cluster alarms
Local shovels must stop publishing while any resource alarm is set in the destination cluster. Incoming messages are buffered in memory and forwarded once all alarms have cleared, the same way it is done for AMQP 0-9-1 shovels. (Cherry picked from commit f038115.)
1 parent 31d60f8 commit 143a2ea

File tree

2 files changed

+120
-7
lines changed

2 files changed

+120
-7
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 75 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424

2525
-export([
2626
boot_step/0,
27+
conserve_resources/3,
2728
parse/2,
2829
connect_source/1,
2930
connect_dest/1,
@@ -76,6 +77,12 @@ boot_step() ->
7677
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
7778
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
7879

80+
-spec conserve_resources(pid(),
81+
rabbit_alarm:resource_alarm_source(),
82+
rabbit_alarm:resource_alert()) -> ok.
83+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
84+
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).
85+
7986
parse(_Name, {source, Source}) ->
8087
Queue = parse_parameter(queue, fun parse_binary/1,
8188
proplists:get_value(queue, Source)),
@@ -234,14 +241,17 @@ init_dest(#{name := Name,
234241
dest := #{add_forward_headers := AFH} = Dst} = State) ->
235242
rabbit_global_counters:publisher_created(?PROTOCOL),
236243
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
244+
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
245+
Alarms = sets:from_list(Alarms0),
237246
case AFH of
238247
true ->
239248
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
240249
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
241250
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
242-
State#{dest => Dst#{cached_forward_headers => Props}};
251+
State#{dest => Dst#{cached_forward_headers => Props,
252+
alarms => Alarms}};
243253
false ->
244-
State
254+
State#{dest => Dst#{alarms => Alarms}}
245255
end.
246256

247257
source_uri(_State) ->
@@ -359,6 +369,19 @@ handle_dest({{'DOWN', #resource{kind = queue,
359369
{eol, QState1, _QRef} ->
360370
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
361371
end;
372+
handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) ->
373+
Alarms = case Conserve of
374+
true -> sets:add_element(Alarm, Alarms0);
375+
false -> sets:del_element(Alarm, Alarms0)
376+
end,
377+
State = State0#{dest => Dest#{alarms => Alarms}},
378+
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
379+
{false, true} ->
380+
%% All alarms cleared
381+
forward_pending_delivery(State);
382+
{_, _} ->
383+
State
384+
end;
362385
handle_dest(_Msg, State) ->
363386
State.
364387

@@ -374,7 +397,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) ->
374397
%% come back. So drop subsequent messages on the floor to be
375398
%% requeued later
376399
State;
377-
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
400+
forward(Tag, Msg, State) ->
401+
case is_blocked(State) of
402+
true ->
403+
PendingEntry = {Tag, Msg},
404+
add_pending_delivery(PendingEntry, State);
405+
false ->
406+
do_forward(Tag, Msg, State)
407+
end.
408+
409+
do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
378410
ack_mode := AckMode} = State0) ->
379411
{Options, #{dest := #{current := Current1} = Dest1} = State} =
380412
case AckMode of
@@ -437,10 +469,15 @@ add_routing(Msg0, Dest) ->
437469
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
438470
end.
439471

440-
status(_) ->
441-
running.
472+
status(State) ->
473+
case is_blocked(State) of
474+
true -> blocked;
475+
false -> running
476+
end.
442477

443-
pending_count(_State) ->
478+
pending_count(#{dest := #{pending_delivery := Pending}}) ->
479+
queue:len(Pending);
480+
pending_count(_) ->
444481
0.
445482

446483
%% Internal
@@ -903,3 +940,35 @@ messages_delivered(QName, S0) ->
903940
_ ->
904941
ok
905942
end.
943+
944+
is_blocked(#{dest := #{alarms := Alarms}}) ->
945+
not sets:is_empty(Alarms);
946+
is_blocked(_) ->
947+
false.
948+
949+
add_pending_delivery(Elem, State = #{dest := Dest}) ->
950+
Pending = maps:get(pending_delivery, Dest, queue:new()),
951+
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
952+
953+
pop_pending_delivery(State = #{dest := Dest}) ->
954+
Pending = maps:get(pending_delivery, Dest, queue:new()),
955+
case queue:out(Pending) of
956+
{empty, _} ->
957+
empty;
958+
{{value, Elem}, Pending2} ->
959+
{Elem, State#{dest => Dest#{pending_delivery => Pending2}}}
960+
end.
961+
962+
forward_pending_delivery(State) ->
963+
case pop_pending_delivery(State) of
964+
empty ->
965+
State;
966+
{{Tag, Mc}, S} ->
967+
S2 = do_forward(Tag, Mc, S),
968+
case is_blocked(S2) of
969+
true ->
970+
S2;
971+
false ->
972+
forward_pending_delivery(S2)
973+
end
974+
end.

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,8 @@ groups() ->
3636
local_to_local_delete_dest_queue,
3737
local_to_local_stream_credit_flow_no_ack,
3838
local_to_local_simple_uri,
39-
local_to_local_counters
39+
local_to_local_counters,
40+
local_to_local_alarms
4041
]}
4142
].
4243

@@ -247,6 +248,41 @@ local_to_local_counters(Config) ->
247248
get_global_counters(Config), 30_000)
248249
end).
249250

251+
local_to_local_alarms(Config) ->
252+
Src = ?config(srcq, Config),
253+
Dest = ?config(destq, Config),
254+
ShovelArgs = [{<<"src-protocol">>, <<"local">>},
255+
{<<"src-queue">>, Src},
256+
{<<"dest-protocol">>, <<"local">>},
257+
{<<"dest-queue">>, Dest}],
258+
with_amqp10_session(
259+
Config,
260+
fun (Sess) ->
261+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
262+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
263+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
264+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
265+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
266+
amqp10_expect_empty(Sess, Dest),
267+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
268+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
269+
amqp10_expect_count(Sess, Dest, 1000),
270+
271+
shovel_test_utils:clear_param(Config, ?PARAM),
272+
273+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
274+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
275+
rabbit_ct_broker_helpers:set_alarm(Config, 0, memory),
276+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
277+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
278+
amqp10_expect_empty(Sess, Dest),
279+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
280+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
281+
amqp10_expect_empty(Sess, Dest),
282+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory),
283+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
284+
amqp10_expect_count(Sess, Dest, 1000)
285+
end).
250286
%%----------------------------------------------------------------------------
251287
declare_queue(Config, VHost, QName) ->
252288
declare_queue(Config, VHost, QName, []).
@@ -303,3 +339,11 @@ get_global_counters(Config) ->
303339
get_global_counters0(Config, Key) ->
304340
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
305341
maps:get(Key, Overview).
342+
343+
get_blocked_status(Config) ->
344+
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of
345+
[{_, _, {Status, PropList}, _, _}] ->
346+
{Status, proplists:get_value(blocked_status, PropList)};
347+
_ ->
348+
empty
349+
end.

0 commit comments

Comments
 (0)