Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,12 @@ decl_queue(QName, QArgs, VHost, User) ->
Method = #'queue.declare'{queue = QName,
durable = true,
arguments = Args},
decl_fun([Method], VHost, User).
try
decl_fun([#'queue.declare'{queue = QName,
passive = true}], VHost, User)
catch exit:{amqp_error, not_found, _, _} ->
decl_fun([Method], VHost, User)
end.

dest_check_queue(none, _, _, _) ->
ok;
Expand Down
30 changes: 30 additions & 0 deletions deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ groups() ->
restart,
change_definition,
autodelete,
autodelete_with_rejections,
validation,
security_validation,
get_connection_name,
Expand Down Expand Up @@ -519,6 +520,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
end.

autodelete_with_rejections(Config) ->
Src = <<"src">>,
Dest = <<"dst">>,
Args = [{<<"x-max-length">>, long, 5},
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
with_ch(Config,
fun (Ch) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
durable = true,
arguments = Args}),
shovel_test_utils:set_param(Config, <<"test">>,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"src-delete-after">>, 10},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-predeclared">>, true},
{<<"dest-queue">>, Dest}
]),
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
await_autodelete(Config, <<"test">>),
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
eventually(
?_assertMatch(
Expected,
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, 0,
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
end).

validation(Config) ->
URIs = [{<<"src-uri">>, <<"amqp://">>},
{<<"dest-uri">>, <<"amqp://">>}],
Expand Down
146 changes: 89 additions & 57 deletions deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).

-import(shovel_test_utils, [with_amqp10_session/2,
amqp10_publish/3, amqp10_publish/5,
amqp10_expect_empty/2,
await_amqp10_event/3, amqp10_expect_one/2,
amqp10_expect_count/3, amqp10_publish/4,
amqp10_publish_expect/5,
amqp10_publish_expect/5, amqp10_declare_queue/3,
await_autodelete/2]).

-define(PARAM, <<"test">>).

all() ->
[
{group, non_parallel_tests},
Expand All @@ -34,6 +37,7 @@ groups() ->
autodelete_amqp091_src_on_publish,
autodelete_amqp091_dest_on_confirm,
autodelete_amqp091_dest_on_publish,
autodelete_with_rejections,
simple_amqp10_dest,
simple_amqp10_src,
amqp091_to_amqp10_with_dead_lettering,
Expand Down Expand Up @@ -83,6 +87,8 @@ init_per_testcase(Testcase, Config0) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
shovel_test_utils:clear_param(Config, ?PARAM),
rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -113,11 +119,9 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
TmpQ = <<"tmp">>,
with_amqp10_session(Config,
fun (Sess) ->
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
#{arguments =>#{<<"x-max-length">> => {uint, 0},
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
amqp10_declare_queue(Sess, TmpQ, #{<<"x-max-length">> => {uint, 0},
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
<<"x-dead-letter-routing-key">> => {utf8, Src}}),
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
<<"sender-tmp">>,
<<"/queues/", TmpQ/binary>>,
Expand All @@ -132,7 +136,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->

test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
MapConfig = ?config(map_config, Config),
shovel_test_utils:set_param(Config, <<"test">>,
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, Protocol},
{ProtocolSrc, Src},
{<<"dest-protocol">>, <<"amqp10">>},
Expand Down Expand Up @@ -186,18 +190,18 @@ simple_amqp10_src(Config) ->
fun (Sess) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),
_Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
% the fidelity loss is quite high when consuming using the amqp10
% plugin. For example custom headers aren't current translated.
Expand All @@ -213,18 +217,18 @@ amqp10_to_amqp091_application_properties(Config) ->
fun (Sess) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),

MsgSent = amqp10_msg:set_application_properties(
#{<<"key">> => <<"value">>},
Expand All @@ -247,13 +251,13 @@ change_definition(Config) ->
Dest2 = ?config(destq2, Config),
with_amqp10_session(Config,
fun (Sess) ->
shovel_test_utils:set_param(Config, <<"test">>,
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"dest-address">>, Dest}]),
amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1),
shovel_test_utils:set_param(Config, <<"test">>,
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp10">>},
Expand Down Expand Up @@ -296,14 +300,14 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
amqp10_publish(Session, Src, <<"hello">>, 100),
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-address">>, Dest},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"ack-mode">>, AckMode}
]),
?PARAM, [{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-address">>, Dest},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"ack-mode">>, AckMode}
]),
await_autodelete(Config, <<"test">>),
amqp10_expect_count(Session, Dest, ExpDest),
amqp10_expect_count(Session, Src, ExpSrc)
Expand All @@ -316,14 +320,14 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
amqp10_publish(Session, Src, <<"hello">>, 100),
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-queue">>, Src},
{<<"src-protocol">>, <<"amqp091">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-address">>, Dest},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"ack-mode">>, AckMode}
]),
?PARAM, [{<<"src-queue">>, Src},
{<<"src-protocol">>, <<"amqp091">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-address">>, Dest},
{<<"dest-protocol">>, <<"amqp10">>},
{<<"ack-mode">>, AckMode}
]),
await_autodelete(Config, <<"test">>),
amqp10_expect_count(Session, Dest, ExpDest),
amqp10_expect_count(Session, Src, ExpSrc)
Expand All @@ -336,19 +340,47 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
amqp10_publish(Session, Src, <<"hello">>, 100),
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-queue">>, Dest},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"ack-mode">>, AckMode}
]),
?PARAM, [{<<"src-address">>, Src},
{<<"src-protocol">>, <<"amqp10">>},
{<<"src-delete-after">>, After},
{<<"src-prefetch-count">>, 5},
{<<"dest-queue">>, Dest},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"ack-mode">>, AckMode}
]),
await_autodelete(Config, <<"test">>),
amqp10_expect_count(Session, Dest, ExpDest),
amqp10_expect_count(Session, Src, ExpSrc)
end.

autodelete_with_rejections(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
with_amqp10_session(
Config,
fun (Sess) ->
amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5},
<<"x-overflow">> => {utf8, <<"reject-publish">>}}),

shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"src-delete-after">>, 10},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-predeclared">>, true},
{<<"dest-queue">>, Dest}
]),
amqp10_publish(Sess, Src, <<"hello">>, 10),
await_autodelete(Config, <<"test">>),
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
?awaitMatch(
Expected,
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, 0,
["list_queues", "name", "messages_ready", "--no-table-headers"])),
30_000)
end).

test_amqp10_delete_after_queue_length(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
Expand Down
Loading
Loading