Skip to content

Commit a0e44fa

Browse files
Merge pull request #14872 from rabbitmq/mergify/bp/v4.2.x/pr-14794
Shovel tests and bugfixes (backport #14794)
2 parents 6ce68dd + d8add90 commit a0e44fa

10 files changed

+1133
-1270
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ handle_source({#'basic.deliver'{delivery_tag = Tag,
229229
% forward to destination
230230
rabbit_shovel_behaviour:forward(Tag, Msg, State);
231231

232+
handle_source(#'basic.cancel'{}, #{name := Name}) ->
233+
?LOG_WARNING("Shovel ~tp received a 'basic.cancel' from the server", [Name]),
234+
{stop, {shutdown, restart}};
235+
232236
handle_source({'EXIT', Conn, Reason},
233237
#{source := #{current := {Conn, _, _}}}) ->
234238
{stop, {inbound_conn_died, Reason}};
@@ -251,10 +255,6 @@ handle_dest(#'basic.nack'{delivery_tag = Seq, multiple = Multiple},
251255
rabbit_shovel_behaviour:nack(Tag, Multi, StateX)
252256
end, Seq, Multiple, State);
253257

254-
handle_dest(#'basic.cancel'{}, #{name := Name}) ->
255-
?LOG_WARNING("Shovel ~tp received a 'basic.cancel' from the server", [Name]),
256-
{stop, {shutdown, restart}};
257-
258258
handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
259259
{stop, {outbound_conn_died, Reason}};
260260

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,10 @@ source_protocol(_State) ->
256256
dest_protocol(_State) ->
257257
local.
258258

259-
source_endpoint(#{source := #{queue := Queue,
260-
exchange := SrcX,
259+
source_endpoint(#{source := #{exchange := SrcX,
261260
routing_key := SrcXKey}}) ->
262261
[{src_exchange, SrcX},
263-
{src_exchange_key, SrcXKey},
264-
{src_queue, Queue}];
262+
{src_exchange_key, SrcXKey}];
265263
source_endpoint(#{source := #{queue := Queue}}) ->
266264
[{src_queue, Queue}];
267265
source_endpoint(_Config) ->

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
-export([start_link/0]).
1616

17-
-export([report/3,
17+
-export([report/3, report/4,
1818
report_blocked_status/2,
1919
remove/1,
2020
status/0,
@@ -78,6 +78,12 @@ start_link() ->
7878
report(Name, Type, Info) ->
7979
gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).
8080

81+
-spec report(name(), type(), info(), metrics()) -> ok.
82+
report(Name, Type, Info, Metrics) ->
83+
%% Initialise metrics for protocols that don't immediately generate a
84+
%% blocked status report. This happens with AMQP 1.0
85+
gen_server:cast(?SERVER, {report, Name, Type, Info, Metrics, calendar:local_time()}).
86+
8187
-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok.
8288
report_blocked_status(Name, Status) ->
8389
gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}).
@@ -164,6 +170,19 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
164170
split_name(Name) ++ split_status(Info)),
165171
{noreply, State};
166172

173+
handle_cast({report, Name, Type, Info, Metrics, Timestamp}, State) ->
174+
Entry = #entry{
175+
name = Name,
176+
type = Type,
177+
info = Info,
178+
metrics = Metrics,
179+
timestamp = Timestamp
180+
},
181+
true = ets:insert(?ETS_NAME, Entry),
182+
rabbit_event:notify(shovel_worker_status,
183+
split_name(Name) ++ split_status(Info)),
184+
{noreply, State};
185+
167186
handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) ->
168187
case Status of
169188
flow ->

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,15 @@ report_running(#state{config = Config} = State) ->
279279
OutProto = rabbit_shovel_behaviour:dest_protocol(Config),
280280
InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config),
281281
OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config),
282+
{_, Metrics} = rabbit_shovel_behaviour:status(Config),
282283
rabbit_shovel_status:report(State#state.name, State#state.type,
283284
{running, [{src_uri, rabbit_data_coercion:to_binary(InUri)},
284285
{src_protocol, rabbit_data_coercion:to_binary(InProto)},
285286
{dest_protocol, rabbit_data_coercion:to_binary(OutProto)},
286287
{dest_uri, rabbit_data_coercion:to_binary(OutUri)}]
287288
++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint)
288-
}).
289+
},
290+
Metrics).
289291

290292
props_to_binary(Props) ->
291293
[{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props].

0 commit comments

Comments
 (0)