Skip to content

Commit c75597c

Browse files
committed
amqp_client: Emit 'connection.blocked' in direct connections
1 parent 2303570 commit c75597c

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

deps/amqp_client/src/amqp_direct_connection.erl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
-export([socket_adapter_info/2,
2222
socket_adapter_info/3]).
2323

24+
-export([conserve_resources/3]).
25+
2426
-record(state, {node,
2527
user,
2628
vhost,
@@ -158,6 +160,8 @@ connect(Params = #amqp_params_direct{username = Username,
158160
%% supervisor; that way we find out if the node goes down
159161
%% or the rabbit app stops.
160162
erlang:monitor(process, {rabbit_direct_client_sup, Node}),
163+
rpc:call(Node, rabbit_alarm, register,
164+
[self(), {?MODULE, conserve_resources, []}]),
161165
{ok, {ServerProperties, 0, ChMgr, State2}};
162166
{error, _} = E ->
163167
E;
@@ -242,3 +246,9 @@ ssl_cert_info(Sock) ->
242246
_ ->
243247
[]
244248
end.
249+
250+
-spec conserve_resources(pid(),
251+
rabbit_alarm:resource_alarm_source(),
252+
rabbit_alarm:resource_alert()) -> 'ok'.
253+
conserve_resources(ChannelPid, Source, {_, Conserve, _}) ->
254+
gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}).

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
server_properties,
3232
%% connection.block, connection.unblock handler
3333
block_handler,
34+
blocked_by = sets:new([{version, 2}]),
3435
closing = false %% #closing{} | false
3536
}).
3637

@@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) ->
199200
server_misbehaved_close(AmqpError, State);
200201
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
201202
server_initiated_close(Close, State);
202-
handle_cast({register_blocked_handler, HandlerPid}, State) ->
203+
handle_cast({register_blocked_handler, HandlerPid},
204+
#state{blocked_by = BlockedBy} = State) ->
203205
Ref = erlang:monitor(process, HandlerPid),
204-
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
206+
State1 = State#state{block_handler = {HandlerPid, Ref}},
207+
%% If an alarm is already active, immediately block the handler.
208+
_ = case sets:is_empty(BlockedBy) of
209+
false ->
210+
HandlerPid ! #'connection.blocked'{};
211+
true ->
212+
ok
213+
end,
214+
{noreply, State1};
215+
handle_cast({conserve_resources, Source, Conserve},
216+
#state{blocked_by = BlockedBy} = State) ->
217+
WasNotBlocked = sets:is_empty(BlockedBy),
218+
BlockedBy1 = case Conserve of
219+
true ->
220+
sets:add_element(Source, BlockedBy);
221+
false ->
222+
sets:del_element(Source, BlockedBy)
223+
end,
224+
State1 = State#state{blocked_by = BlockedBy1},
225+
case sets:is_empty(BlockedBy1) of
226+
true ->
227+
handle_method(#'connection.unblocked'{}, State1);
228+
false when WasNotBlocked ->
229+
handle_method(#'connection.blocked'{}, State1);
230+
false ->
231+
{noreply, State1}
232+
end.
205233

206234
%% @private
207235
handle_info({'DOWN', _, process, BlockHandler, Reason},

0 commit comments

Comments
 (0)