Skip to content

Commit

Permalink
amqp_client_SUITE: resolve conflicts #12722 #12726
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Nov 14, 2024
1 parent c07ea7d commit 746a7f2
Showing 1 changed file with 13 additions and 263 deletions.
276 changes: 13 additions & 263 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ roundtrip_with_drain(Config, QueueType, QName)
% wait for a delivery
receive {amqp10_msg, Receiver, InMsg} ->
ok = amqp10_client:accept_msg(Receiver, InMsg)
after 30000 ->
after 2000 ->
Reason = delivery_timeout,
flush(Reason),
ct:fail(Reason)
Expand Down Expand Up @@ -1251,7 +1251,7 @@ drain_many(Config, QueueType, QName)
%% We expect the server to send us the last message and
%% to advance the delivery-count promptly.
receive {amqp10_msg, _, _} -> ok
after 30000 -> ct:fail({missing_delivery, ?LINE})
after 2000 -> ct:fail({missing_delivery, ?LINE})
end,
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 300 -> ct:fail("expected credit_exhausted")
Expand Down Expand Up @@ -3367,7 +3367,7 @@ last_queue_confirms(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, 2),
%% Since the quorum queue has become available, we should now get a confirmation for m2.
receive {amqp10_disposition, {accepted, DTag2}} -> ok
after 30_000 -> ct:fail({missing_accepted, DTag2})
after 10_000 -> ct:fail({missing_accepted, DTag2})
end,

ok = amqp10_client:detach_link(SenderClassicQ),
Expand Down Expand Up @@ -3442,7 +3442,7 @@ target_queue_deleted(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, ReplicaNode),
%% Since the quorum queue has become available, we should now get a confirmation for m2.
receive {amqp10_disposition, {accepted, DTag2}} -> ok
after 30_000 -> ct:fail({missing_accepted, DTag2})
after 10_000 -> ct:fail({missing_accepted, DTag2})
end,

ok = amqp10_client:detach_link(Sender),
Expand Down Expand Up @@ -3485,7 +3485,7 @@ target_classic_queue_down(Config) ->
%% We expect that the server closes links that receive from classic queues that are down.
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_ILLEGAL_STATE},
receive {amqp10_event, {link, Receiver1, {detached, ExpectedError}}} -> ok
after 30_000 -> ct:fail({missing_event, ?LINE})
after 10_000 -> ct:fail({missing_event, ?LINE})
end,
%% However the server should not close links that send to classic queues that are down.
receive Unexpected -> ct:fail({unexpected, Unexpected})
Expand Down Expand Up @@ -4397,15 +4397,6 @@ trace(Config) ->
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
{ok, SessionReceiver} = amqp10_client:begin_session_sync(Connection),

<<<<<<< HEAD
=======
{ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver,
<<"test-receiver">>,
rabbitmq_amqp_address:queue(Q)),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
>>>>>>> 5ef4fba851 (tests: amqp_client_SUITE longer wait on receive for CI)
{ok, Sender} = amqp10_client:attach_sender_link(
SessionSender,
<<"test-sender">>,
Expand Down Expand Up @@ -4611,15 +4602,10 @@ idle_time_out_on_client(Config) ->
receive
{amqp10_event,
{connection, Connection,
<<<<<<< HEAD
{closed,
{resource_limit_exceeded,
<<"remote idle-time-out">>}}}} -> ok
after 5000 ->
=======
{closed, _}}} -> ok
after 30000 ->
>>>>>>> 5ef4fba851 (tests: amqp_client_SUITE longer wait on receive for CI)
ct:fail({missing_event, ?LINE})
end,

Expand Down Expand Up @@ -4655,7 +4641,7 @@ credential_expires(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
after 2000 -> ct:fail({missing_event, ?LINE})
end,

%% Since we don't renew our credential, we expect the server to close our connection.
Expand All @@ -4664,7 +4650,7 @@ credential_expires(Config) ->
{connection, Connection,
{closed,
{unauthorized_access, <<"credential expired">>}}}} -> ok
after 30_000 ->
after 10_000 ->
flush(?LINE),
ct:fail({missing_event, ?LINE})
end,
Expand Down Expand Up @@ -5949,242 +5935,6 @@ reserved_annotation(Config) ->
end,
ok = close_connection_sync(Connection).

