Skip to content

Commit 9bf2fab

Browse files
committed
Shovel local bugfix: handle cluster alarms
Local shovels must stop publishing when alarms are set in the destination cluster. Messages are stored in memory and sent when the alarm clears, the same way it is done for AMQP091
1 parent 822f800 commit 9bf2fab

File tree

2 files changed

+115
-6
lines changed

2 files changed

+115
-6
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
-export([
2626
boot_step/0,
27+
conserve_resources/3,
2728
parse/2,
2829
connect_source/1,
2930
connect_dest/1,
@@ -76,6 +77,12 @@ boot_step() ->
7677
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
7778
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
7879

80+
-spec conserve_resources(pid(),
81+
rabbit_alarm:resource_alarm_source(),
82+
rabbit_alarm:resource_alert()) -> ok.
83+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
84+
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).
85+
7986
parse(_Name, {source, Source}) ->
8087
Queue = parse_parameter(queue, fun parse_binary/1,
8188
proplists:get_value(queue, Source)),
@@ -222,14 +229,17 @@ init_dest(#{name := Name,
222229
dest := #{add_forward_headers := AFH} = Dst} = State) ->
223230
rabbit_global_counters:publisher_created(?PROTOCOL),
224231
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
232+
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
233+
Alarms = sets:from_list(Alarms0),
225234
case AFH of
226235
true ->
227236
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
228237
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
229238
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
230-
State#{dest => Dst#{cached_forward_headers => Props}};
239+
State#{dest => Dst#{cached_forward_headers => Props,
240+
alarms => Alarms}};
231241
false ->
232-
State
242+
State#{dest => Dst#{alarms => Alarms}}
233243
end.
234244

235245
source_uri(_State) ->
@@ -347,6 +357,19 @@ handle_dest({{'DOWN', #resource{kind = queue,
347357
{eol, QState1, _QRef} ->
348358
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
349359
end;
360+
handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) ->
361+
Alarms = case Conserve of
362+
true -> sets:add_element(Alarm, Alarms0);
363+
false -> sets:del_element(Alarm, Alarms0)
364+
end,
365+
State = State0#{dest => Dest#{alarms => Alarms}},
366+
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
367+
{false, true} ->
368+
%% All alarms cleared
369+
forward_pending_delivery(State);
370+
{_, _} ->
371+
State
372+
end;
350373
handle_dest(_Msg, State) ->
351374
State.
352375

@@ -362,7 +385,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) ->
362385
%% come back. So drop subsequent messages on the floor to be
363386
%% requeued later
364387
State;
365-
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
388+
forward(Tag, Msg, State) ->
389+
case is_blocked(State) of
390+
true ->
391+
PendingEntry = {Tag, Msg},
392+
add_pending_delivery(PendingEntry, State);
393+
false ->
394+
do_forward(Tag, Msg, State)
395+
end.
396+
397+
do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
366398
ack_mode := AckMode} = State0) ->
367399
{Options, #{dest := #{current := Current1} = Dest1} = State} =
368400
case AckMode of
@@ -425,8 +457,11 @@ add_routing(Msg0, Dest) ->
425457
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
426458
end.
427459

428-
status(_) ->
429-
running.
460+
status(State) ->
461+
case is_blocked(State) of
462+
true -> blocked;
463+
false -> running
464+
end.
430465

431466
pending_count(_State) ->
432467
0.
@@ -891,3 +926,33 @@ messages_delivered(QName, S0) ->
891926
_ ->
892927
ok
893928
end.
929+
930+
is_blocked(#{dest := #{alarms := Alarms}}) ->
931+
not sets:is_empty(Alarms).
932+
933+
add_pending_delivery(Elem, State = #{dest := Dest}) ->
934+
Pending = maps:get(pending_delivery, Dest, queue:new()),
935+
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
936+
937+
pop_pending_delivery(State = #{dest := Dest}) ->
938+
Pending = maps:get(pending_delivery, Dest, queue:new()),
939+
case queue:out(Pending) of
940+
{empty, _} ->
941+
empty;
942+
{{value, Elem}, Pending2} ->
943+
{Elem, State#{dest => Dest#{pending_delivery => Pending2}}}
944+
end.
945+
946+
forward_pending_delivery(State) ->
947+
case pop_pending_delivery(State) of
948+
empty ->
949+
State;
950+
{{Tag, Mc}, S} ->
951+
S2 = do_forward(Tag, Mc, S),
952+
case is_blocked(S2) of
953+
true ->
954+
S2;
955+
false ->
956+
forward_pending_delivery(S2)
957+
end
958+
end.

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ groups() ->
3636
local_to_local_delete_dest_queue,
3737
local_to_local_stream_credit_flow_no_ack,
3838
local_to_local_simple_uri,
39-
local_to_local_counters
39+
local_to_local_counters,
40+
local_to_local_alarms
4041
]}
4142
].
4243

@@ -240,6 +241,41 @@ local_to_local_counters(Config) ->
240241
get_global_counters(Config), 30_000)
241242
end).
242243

244+
local_to_local_alarms(Config) ->
245+
Src = ?config(srcq, Config),
246+
Dest = ?config(destq, Config),
247+
ShovelArgs = [{<<"src-protocol">>, <<"local">>},
248+
{<<"src-queue">>, Src},
249+
{<<"dest-protocol">>, <<"local">>},
250+
{<<"dest-queue">>, Dest}],
251+
with_amqp10_session(
252+
Config,
253+
fun (Sess) ->
254+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
255+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
256+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
257+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
258+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
259+
amqp10_expect_empty(Sess, Dest),
260+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
261+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
262+
amqp10_expect_count(Sess, Dest, 1000),
263+
264+
shovel_test_utils:clear_param(Config, ?PARAM),
265+
266+
amqp10_publish(Sess, Src, <<"hello">>, 1000),
267+
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
268+
rabbit_ct_broker_helpers:set_alarm(Config, 0, memory),
269+
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
270+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
271+
amqp10_expect_empty(Sess, Dest),
272+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
273+
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
274+
amqp10_expect_empty(Sess, Dest),
275+
rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory),
276+
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
277+
amqp10_expect_count(Sess, Dest, 1000)
278+
end).
243279
%%----------------------------------------------------------------------------
244280
declare_queue(Config, VHost, QName) ->
245281
declare_queue(Config, VHost, QName, []).
@@ -296,3 +332,11 @@ get_global_counters(Config) ->
296332
get_global_counters0(Config, Key) ->
297333
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
298334
maps:get(Key, Overview).
335+
336+
get_blocked_status(Config) ->
337+
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of
338+
[{_, _, {Status, PropList}, _, _}] ->
339+
{Status, proplists:get_value(blocked_status, PropList)};
340+
_ ->
341+
empty
342+
end.

0 commit comments

Comments
 (0)