Skip to content

Commit 9b4e97b

Browse files
committed
Shovel: tests delete after with queue rejections
1 parent 8ff21c2 commit 9b4e97b

File tree

3 files changed

+99
-0
lines changed

3 files changed

+99
-0
lines changed

deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ groups() ->
3838
restart,
3939
change_definition,
4040
autodelete,
41+
autodelete_with_rejections,
4142
validation,
4243
security_validation,
4344
get_connection_name,
@@ -535,6 +536,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
535536
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
536537
end.
537538

539+
autodelete_with_rejections(Config) ->
540+
Src = <<"src">>,
541+
Dest = <<"dst">>,
542+
Args = [{<<"x-max-length">>, long, 5},
543+
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
544+
with_ch(Config,
545+
fun (Ch) ->
546+
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
547+
durable = true,
548+
arguments = Args}),
549+
shovel_test_utils:set_param(Config, <<"test">>,
550+
[{<<"src-protocol">>, <<"local">>},
551+
{<<"src-queue">>, Src},
552+
{<<"src-delete-after">>, 10},
553+
{<<"dest-protocol">>, <<"local">>},
554+
{<<"dest-predeclared">>, true},
555+
{<<"dest-queue">>, Dest}
556+
]),
557+
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
558+
await_autodelete(Config, <<"test">>),
559+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
560+
eventually(
561+
?_assertMatch(
562+
Expected,
563+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
564+
Config, 0,
565+
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
566+
end).
567+
538568
validation(Config) ->
539569
URIs = [{<<"src-uri">>, <<"amqp://">>},
540570
{<<"dest-uri">>, <<"amqp://">>}],

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1213
-compile(export_all).
1314

1415
-import(shovel_test_utils, [await_credit/1]).
@@ -28,6 +29,7 @@ groups() ->
2829
autodelete_amqp091_src_on_publish,
2930
autodelete_amqp091_dest_on_confirm,
3031
autodelete_amqp091_dest_on_publish,
32+
autodelete_with_rejections,
3133
simple_amqp10_dest,
3234
simple_amqp10_src,
3335
amqp091_to_amqp10_with_dead_lettering,
@@ -77,6 +79,7 @@ init_per_testcase(Testcase, Config0) ->
7779
rabbit_ct_helpers:testcase_started(Config, Testcase).
7880

7981
end_per_testcase(Testcase, Config) ->
82+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
8083
rabbit_ct_helpers:testcase_finished(Config, Testcase).
8184

8285
%% -------------------------------------------------------------------
@@ -344,6 +347,36 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
344347
expect_count(Session, Src, ExpSrc)
345348
end.
346349

350+
autodelete_with_rejections(Config) ->
351+
Src = ?config(srcq, Config),
352+
Dest = ?config(destq, Config),
353+
with_session(
354+
Config,
355+
fun (Sess) ->
356+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
357+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Dest,
358+
#{arguments =>#{<<"x-max-length">> => {uint, 5},
359+
<<"x-overflow">> => {utf8, <<"reject-publish">>}}}),
360+
361+
shovel_test_utils:set_param(Config, <<"test">>,
362+
[{<<"src-protocol">>, <<"local">>},
363+
{<<"src-queue">>, Src},
364+
{<<"src-delete-after">>, 10},
365+
{<<"dest-protocol">>, <<"local">>},
366+
{<<"dest-predeclared">>, true},
367+
{<<"dest-queue">>, Dest}
368+
]),
369+
publish_count(Sess, Src, <<"hello">>, 10),
370+
await_autodelete(Config, <<"test">>),
371+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
372+
?awaitMatch(
373+
Expected,
374+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
375+
Config, 0,
376+
["list_queues", "name", "messages_ready", "--no-table-headers"])),
377+
30_000)
378+
end).
379+
347380
test_amqp10_delete_after_queue_length(Config) ->
348381
Src = ?config(srcq, Config),
349382
Dest = ?config(destq, Config),
@@ -512,3 +545,10 @@ await_autodelete1(_Config, Name) ->
512545
shovels_from_parameters() ->
513546
L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
514547
[rabbit_misc:pget(name, Shovel) || Shovel <- L].
548+
549+
delete_all_queues() ->
550+
Queues = rabbit_amqqueue:list(),
551+
lists:foreach(
552+
fun(Q) ->
553+
{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
554+
end, Queues).

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ groups() ->
5050
local_to_local_delete_after_queue_length,
5151
local_to_local_delete_after_queue_length_zero,
5252
local_to_local_delete_after_number,
53+
local_to_local_delete_after_with_rejections,
5354
local_to_local_no_ack,
5455
local_to_local_quorum_no_ack,
5556
local_to_local_stream_no_ack,
@@ -600,6 +601,34 @@ local_to_local_delete_after_number(Config) ->
600601
expect_none(Sess, Dest)
601602
end).
602603

604+
local_to_local_delete_after_with_rejections(Config) ->
605+
Src = ?config(srcq, Config),
606+
Dest = ?config(destq, Config),
607+
VHost = <<"/">>,
608+
declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5},
609+
{<<"x-overflow">>, longstr, <<"reject-publish">>}]),
610+
with_session(Config,
611+
fun (Sess) ->
612+
shovel_test_utils:set_param(Config, ?PARAM,
613+
[{<<"src-protocol">>, <<"local">>},
614+
{<<"src-queue">>, Src},
615+
{<<"src-delete-after">>, 10},
616+
{<<"dest-protocol">>, <<"local">>},
617+
{<<"dest-predeclared">>, true},
618+
{<<"dest-queue">>, Dest}
619+
]),
620+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
621+
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
622+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
623+
?awaitMatch(
624+
Expected,
625+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
626+
Config, 0,
627+
["list_queues", "name", "messages_ready", "--no-table-headers"])),
628+
30_000)
629+
630+
end).
631+
603632
local_to_local_no_ack(Config) ->
604633
Src = ?config(srcq, Config),
605634
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)