<<<<<<< HEAD
=======
%% Test that x-cc routing keys work together with target address
%% /exchanges/:exchange/:routing-key
x_cc_annotation_exchange(Config) ->
QName1 = <<"queue 1">>,
QName2 = <<"queue 2">>,
{Connection, Session, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}),
Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key 1">>),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),

Payload = <<"my message">>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}},
amqp10_msg:new(<<"tag">>, Payload))),
ok = wait_for_accepted(<<"tag">>),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled),
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertEqual([Payload], amqp10_msg:body(Msg1)),
?assertEqual([Payload], amqp10_msg:body(Msg2)),

{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that x-cc routing keys work together with target address
%% /exchanges/:exchange
x_cc_annotation_exchange_routing_key_empty(Config) ->
QName1 = <<"queue 1">>,
QName2 = <<"queue 2">>,
{Connection, Session, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}),
AddressEmptyRoutingKey = rabbitmq_amqp_address:exchange(<<"amq.direct">>),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, AddressEmptyRoutingKey),
ok = wait_for_credit(Sender),

Payload = <<"my message">>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, <<"key 1">>},
{utf8, <<"key 2">>}]}},
amqp10_msg:new(<<"tag">>, Payload))),
ok = wait_for_accepted(<<"tag">>),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled),
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertEqual([Payload], amqp10_msg:body(Msg1)),
?assertEqual([Payload], amqp10_msg:body(Msg2)),

{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that x-cc routing keys work together with target address
%% /queues/:queue
x_cc_annotation_queue(Config) ->
QName1 = <<"queue 1">>,
QName2 = <<"queue 2">>,
Address1 = rabbitmq_amqp_address:queue(QName1),
Address2 = rabbitmq_amqp_address:queue(QName2),
{Connection, Session, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address1),
ok = wait_for_credit(Sender),

Payload = <<"my message">>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, QName2}]}},
amqp10_msg:new(<<"tag">>, Payload))),
ok = wait_for_accepted(<<"tag">>),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address2, settled),
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertEqual([Payload], amqp10_msg:body(Msg1)),
?assertEqual([Payload], amqp10_msg:body(Msg2)),

{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that x-cc routing keys work together with target address 'null'
x_cc_annotation_null(Config) ->
QName1 = <<"queue 1">>,
QName2 = <<"queue 2">>,
QAddress1 = rabbitmq_amqp_address:queue(QName1),
QAddress2 = rabbitmq_amqp_address:queue(QName2),
{Connection, Session, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key-1">>, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"🗝️-2"/utf8>>, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, QAddress1, settled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled),

Msg1 = amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, <<"key-1">>},
{utf8, <<"key-3">>}]}},
amqp10_msg:set_properties(
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)},
amqp10_msg:new(<<"t1">>, <<"m1">>))),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_accepted(<<"t1">>),
{ok, R1M1} = amqp10_client:get_msg(Receiver1),
{ok, R2M1} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)),
?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)),

Msg2 = amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>},
{utf8, <<"key-1">>}]}},
amqp10_msg:set_properties(
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)},
amqp10_msg:new(<<"t2">>, <<"m2">>))),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = wait_for_accepted(<<"t2">>),
{ok, R1M2} = amqp10_client:get_msg(Receiver1),
{ok, R2M2} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)),
?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)),

Msg3 = amqp10_msg:set_message_annotations(
#{<<"x-cc">> => {list, [{utf8, QName1}]}},
amqp10_msg:set_properties(
#{to => rabbitmq_amqp_address:queue(QName2)},
amqp10_msg:new(<<"t3">>, <<"m3">>))),
ok = amqp10_client:send_msg(Sender, Msg3),
ok = wait_for_accepted(<<"t3">>),
{ok, R1M3} = amqp10_client:get_msg(Receiver1),
{ok, R2M3} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)),
?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)),

