Skip to content

Commit 3f1c98b

Browse files
Merge pull request #14660 from rabbitmq/md/block-direct-publishes
amqp_client: Emit 'connection.blocked' in direct connections
2 parents df7b065 + 9393ec9 commit 3f1c98b

File tree

5 files changed

+329
-144
lines changed

5 files changed

+329
-144
lines changed

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
server_properties,
3232
%% connection.block, connection.unblock handler
3333
block_handler,
34+
blocked_by = sets:new([{version, 2}]),
3435
closing = false %% #closing{} | false
3536
}).
3637

@@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) ->
199200
server_misbehaved_close(AmqpError, State);
200201
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
201202
server_initiated_close(Close, State);
202-
handle_cast({register_blocked_handler, HandlerPid}, State) ->
203+
handle_cast({register_blocked_handler, HandlerPid},
204+
#state{blocked_by = BlockedBy} = State) ->
203205
Ref = erlang:monitor(process, HandlerPid),
204-
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
206+
State1 = State#state{block_handler = {HandlerPid, Ref}},
207+
%% If an alarm is already active, immediately block the handler.
208+
_ = case sets:is_empty(BlockedBy) of
209+
false ->
210+
HandlerPid ! #'connection.blocked'{};
211+
true ->
212+
ok
213+
end,
214+
{noreply, State1};
215+
handle_cast({conserve_resources, Source, Conserve},
216+
#state{blocked_by = BlockedBy} = State) ->
217+
WasNotBlocked = sets:is_empty(BlockedBy),
218+
BlockedBy1 = case Conserve of
219+
true ->
220+
sets:add_element(Source, BlockedBy);
221+
false ->
222+
sets:del_element(Source, BlockedBy)
223+
end,
224+
State1 = State#state{blocked_by = BlockedBy1},
225+
case sets:is_empty(BlockedBy1) of
226+
true ->
227+
handle_method(#'connection.unblocked'{}, State1);
228+
false when WasNotBlocked ->
229+
handle_method(#'connection.blocked'{}, State1);
230+
false ->
231+
{noreply, State1}
232+
end.
205233

206234
%% @private
207235
handle_info({'DOWN', _, process, BlockHandler, Reason},

deps/rabbit/src/rabbit_direct.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
-deprecated([{force_event_refresh, 1, eventually}]).
1414

1515
%% Internal
16-
-export([list_local/0]).
16+
-export([list_local/0,
17+
conserve_resources/3]).
1718

1819
%% For testing only
1920
-export([extract_extra_auth_props/4]).
@@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
206207
ok -> ok = pg_local:join(rabbit_direct, Pid),
207208
rabbit_core_metrics:connection_created(Pid, Infos),
208209
rabbit_event:notify(connection_created, Infos),
210+
_ = rabbit_alarm:register(
211+
Pid, {?MODULE, conserve_resources, []}),
209212
{ok, {User, rabbit_reader:server_properties(Protocol)}}
210213
catch
211214
exit:#amqp_error{name = Reason = not_allowed} ->
@@ -252,3 +255,9 @@ disconnect(Pid, Infos) ->
252255
pg_local:leave(rabbit_direct, Pid),
253256
rabbit_core_metrics:connection_closed(Pid),
254257
rabbit_event:notify(connection_closed, Infos).
258+
259+
-spec conserve_resources(pid(),
260+
rabbit_alarm:resource_alarm_source(),
261+
rabbit_alarm:resource_alert()) -> 'ok'.
262+
conserve_resources(ChannelPid, Source, {_, Conserve, _}) ->
263+
gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}).
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(amqp091_alarm_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile([export_all, nowarn_export_all]).
16+
17+
all() ->
18+
[
19+
{group, network_connection},
20+
{group, direct_connection}
21+
].
22+
23+
groups() ->
24+
[
25+
{network_connection, [], [
26+
dest_resource_alarm_on_confirm,
27+
dest_resource_alarm_on_publish,
28+
dest_resource_alarm_no_ack
29+
]},
30+
{direct_connection, [], [
31+
dest_resource_alarm_on_confirm
32+
]}
33+
].
34+
35+
all_tests() ->
36+
[
37+
dest_resource_alarm_on_confirm,
38+
dest_resource_alarm_on_publish,
39+
dest_resource_alarm_no_ack
40+
].
41+
42+
%% -------------------------------------------------------------------
43+
%% Testsuite setup/teardown.
44+
%% -------------------------------------------------------------------
45+
46+
init_per_suite(Config) ->
47+
rabbit_ct_helpers:log_environment(),
48+
Config1 = rabbit_ct_helpers:set_config(Config, [
49+
{rmq_nodename_suffix, ?MODULE},
50+
{rmq_nodes_count, 2},
51+
{rmq_nodes_clustered, false}
52+
]),
53+
rabbit_ct_helpers:run_setup_steps(Config1,
54+
rabbit_ct_broker_helpers:setup_steps() ++
55+
rabbit_ct_client_helpers:setup_steps()).
56+
57+
end_per_suite(Config) ->
58+
rabbit_ct_helpers:run_teardown_steps(Config,
59+
rabbit_ct_client_helpers:teardown_steps() ++
60+
rabbit_ct_broker_helpers:teardown_steps()).
61+
62+
init_per_group(network_connection, Config) ->
63+
rabbit_ct_helpers:set_config(
64+
Config,
65+
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
66+
{shovel_source_idx, 1},
67+
{shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)},
68+
{shovel_dest_idx, 0}
69+
]);
70+
init_per_group(direct_connection, Config) ->
71+
rabbit_ct_helpers:set_config(
72+
Config,
73+
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
74+
{shovel_source_idx, 1},
75+
{shovel_dest_uri, <<"amqp://">>},
76+
{shovel_dest_idx, 0}
77+
]).
78+
79+
end_per_group(_, Config) ->
80+
Config.
81+
82+
init_per_testcase(Testcase, Config) ->
83+
rabbit_ct_helpers:testcase_started(Config, Testcase).
84+
85+
end_per_testcase(Testcase, Config) ->
86+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
87+
88+
%% -------------------------------------------------------------------
89+
%% Testcases.
90+
%% -------------------------------------------------------------------
91+
92+
dest_resource_alarm_on_confirm(Config) ->
93+
dest_resource_alarm(<<"on-confirm">>, Config).
94+
95+
dest_resource_alarm_on_publish(Config) ->
96+
dest_resource_alarm(<<"on-publish">>, Config).
97+
98+
dest_resource_alarm_no_ack(Config) ->
99+
dest_resource_alarm(<<"no-ack">>, Config).
100+
101+
dest_resource_alarm(AckMode, Config) ->
102+
SourceUri = ?config(shovel_source_uri, Config),
103+
SourceIdx = ?config(shovel_source_idx, Config),
104+
DestUri = ?config(shovel_dest_uri, Config),
105+
DestIdx = ?config(shovel_dest_idx, Config),
106+
107+
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(
108+
Config, SourceIdx),
109+
amqp_channel:call(Ch1, #'confirm.select'{}),
110+
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"src">>}),
111+
publish(Ch1, <<>>, <<"src">>, <<"hello">>),
112+
true = amqp_channel:wait_for_confirms(Ch1),
113+
#{messages := 1} = message_count(Config, SourceIdx, <<"src">>),
114+
while_blocked(Config, DestIdx,
115+
fun() ->
116+
ok = rabbit_ct_broker_helpers:rpc(
117+
Config, DestIdx,
118+
rabbit_runtime_parameters, set,
119+
[
120+
<<"/">>, <<"shovel">>, <<"test">>,
121+
[{<<"src-uri">>, SourceUri},
122+
{<<"dest-uri">>, [DestUri]},
123+
{<<"src-queue">>, <<"src">>},
124+
{<<"dest-queue">>, <<"dest">>},
125+
{<<"src-prefetch-count">>, 50},
126+
{<<"ack-mode">>, AckMode},
127+
{<<"src-delete-after">>, <<"never">>}], none]),
128+
%% The destination is blocked, so the shovel is blocked.
129+
?awaitMatch(
130+
blocked,
131+
shovel_test_utils:get_shovel_status(Config, DestIdx,
132+
<<"test">>),
133+
3_000),
134+
135+
%% The shoveled message triggered a
136+
%% connection.blocked notification, but hasn't
137+
%% reached the dest queue because of the resource
138+
%% alarm
139+
InitialMsgCnt =
140+
case AckMode of
141+
<<"on-confirm">> -> 1;
142+
_ -> 0
143+
end,
144+
#{messages := InitialMsgCnt,
145+
messages_unacknowledged := InitialMsgCnt
146+
} = message_count(Config, SourceIdx, <<"src">>),
147+
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),
148+
149+
%% Now publish more messages to "src" queue.
150+
publish_count(Ch1, <<>>, <<"src">>, <<"hello">>, 1000),
151+
true = amqp_channel:wait_for_confirms(Ch1),
152+
153+
%% No messages reached the dest queue
154+
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),
155+
156+
%% When the shovel sets a prefetch_count
157+
%% (on-confirm/on-publish mode), all messages are in
158+
%% the source queue, prefrech count are
159+
%% unacknowledged and buffered in the shovel
160+
MsgCnts =
161+
case AckMode of
162+
<<"on-confirm">> ->
163+
#{messages => 1001,
164+
messages_unacknowledged => 50};
165+
<<"on-publish">> ->
166+
#{messages => 1000,
167+
messages_unacknowledged => 50};
168+
<<"no-ack">> ->
169+
%% no prefetch limit, all messages are
170+
%% buffered in the shovel
171+
#{messages => 0,
172+
messages_unacknowledged => 0}
173+
end,
174+
175+
MsgCnts = message_count(Config, SourceIdx, <<"src">>),
176+
177+
%% There should be no process with a message buildup
178+
?awaitMatch(
179+
0,
180+
begin
181+
Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc(
182+
Config, 0, recon, proc_count, [message_queue_len, 1]),
183+
ct:pal("Top process by message queue length: ~p", [Top]),
184+
P
185+
end, 5_000),
186+
187+
ok
188+
end),
189+
190+
%% After the alarm clears, all messages should arrive in the dest queue.
191+
?awaitMatch(
192+
#{messages := 1001},
193+
message_count(Config, DestIdx, <<"dest">>),
194+
5_000),
195+
#{messages := 0} = message_count(Config, SourceIdx, <<"src">>),
196+
running = shovel_test_utils:get_shovel_status(Config, DestIdx, <<"test">>),
197+
198+
rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
199+
cleanup(Config),
200+
ok.
201+
202+
%%----------------------------------------------------------------------------
203+
204+
conserve_resources(Pid, Source, {_, Conserve, AlarmedNode}) ->
205+
case Conserve of
206+
true ->
207+
ct:log("node ~w set alarm for resource ~ts",
208+
[AlarmedNode, Source]),
209+
Pid ! {block, Source};
210+
false ->
211+
ct:log("node ~w cleared alarm for resource ~ts",
212+
[AlarmedNode, Source]),
213+
Pid ! {unblock, Source}
214+
end,
215+
ok.
216+
217+
while_blocked(Config, Node, Fun) when is_function(Fun, 0) ->
218+
OrigLimit = rabbit_ct_broker_helpers:rpc(Config, Node,
219+
vm_memory_monitor,
220+
get_vm_memory_high_watermark, []),
221+
222+
ok = rabbit_ct_broker_helpers:add_code_path_to_node(
223+
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename),
224+
?MODULE),
225+
[] = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_alarm, register,
226+
[self(),
227+
{?MODULE, conserve_resources, []}]),
228+
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
229+
set_vm_memory_high_watermark, [0]),
230+
Source = receive
231+
{block, S} ->
232+
S
233+
after
234+
15_000 ->
235+
ct:fail(alarm_set_timeout)
236+
end,
237+
try
238+
Fun()
239+
after
240+
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
241+
set_vm_memory_high_watermark,
242+
[OrigLimit]),
243+
receive
244+
{unblock, Source} ->
245+
ok
246+
after
247+
10_000 ->
248+
ct:fail(alarm_clear_timeout)
249+
end
250+
end.
251+
252+
publish(Ch, X, Key, Payload) when is_binary(Payload) ->
253+
publish(Ch, X, Key, #amqp_msg{payload = Payload});
254+
255+
publish(Ch, X, Key, Msg = #amqp_msg{}) ->
256+
amqp_channel:cast(Ch, #'basic.publish'{exchange = X,
257+
routing_key = Key}, Msg).
258+
259+
publish_count(Ch, X, Key, M, Count) ->
260+
[begin
261+
262+
publish(Ch, X, Key, M)
263+
end || _ <- lists:seq(1, Count)].
264+
265+
message_count(Config, Node, QueueName) ->
266+
Resource = rabbit_misc:r(<<"/">>, queue, QueueName),
267+
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue,
268+
lookup, [Resource]),
269+
maps:from_list(
270+
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info,
271+
[Q, [messages, messages_unacknowledged]])).
272+
273+
cleanup(Config) ->
274+
rabbit_ct_broker_helpers:rpc_all(Config, ?MODULE, cleanup1, []).
275+
276+
cleanup1() ->
277+
[rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P),
278+
rabbit_misc:pget(component, P),
279+
rabbit_misc:pget(name, P),
280+
<<"acting-user">>) ||
281+
P <- rabbit_runtime_parameters:list()],
282+
[rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
283+
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)