diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index f673251b6ab..c084a9b83ac 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -42,6 +42,7 @@ -include_lib("kernel/include/logger.hrl"). -define(LINK_CREDIT_TIMEOUT, 20_000). +-define(AWAIT_SEND_MSG_TIMEOUT, 1_000). -type state() :: rabbit_shovel_behaviour:state(). -type uri() :: rabbit_shovel_behaviour:uri(). @@ -374,7 +375,11 @@ send_msg(Link, Msg) -> send_msg(Link, Msg) after ?LINK_CREDIT_TIMEOUT -> {stop, credited_timeout} - end + end; + {error, remote_incoming_window_exceeded} -> + %% We could be blocked because of an alarm + timer:sleep(?AWAIT_SEND_MSG_TIMEOUT), + send_msg(Link, Msg) end. add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index d8dd320b43c..b957ea1712e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -24,6 +24,7 @@ -export([ boot_step/0, + conserve_resources/3, parse/2, connect_source/1, connect_dest/1, @@ -76,6 +77,12 @@ boot_step() -> rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}), rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}). +-spec conserve_resources(pid(), + rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> ok. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + gen_server:cast(Pid, {conserve_resources, Source, Conserve}). + parse(_Name, {source, Source}) -> Queue = parse_parameter(queue, fun parse_binary/1, proplists:get_value(queue, Source)), @@ -222,14 +229,17 @@ init_dest(#{name := Name, dest := #{add_forward_headers := AFH} = Dst} = State) -> rabbit_global_counters:publisher_created(?PROTOCOL), _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), + Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Alarms = sets:from_list(Alarms0), case AFH of true -> Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(), <<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type), <<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)}, - State#{dest => Dst#{cached_forward_headers => Props}}; + State#{dest => Dst#{cached_forward_headers => Props, + alarms => Alarms}}; false -> - State + State#{dest => Dst#{alarms => Alarms}} end. source_uri(_State) -> @@ -347,6 +357,19 @@ handle_dest({{'DOWN', #resource{kind = queue, {eol, QState1, _QRef} -> State0#{dest => Dest#{current => Current#{queue_states => QState1}}} end; +handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) -> + Alarms = case Conserve of + true -> sets:add_element(Alarm, Alarms0); + false -> sets:del_element(Alarm, Alarms0) + end, + State = State0#{dest => Dest#{alarms => Alarms}}, + case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of + {false, true} -> + %% All alarms cleared + forward_pending_delivery(State); + {_, _} -> + State + end; handle_dest(_Msg, State) -> State. @@ -362,7 +385,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) -> %% come back. So drop subsequent messages on the floor to be %% requeued later State; -forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest, +forward(Tag, Msg, State) -> + case is_blocked(State) of + true -> + PendingEntry = {Tag, Msg}, + add_pending_delivery(PendingEntry, State); + false -> + do_forward(Tag, Msg, State) + end. + +do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest, ack_mode := AckMode} = State0) -> {Options, #{dest := #{current := Current1} = Dest1} = State} = case AckMode of @@ -425,10 +457,15 @@ add_routing(Msg0, Dest) -> RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg) end. -status(_) -> - running. +status(State) -> + case is_blocked(State) of + true -> blocked; + false -> running + end. -pending_count(_State) -> +pending_count(#{dest := #{pending_delivery := Pending}}) -> + queue:len(Pending); +pending_count(_) -> 0. %% Internal @@ -891,3 +928,35 @@ messages_delivered(QName, S0) -> _ -> ok end. + +is_blocked(#{dest := #{alarms := Alarms}}) -> + not sets:is_empty(Alarms); +is_blocked(_) -> + false. + +add_pending_delivery(Elem, State = #{dest := Dest}) -> + Pending = maps:get(pending_delivery, Dest, queue:new()), + State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}. + +pop_pending_delivery(State = #{dest := Dest}) -> + Pending = maps:get(pending_delivery, Dest, queue:new()), + case queue:out(Pending) of + {empty, _} -> + empty; + {{value, Elem}, Pending2} -> + {Elem, State#{dest => Dest#{pending_delivery => Pending2}}} + end. + +forward_pending_delivery(State) -> + case pop_pending_delivery(State) of + empty -> + State; + {{Tag, Mc}, S} -> + S2 = do_forward(Tag, Mc, S), + case is_blocked(S2) of + true -> + S2; + false -> + forward_pending_delivery(S2) + end + end. diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 743ffc83e34..77904c9af91 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -36,7 +36,8 @@ groups() -> local_to_local_delete_dest_queue, local_to_local_stream_credit_flow_no_ack, local_to_local_simple_uri, - local_to_local_counters + local_to_local_counters, + local_to_local_alarms ]} ]. @@ -240,6 +241,41 @@ local_to_local_counters(Config) -> get_global_counters(Config), 30_000) end). +local_to_local_alarms(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + ShovelArgs = [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-queue">>, Dest}], + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_publish(Sess, Src, <<"hello">>, 1000), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + ?awaitMatch({running, running}, get_blocked_status(Config), 30000), + amqp10_expect_count(Sess, Dest, 1000), + + shovel_test_utils:clear_param(Config, ?PARAM), + + amqp10_publish(Sess, Src, <<"hello">>, 1000), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + rabbit_ct_broker_helpers:set_alarm(Config, 0, memory), + shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + ?awaitMatch({running, blocked}, get_blocked_status(Config), 30000), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory), + ?awaitMatch({running, running}, get_blocked_status(Config), 30000), + amqp10_expect_count(Sess, Dest, 1000) + end). %%---------------------------------------------------------------------------- declare_queue(Config, VHost, QName) -> declare_queue(Config, VHost, QName, []). @@ -296,3 +332,11 @@ get_global_counters(Config) -> get_global_counters0(Config, Key) -> Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []), maps:get(Key, Overview). + +get_blocked_status(Config) -> + case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of + [{_, _, {Status, PropList}, _, _}] -> + {Status, proplists:get_value(blocked_status, PropList)}; + _ -> + empty + end. diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index 15226936350..f01f253f2d6 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -108,7 +108,8 @@ tests() -> application_properties, delete_src_queue, shovel_status, - change_definition + change_definition, + disk_alarm ]. %% ------------------------------------------------------------------- @@ -589,6 +590,20 @@ change_definition(Config) -> amqp10_expect_empty(Sess, Dest2) end). +disk_alarm(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session(Config, + fun (Sess) -> + ShovelArgs = ?config(shovel_args, Config), + amqp10_publish(Sess, Src, <<"hello">>, 10), + rabbit_ct_broker_helpers:set_alarm(Config, 0, disk), + set_param(Config, ?PARAM, ShovelArgs), + amqp10_expect_empty(Sess, Dest), + rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk), + amqp10_expect_count(Sess, Dest, 10) + end). + %%---------------------------------------------------------------------------- maybe_skip_local_protocol(Config) -> [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),