Msg4 = amqp10_msg:set_message_annotations(
%% We send a symbol instead of utf8..
#{<<"x-cc">> => {list, [{symbol, QName1}]}},
amqp10_msg:set_properties(
#{to => rabbitmq_amqp_address:queue(QName2)},
amqp10_msg:new(<<"t4">>, <<"m4">>))),
ok = amqp10_client:send_msg(Sender, Msg4),
%% "If the source of the link supports the rejected outcome, and the message has not
%% already been settled by the sender, then the routing node MUST reject the message.
%% In this case the error field of rejected MUST contain the error which would have been communicated
%% in the detach which would have be sent if a link to the same address had been attempted."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
receive {amqp10_disposition, {{rejected, Error}, <<"t4">>}} ->
?assertMatch(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
description = {utf8, <<"bad value for 'x-cc' message-annotation:", _/binary>>}},
Error)
after 30000 -> ct:fail({missing_event, ?LINE})
end,

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver1),
ok = amqp10_client:detach_link(Receiver2),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

bad_x_cc_annotation_exchange(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),

Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>),
{ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address),
ok = wait_for_credit(Sender1),
ok = amqp10_client:send_msg(
Sender1,
amqp10_msg:set_message_annotations(
%% We send an array instead of a list.
#{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}},
amqp10_msg:new(<<"t1">>, <<"m1">>))),
ok = wait_for_settlement(<<"t1">>, released),
receive {amqp10_event, {link, Sender1, {detached, Error1}}} ->
?assertMatch(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
"{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}},
Error1)
after 30000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address),
ok = wait_for_credit(Sender2),
ok = amqp10_client:send_msg(
Sender2,
amqp10_msg:set_message_annotations(
%% We include a non-utf8 type in the list.
#{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}},
amqp10_msg:new(<<"t2">>, <<"m2">>))),
ok = wait_for_settlement(<<"t2">>, released),
receive {amqp10_event, {link, Sender2, {detached, Error2}}} ->
?assertEqual(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
"{list,[{symbol,<<\"key-3\">>}]}">>}},
Error2)
after 30000 -> ct:fail({missing_event, ?LINE})
end,

ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

>>>>>>> 5ef4fba851 (tests: amqp_client_SUITE longer wait on receive for CI)
%% internal
%%

Expand Down Expand Up @@ -6245,7 +5995,7 @@ wait_for_credit(Sender) ->
receive
{amqp10_event, {link, Sender, credited}} ->
ok
after 5000 ->
after 30000 ->
flush("wait_for_credit timed out"),
ct:fail(credited_timeout)
end.
Expand All @@ -6259,7 +6009,7 @@ wait_for_link_detach(Link) ->
{amqp10_event, {link, Link, {detached, normal}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
after 30000 ->
flush("wait_for_link_detach timed out"),
ct:fail({link_detach_timeout, Link})
end.
Expand All @@ -6273,7 +6023,7 @@ wait_for_session_end(Session) ->
{amqp10_event, {session, Session, {ended, _}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
after 30000 ->
flush("wait_for_session_end timed out"),
ct:fail({session_end_timeout, Session})
end.
Expand All @@ -6287,7 +6037,7 @@ wait_for_connection_close(Connection) ->
{amqp10_event, {connection, Connection, {closed, normal}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
after 30000 ->
flush("wait_for_connection_close timed out"),
ct:fail({connection_close_timeout, Connection})
end.
Expand All @@ -6310,7 +6060,7 @@ wait_for_accepts(N) ->
receive
{amqp10_disposition,{accepted,_}} ->
wait_for_accepts(N - 1)
after 5000 ->
after 30000 ->
ct:fail({missing_accepted, N})
end.

Expand Down Expand Up @@ -6402,7 +6152,7 @@ assert_link_credit_runs_out(Sender, Left) ->
receive {amqp10_event, {link, Sender, credited}} ->
ct:pal("credited with ~b messages left", [Left]),
assert_link_credit_runs_out(Sender, Left - 1)
after 30000 ->
after 500 ->
ct:pal("insufficient link credit with ~b messages left", [Left]),
ok
end
Expand Down

0 comments on commit 746a7f2

Please sign in to comment.