diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index aae2dfbe9ca..26e4e78f3da 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -821,7 +821,7 @@ sender_settle_mode_unsettled(Config) -> %% Wait for confirms. [receive {amqp10_disposition, {accepted, DTag}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag}) + after 30000 -> ct:fail({missing_accepted, DTag}) end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), @@ -854,7 +854,7 @@ sender_settle_mode_unsettled_fanout(Config) -> %% Wait for confirms. [receive {amqp10_disposition, {accepted, DTag}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag}) + after 30000 -> ct:fail({missing_accepted, DTag}) end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), @@ -897,7 +897,7 @@ sender_settle_mode_mixed(Config) -> %% Wait for confirms. [receive {amqp10_disposition, {accepted, DTag}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag}) + after 30000 -> ct:fail({missing_accepted, DTag}) end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), @@ -931,7 +931,7 @@ invalid_transfer_settled_flag(Config) -> ?assertEqual( <<"sender settle mode is 'settled' but transfer settled flag is interpreted as being 'false'">>, Description1) - after 5000 -> flush(missing_ended), + after 30000 -> flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -946,7 +946,7 @@ invalid_transfer_settled_flag(Config) -> ?assertEqual( <<"sender settle mode is 'unsettled' but transfer settled flag is interpreted as being 'true'">>, Description2) - after 5000 -> flush(missing_ended), + after 30000 -> flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -970,7 +970,7 @@ quorum_queue_rejects(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag a">>, <<>>, false)), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag b">>, <<>>, false)), [receive {amqp10_disposition, {accepted, DTag}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag}) + after 30000 -> ct:fail({missing_accepted, DTag}) end || DTag <- [<<"tag a">>, <<"tag b">>]], %% From now on the quorum queue should reject our publishes. @@ -988,7 +988,7 @@ quorum_queue_rejects(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag d">>, <<>>, false)), [receive {amqp10_disposition, {rejected, DTag}} -> ok - after 5000 -> ct:fail({missing_rejected, DTag}) + after 30000 -> ct:fail({missing_rejected, DTag}) end || DTag <- DTags ++ [<<"tag d">>]], ok = amqp10_client:detach_link(Sender), @@ -1022,7 +1022,7 @@ receiver_settle_mode_first(Config) -> ok = amqp10_client:flow_link_credit(Receiver, 9, never), Msgs_1_to_9 = receive_messages(Receiver, 9), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, assert_messages(QName, 10, 9, Config), @@ -1360,45 +1360,45 @@ amqp_amqpl(QType, Config) -> #amqp_msg{payload = Payload1, props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload2, props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ?assertEqual([Body2], amqp10_framing:decode_bin(Payload2)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload3, props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ?assertEqual(Body3, amqp10_framing:decode_bin(Payload3)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload4, props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ?assertEqual(Body4, amqp10_framing:decode_bin(Payload4)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload5, props = #'P_basic'{type = undefined}}} -> ?assertEqual(<<0, 255>>, Payload5) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload6, props = #'P_basic'{type = undefined}}} -> %% We expect that RabbitMQ concatenates the binaries of multiple data sections. ?assertEqual(<<0, 1, 2, 3>>, Payload6) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload7, props = #'P_basic'{headers = Headers7}}} -> ?assertEqual([Body1], amqp10_framing:decode_bin(Payload7)), ?assertEqual({signedint, -2}, rabbit_misc:table_lookup(Headers7, <<"my int">>)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload8, props = #'P_basic'{correlation_id = Corr8}}} -> ?assertEqual([Body1], amqp10_framing:decode_bin(Payload8)), ?assertEqual(CorrelationID, Corr8) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload9, props = #'P_basic'{headers = Headers9, @@ -1406,20 +1406,20 @@ amqp_amqpl(QType, Config) -> ?assertEqual([Body1], amqp10_framing:decode_bin(Payload9)), ?assertEqual(CorrelationID, Corr9), ?assertEqual({signedint, -2}, rabbit_misc:table_lookup(Headers9, <<"my int">>)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload10}} -> %% RabbitMQ converts the entire AMQP encoded body including the footer %% to AMQP legacy payload. ?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload11, props = #'P_basic'{headers = Headers11}}} -> ?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)), ?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]}, rabbit_misc:table_lookup(Headers11, <<"x-array">>)) - after 5000 -> ct:fail({missing_deliver, ?LINE}) + after 30000 -> ct:fail({missing_deliver, ?LINE}) end, ok = rabbit_ct_client_helpers:close_channel(Ch), @@ -1534,10 +1534,10 @@ multiple_sessions(Config) -> {ok, Receiver2} = amqp10_client:attach_receiver_link( Session2, <<"receiver link 2">>, Q2, settled, configuration), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, NMsgsPerSender = 20, NMsgsPerReceiver = NMsgsPerSender * 2, % due to fanout @@ -1621,7 +1621,7 @@ server_closes_link(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing ATTACH frame from server") + after 30000 -> ct:fail("missing ATTACH frame from server") end, ok = amqp10_client:flow_link_credit(Receiver, 5, never), @@ -1636,7 +1636,7 @@ server_closes_link(QType, Config) -> receive {amqp10_msg, Receiver, Msg} -> ?assertEqual([Body], amqp10_msg:body(Msg)) - after 5000 -> ct:fail("missing msg") + after 30000 -> ct:fail("missing msg") end, [SessionPid] = rpc(Config, rabbit_amqp_session, list_local, []), @@ -1656,11 +1656,11 @@ server_closes_link(QType, Config) -> %% i.e. the server sends us DETACH frames. ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}, receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok - after 5000 -> ct:fail("server did not close our outgoing link") + after 30000 -> ct:fail("server did not close our outgoing link") end, receive {amqp10_event, {link, Receiver, {detached, ExpectedError}}} -> ok - after 5000 -> ct:fail("server did not close our incoming link") + after 30000 -> ct:fail("server did not close our incoming link") end, %% Our client has not and will not settle the delivery since the source queue got deleted and @@ -1723,7 +1723,7 @@ server_closes_link_exchange(Settled, Config) -> receive {amqp10_event, {link, Sender, {detached, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND}}}} -> ok - after 5000 -> ct:fail("server did not close our outgoing link") + after 30000 -> ct:fail("server did not close our outgoing link") end, ?assertMatch(#{publishers := 0}, get_global_counters(Config)), @@ -1784,7 +1784,7 @@ link_target_queue_deleted(QType, Config) -> %% that the target link endpoint - the queue - got deleted. ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}, receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok - after 5000 -> ct:fail("server did not close our outgoing link") + after 30000 -> ct:fail("server did not close our outgoing link") end, ?assert(rpc(Config, meck, validate, [Mod])), @@ -1845,7 +1845,7 @@ target_queues_deleted_accepted(Config) -> ?assertEqual(#'queue.delete_ok'{message_count = 1}, amqp_channel:call(Ch, #'queue.delete'{queue = Q3})), receive {amqp10_disposition, {accepted, DTag2}} -> ok - after 5000 -> ct:fail(accepted_timeout) + after 30000 -> ct:fail(accepted_timeout) end, ?assertEqual(#'queue.delete_ok'{message_count = 2}, @@ -1872,7 +1872,7 @@ events(Config) -> OpnConf = OpnConf0#{properties => #{<<"ignore-maintenance">> => {boolean, true}}}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(opened_timeout) + after 30000 -> ct:fail(opened_timeout) end, ok = close_connection_sync(Connection), @@ -1949,7 +1949,7 @@ sync_get_unsettled(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, SenderSettleMode), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -1971,10 +1971,10 @@ sync_get_unsettled(QType, Config) -> M1 = receive {amqp10_msg, Receiver, Msg1} -> ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), Msg1 - after 5000 -> ct:fail("missing m1") + after 30000 -> ct:fail("missing m1") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp2 -> ct:fail("received unexpected message ~p", [Unexp2]) after 10 -> ok @@ -1985,10 +1985,10 @@ sync_get_unsettled(QType, Config) -> M2 = receive {amqp10_msg, Receiver, Msg2} -> ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)), Msg2 - after 5000 -> ct:fail("missing m2") + after 30000 -> ct:fail("missing m2") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp3 -> ct:fail("received unexpected message ~p", [Unexp3]) after 10 -> ok @@ -2006,10 +2006,10 @@ sync_get_unsettled(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 1, never), receive {amqp10_msg, Receiver, Msg3} -> ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)) - after 5000 -> ct:fail("missing m3") + after 30000 -> ct:fail("missing m3") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp5 -> ct:fail("received unexpected message ~p", [Unexp5]) after 10 -> ok @@ -2056,7 +2056,7 @@ sync_get_unsettled_2(QType, Config) -> Address, SenderSettleMode), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -2069,13 +2069,13 @@ sync_get_unsettled_2(QType, Config) -> %% We should receive exactly 2 messages. receive {amqp10_msg, Receiver, Msg1} -> ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)) - after 5000 -> ct:fail("missing m1") + after 30000 -> ct:fail("missing m1") end, receive {amqp10_msg, Receiver, Msg2} -> ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)) - after 5000 -> ct:fail("missing m2") + after 30000 -> ct:fail("missing m2") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp1 -> ct:fail("received unexpected message ~p", [Unexp1]) after 50 -> ok @@ -2085,13 +2085,13 @@ sync_get_unsettled_2(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 2, never), %% Again, we should receive exactly 2 messages. receive {amqp10_msg, Receiver, Msg3} -> ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)) - after 5000 -> ct:fail("missing m3") + after 30000 -> ct:fail("missing m3") end, receive {amqp10_msg, Receiver, Msg4} -> ?assertEqual([<<"m4">>], amqp10_msg:body(Msg4)) - after 5000 -> ct:fail("missing m4") + after 30000 -> ct:fail("missing m4") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp2 -> ct:fail("received unexpected message ~p", [Unexp2]) after 50 -> ok @@ -2102,7 +2102,7 @@ sync_get_unsettled_2(QType, Config) -> %% We should receive the last (5th) message. receive {amqp10_msg, Receiver, Msg5} -> ?assertEqual([<<"m5">>], amqp10_msg:body(Msg5)) - after 5000 -> ct:fail("missing m5") + after 30000 -> ct:fail("missing m5") end, ok = amqp10_client:detach_link(Sender), @@ -2143,7 +2143,7 @@ sync_get_settled(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"my receiver">>, Address, SenderSettleMode), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -2163,10 +2163,10 @@ sync_get_settled(QType, Config) -> %% Since we previously granted only 1 credit, we should get only the 1st message. receive {amqp10_msg, Receiver, Msg1} -> ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)) - after 5000 -> ct:fail("missing m1") + after 30000 -> ct:fail("missing m1") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp2 -> ct:fail("received unexpected message ~p", [Unexp2]) after 10 -> ok @@ -2176,10 +2176,10 @@ sync_get_settled(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 1, never), receive {amqp10_msg, Receiver, Msg2} -> ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)) - after 5000 -> ct:fail("missing m2") + after 30000 -> ct:fail("missing m2") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive {amqp10_msg, _, _} = Unexp3 -> ct:fail("received unexpected message ~p", [Unexp3]) after 10 -> ok @@ -2225,7 +2225,7 @@ timed_get(QType, Config) -> Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -2238,7 +2238,7 @@ timed_get(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"my tag">>, <<"my msg">>, true)), @@ -2251,10 +2251,10 @@ timed_get(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), receive {amqp10_msg, Receiver, Msg1} -> ?assertEqual([<<"my msg">>], amqp10_msg:body(Msg1)) - after 5000 -> ct:fail("missing 'my msg'") + after 30000 -> ct:fail("missing 'my msg'") end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, ok = amqp10_client:detach_link(Receiver), @@ -2296,7 +2296,7 @@ stop(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, settled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -2398,25 +2398,25 @@ consumer_priority(QType, Config) -> ?assertEqual(<<"1">>, amqp10_msg:body_bin(Msg1)), ?assertEqual(ReceiverHighPrio, Rec1), ok = amqp10_client:accept_msg(Rec1, Msg1) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Rec2, Msg2} -> ?assertEqual(<<"2">>, amqp10_msg:body_bin(Msg2)), ?assertEqual(ReceiverHighPrio, Rec2), ok = amqp10_client:accept_msg(Rec2, Msg2) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Rec3, Msg3} -> ?assertEqual(<<"3">>, amqp10_msg:body_bin(Msg3)), ?assertEqual(ReceiverDefaultPrio, Rec3), ok = amqp10_client:accept_msg(Rec3, Msg3) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Rec4, Msg4} -> ?assertEqual(<<"4">>, amqp10_msg:body_bin(Msg4)), ?assertEqual(ReceiverLowPrio, Rec4), ok = amqp10_client:accept_msg(Rec4, Msg4) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, _, _} = Unexpected -> ct:fail({unexpected_msg, Unexpected, ?LINE}) @@ -2455,7 +2455,7 @@ single_active_consumer_priority_quorum_queue(Config) -> {ok, Recv1} = amqp10_client:attach_receiver_link( Session1, <<"receiver 1">>, Address, unsettled), receive {amqp10_event, {link, Recv1, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, {ok, Msg1} = amqp10_client:get_msg(Recv1), @@ -2466,7 +2466,7 @@ single_active_consumer_priority_quorum_queue(Config) -> Session1, <<"receiver 2">>, Address, unsettled, none, #{}, #{<<"rabbitmq:priority">> => {int, 1}}), receive {amqp10_event, {link, Recv2, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, flush("attched receiver 2"), @@ -2480,7 +2480,7 @@ single_active_consumer_priority_quorum_queue(Config) -> ?assertEqual([<<"2">>], amqp10_msg:body(Msg2)), ?assertEqual(Recv2, R1), ok = amqp10_client:accept_msg(Recv2, Msg2) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, %% Attaching with same prio should not take over. @@ -2489,7 +2489,7 @@ single_active_consumer_priority_quorum_queue(Config) -> Session2, <<"receiver 3">>, Address, unsettled, none, #{}, #{<<"rabbitmq:priority">> => {int, 1}}), receive {amqp10_event, {link, Recv3, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ?assertEqual({error, timeout}, amqp10_client:get_msg(Recv3, 5)), ok = end_session_sync(Session2), @@ -2498,14 +2498,14 @@ single_active_consumer_priority_quorum_queue(Config) -> Session1, <<"receiver 4">>, Address, unsettled, none, #{}, #{<<"rabbitmq:priority">> => {int, 1}}), receive {amqp10_event, {link, Recv4, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, {ok, Recv5} = amqp10_client:attach_receiver_link( Session1, <<"receiver 5">>, Address, unsettled, none, #{}, #{<<"rabbitmq:priority">> => {int, 1}}), receive {amqp10_event, {link, Recv5, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, flush("attched receivers 4 and 5"), @@ -2515,7 +2515,7 @@ single_active_consumer_priority_quorum_queue(Config) -> %% Stop the active consumer. ok = amqp10_client:detach_link(Recv2), receive {amqp10_event, {link, Recv2, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% The 5th consumer should become the active one because it is up, @@ -2524,19 +2524,19 @@ single_active_consumer_priority_quorum_queue(Config) -> ?assertEqual([<<"3">>], amqp10_msg:body(Msg3)), ?assertEqual(Recv5, R2), ok = amqp10_client:accept_msg(Recv5, Msg3) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, R3, Msg4} -> ?assertEqual([<<"4">>], amqp10_msg:body(Msg4)), ?assertEqual(Recv5, R3), ok = amqp10_client:accept_msg(Recv5, Msg4) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, %% Stop the active consumer. ok = amqp10_client:detach_link(Recv5), receive {amqp10_event, {link, Recv5, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% The 4th consumer should become the active one because it is up, @@ -2545,13 +2545,13 @@ single_active_consumer_priority_quorum_queue(Config) -> ?assertEqual([<<"5">>], amqp10_msg:body(Msg5)), ?assertEqual(Recv4, R4), ok = amqp10_client:accept_msg(Recv4, Msg5) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, %% Stop the active consumer. ok = amqp10_client:detach_link(Recv4), receive {amqp10_event, {link, Recv4, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% The only up consumer left is the 1st one (prio 0) which still has 1 credit. @@ -2559,7 +2559,7 @@ single_active_consumer_priority_quorum_queue(Config) -> ?assertEqual([<<"6">>], amqp10_msg:body(Msg6)), ?assertEqual(Recv1, R5), ok = amqp10_client:accept_msg(Recv1, Msg6) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, ok = amqp10_client:detach_link(Recv1), @@ -2595,7 +2595,7 @@ single_active_consumer(QType, Config) -> Address, unsettled), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, ok = amqp10_client:flow_link_credit(Receiver1, 3, never), @@ -2606,7 +2606,7 @@ single_active_consumer(QType, Config) -> Address, unsettled), receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, ok = amqp10_client:flow_link_credit(Receiver2, 3, never), @@ -2619,16 +2619,16 @@ single_active_consumer(QType, Config) -> %% Only the active consumer should receive messages. M1 = receive {amqp10_msg, Receiver1, Msg1} -> ?assertEqual([<<"1">>], amqp10_msg:body(Msg1)), Msg1 - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Receiver1, Msg2} -> ?assertEqual([<<"2">>], amqp10_msg:body(Msg2)) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Receiver1, Msg3} -> ?assertEqual([<<"3">>], amqp10_msg:body(Msg3)) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_event, {link, Receiver1, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive Unexpected0 -> ct:fail("received unexpected ~p", [Unexpected0]) after 10 -> ok @@ -2640,7 +2640,7 @@ single_active_consumer(QType, Config) -> %% Cancelling the active consumer should cause the inactive to become active. ok = amqp10_client:detach_link(Receiver1), receive {amqp10_event, {link, Receiver1, {detached, normal}}} -> ok - after 5000 -> ct:fail("missing detached") + after 30000 -> ct:fail("missing detached") end, %% Since Receiver 1 didn't settle msg 2 and msg 3 but detached the link, @@ -2648,17 +2648,17 @@ single_active_consumer(QType, Config) -> %% With single-active-consumer, we expect the original message order to be retained. M2b = receive {amqp10_msg, Receiver2, Msg2b} -> ?assertEqual([<<"2">>], amqp10_msg:body(Msg2b)), Msg2b - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Receiver2, Msg3b} -> ?assertEqual([<<"3">>], amqp10_msg:body(Msg3b)) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, M4 = receive {amqp10_msg, Receiver2, Msg4} -> ?assertEqual([<<"4">>], amqp10_msg:body(Msg4)), Msg4 - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, receive Unexpected1 -> ct:fail("received unexpected ~p", [Unexpected1]) after 10 -> ok @@ -2704,7 +2704,7 @@ single_active_consumer_drain(QType, Config) -> Address, unsettled), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, %% The 2nd consumer will become inactive. {ok, Receiver2} = amqp10_client:attach_receiver_link( @@ -2713,7 +2713,7 @@ single_active_consumer_drain(QType, Config) -> Address, unsettled), receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(attached), @@ -2721,10 +2721,10 @@ single_active_consumer_drain(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver1, 100, never, true), ok = amqp10_client:flow_link_credit(Receiver2, 100, never, true), receive {amqp10_event, {link, Receiver1, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% Send 2 messages. @@ -2745,24 +2745,24 @@ single_active_consumer_drain(QType, Config) -> receive {amqp10_msg, Receiver1, Msg1} -> ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), ok = amqp10_client:accept_msg(Receiver1, Msg1) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_msg, Receiver1, Msg2} -> ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)), ok = amqp10_client:accept_msg(Receiver1, Msg2) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_event, {link, Receiver1, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% Cancelling the active consumer should cause the inactive to become active. ok = amqp10_client:detach_link(Receiver1), receive {amqp10_event, {link, Receiver1, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% Send 1 more message. @@ -2780,15 +2780,15 @@ single_active_consumer_drain(QType, Config) -> receive {amqp10_msg, Receiver2, Msg3} -> ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)), ok = amqp10_client:accept_msg(Receiver2, Msg3) - after 5000 -> ct:fail({missing_msg, ?LINE}) + after 30000 -> ct:fail({missing_msg, ?LINE}) end, receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:detach_link(Receiver2), receive {amqp10_event, {link, Receiver2, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), @@ -2845,12 +2845,12 @@ detach_requeue_one_session(QType, Config) -> {ok, Receiver1} = amqp10_client:attach_receiver_link( Session, <<"recv 1">>, Address, unsettled), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, {ok, Receiver2} = amqp10_client:attach_receiver_link( Session, <<"recv 2">>, Address, unsettled), receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(attached), @@ -2875,7 +2875,7 @@ detach_requeue_one_session(QType, Config) -> %% Let's detach the 1st receiver. ok = amqp10_client:detach_link(Receiver1), receive {amqp10_event, {link, Receiver1, {detached, normal}}} -> ok - after 5000 -> ct:fail("missing detached") + after 30000 -> ct:fail("missing detached") end, %% Since Receiver1 hasn't settled its 2 deliveries, @@ -2928,11 +2928,11 @@ detach_requeues_drop_head_classic_queue(Config) -> ok = wait_for_credit(Sender), {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"recv 1">>, Addr1, unsettled), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"recv 2">>, Addr2, unsettled), receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(attached), @@ -2952,7 +2952,7 @@ detach_requeues_drop_head_classic_queue(Config) -> %% Since x-max-length is now exceeded, m1 should be dead-lettered to q2. ok = amqp10_client:detach_link(Receiver1), receive {amqp10_event, {link, Receiver1, {detached, normal}}} -> ok - after 5000 -> ct:fail("missing detached") + after 30000 -> ct:fail("missing detached") end, assert_messages(QName1, 1, 0, Config), %% m2 assert_messages(QName2, 1, 0, Config), %% m1 @@ -3000,7 +3000,7 @@ detach_requeues_two_connections(QType, Config) -> {ok, Receiver0} = amqp10_client:attach_receiver_link(Session0, <<"receiver 0">>, Address, unsettled), receive {amqp10_event, {link, Receiver0, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = gen_statem:cast(Session0, {flow_session, #'v1_0.flow'{incoming_window = {uint, 1}}}), ok = amqp10_client:flow_link_credit(Receiver0, 50, never), @@ -3009,7 +3009,7 @@ detach_requeues_two_connections(QType, Config) -> {ok, Receiver1} = amqp10_client:attach_receiver_link(Session1, <<"receiver 1">>, Address, unsettled), receive {amqp10_event, {link, Receiver1, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:flow_link_credit(Receiver1, 40, never), %% Wait for credit being applied to the queue. @@ -3039,7 +3039,7 @@ detach_requeues_two_connections(QType, Config) -> %% this sends a consumer removal message from the new node to the old node). ok = amqp10_client:detach_link(Receiver0), receive {amqp10_event, {link, Receiver0, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% Since Receiver0 hasn't settled any deliveries, @@ -3300,7 +3300,7 @@ max_message_size_server_to_client(Config) -> {ended, #'v1_0.error'{ condition = ?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED}}}} -> ok - after 5000 -> flush(missing_ended), + after 30000 -> flush(missing_ended), ct:fail("did not receive expected error") end, @@ -3343,7 +3343,7 @@ last_queue_confirms(Config) -> DTag1 = <<"t1">>, ok = amqp10_client:send_msg(SenderFanout, amqp10_msg:new(DTag1, <<"m1">>, false)), receive {amqp10_disposition, {accepted, DTag1}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag1}) + after 30000 -> ct:fail({missing_accepted, DTag1}) end, %% Make quorum queue unavailable. @@ -3357,7 +3357,7 @@ last_queue_confirms(Config) -> %% Since quorum queue is down, we should only get a confirmation for m3. receive {amqp10_disposition, {accepted, DTag3}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag3}) + after 30000 -> ct:fail({missing_accepted, DTag3}) end, receive {amqp10_disposition, Unexpected} -> ct:fail({unexpected_disposition, Unexpected}) after 200 -> ok @@ -3413,7 +3413,7 @@ target_queue_deleted(Config) -> DTag1 = <<"t1">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), receive {amqp10_disposition, {accepted, DTag1}} -> ok - after 5000 -> ct:fail({missing_accepted, DTag1}) + after 30000 -> ct:fail({missing_accepted, DTag1}) end, N0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -3501,7 +3501,7 @@ target_classic_queue_down(Config) -> %% and be able to send to and receive from the classic queue. {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address), receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, DTag3 = <<"t3">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag3, <<"m3">>, false)), @@ -3576,7 +3576,7 @@ async_notify(SenderSettleMode, QType, Config) -> Session, <<"test-receiver">>, Address, SenderSettleMode, configuration, Filter), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, %% Initially, grant 10 credits to the sending queue. @@ -3728,7 +3728,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -3765,7 +3765,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) [Msg] = receive_messages(Receiver, 1), ?assertEqual([Body], amqp10_msg:body(Msg)), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, ok = amqp10_client:accept_msg(Receiver, Msg); false -> @@ -3784,10 +3784,10 @@ maintenance(Config) -> {ok, C0} = amqp10_client:open_connection(connection_config(0, Config)), {ok, C2} = amqp10_client:open_connection(connection_config(2, Config)), receive {amqp10_event, {connection, C0, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive {amqp10_event, {connection, C2, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = drain_node(Config, 2), @@ -3797,7 +3797,7 @@ maintenance(Config) -> {closed, {internal_error, <<"Connection forced: \"Node was put into maintenance mode\"">>}}}} -> ok - after 5000 -> + after 30000 -> flush(?LINE), ct:fail({missing_event, ?LINE}) end, @@ -3919,10 +3919,10 @@ list_connections(Config) -> {ok, C0} = amqp10_client:open_connection(connection_config(0, Config)), {ok, C2} = amqp10_client:open_connection(connection_config(2, Config)), receive {amqp10_event, {connection, C0, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive {amqp10_event, {connection, C2, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, {ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "protocol"]), @@ -4049,10 +4049,10 @@ global_counters(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QQ}), ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}, receive {amqp10_event, {link, QQSender, {detached, ExpectedError}}} -> ok - after 5000 -> ct:fail("server did not close our sending link") + after 30000 -> ct:fail("server did not close our sending link") end, receive {amqp10_event, {link, QQReceiver, {detached, ExpectedError}}} -> ok - after 5000 -> ct:fail("server did not close our receiving link") + after 30000 -> ct:fail("server did not close our receiving link") end, ?assertMatch(#{publishers := 1, consumers := 1}, @@ -4240,7 +4240,7 @@ available_messages(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail("missing attached") + after 30000 -> ct:fail("missing attached") end, flush(receiver_attached), @@ -4263,7 +4263,7 @@ available_messages(QType, Config) -> ok = amqp10_client_session:flow(Session, OutputHandle, Flow0, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, eventually(?_assertEqual(3, get_available_messages(Receiver))), @@ -4273,7 +4273,7 @@ available_messages(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 1, never, false), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ?assertEqual(2, get_available_messages(Receiver)), @@ -4282,7 +4282,7 @@ available_messages(QType, Config) -> ok = amqp10_client:flow_link_credit(Receiver, 99, never, true), receive_messages(Receiver, 2), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ?assertEqual(0, get_available_messages(Receiver)), @@ -4305,7 +4305,7 @@ available_messages(QType, Config) -> ok = amqp10_client_session:flow(Session, OutputHandle, Flow2, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, eventually(?_assertEqual(5000, get_available_messages(Receiver))), @@ -4482,7 +4482,7 @@ user_id(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"user_id property set to 'fake user' but authenticated user was 'guest'">>}}}}} -> ok - after 5000 -> flush(missing_ended), + after 30000 -> flush(missing_ended), ct:fail("did not receive expected error") end, @@ -4512,10 +4512,10 @@ message_ttl(Config) -> ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), receive {amqp10_msg, Receiver, Msg} -> ?assertEqual([<<"m2">>], amqp10_msg:body(Msg)) - after 5000 -> ct:fail(delivery_timeout) + after 30000 -> ct:fail(delivery_timeout) end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive Unexpected -> ct:fail({received_unexpected_message, Unexpected}) after 5 -> ok @@ -4548,7 +4548,7 @@ idle_time_out_on_server(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, %% Mock the server socket to not have received any bytes. @@ -4566,7 +4566,7 @@ idle_time_out_on_server(Config) -> {closed, {resource_limit_exceeded, <<"no frame received from client within idle timeout threshold">>}}}} -> ok - after 5000 -> + after 30000 -> ct:fail({missing_event, ?LINE}) end, @@ -4582,7 +4582,7 @@ idle_time_out_on_client(Config) -> OpnConf = OpnConf0#{idle_time_out => 1000}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, receive Unexpected -> ct:fail({unexpected, Unexpected}) @@ -4605,7 +4605,7 @@ idle_time_out_on_client(Config) -> {closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} -> ok - after 5000 -> + after 30000 -> ct:fail({missing_event, ?LINE}) end, @@ -4618,7 +4618,7 @@ idle_time_out_too_short(Config) -> OpnConf = OpnConf0#{idle_time_out => 900}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, _}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end. handshake_timeout(Config) -> @@ -4679,7 +4679,7 @@ attach_to_exclusive_queue(Config) -> condition = ?V_1_0_AMQP_ERROR_RESOURCE_LOCKED, description = {utf8, <<"cannot obtain exclusive access to locked " "queue 'my queue' in vhost '/'">>}}}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:close_connection(Connection), @@ -5567,7 +5567,7 @@ receive_many_auto_flow(QType, Config) -> Session, <<"receiver">>, Address, settled, configuration, Filter), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5602,7 +5602,7 @@ incoming_window_closed_transfer_flow_order(Config) -> ok = amqp10_client:detach_link(Sender), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5621,11 +5621,11 @@ incoming_window_closed_transfer_flow_order(Config) -> receive First -> {amqp10_msg, Receiver, Msg} = First, ?assertEqual([Body], amqp10_msg:body(Msg)) - after 5000 -> ct:fail("timeout receiving message") + after 30000 -> ct:fail("timeout receiving message") end, receive Second -> ?assertEqual({amqp10_event, {link, Receiver, credit_exhausted}}, Second) - after 5000 -> ct:fail("timeout receiving credit_exhausted") + after 30000 -> ct:fail("timeout receiving credit_exhausted") end, ok = delete_queue(Session, QName), @@ -5653,7 +5653,7 @@ incoming_window_closed_stop_link(Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5702,7 +5702,7 @@ incoming_window_closed_close_link(Config) -> ok = amqp10_client:detach_link(Sender), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5753,7 +5753,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5818,7 +5818,7 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) + after 30000 -> ct:fail(missing_attached) end, flush(receiver_attached), @@ -5868,7 +5868,7 @@ session_max_per_connection(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(opened_timeout) + after 30000 -> ct:fail(opened_timeout) end, %% The 1st session should succeed. {ok, _Session1} = amqp10_client:begin_session_sync(Connection), @@ -5878,7 +5878,7 @@ session_max_per_connection(Config) -> ?assertEqual( {framing_error, <<"channel number (1) exceeds maximum channel number (0)">>}, Reason) - after 5000 -> ct:fail(missing_closed) + after 30000 -> ct:fail(missing_closed) end, ok = rpc(Config, application, set_env, [App, Par, Default]). @@ -5893,7 +5893,7 @@ link_max_per_session(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(opened_timeout) + after 30000 -> ct:fail(opened_timeout) end, {ok, Session} = amqp10_client:begin_session_sync(Connection), Address1 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k1">>), @@ -5930,7 +5930,7 @@ reserved_annotation(Config) -> ?assertMatch( <<"{reserved_annotation_key,{symbol,<<\"reserved-key\">>}}", _/binary>>, Description) - after 5000 -> flush(missing_ended), + after 30000 -> flush(missing_ended), ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Connection). @@ -5958,7 +5958,7 @@ receive_all_messages0(Receiver, Accept, Acc) -> false -> ok end, receive_all_messages0(Receiver, Accept, [Msg | Acc]) - after 1000 -> + after 5000 -> lists:reverse(Acc) end. @@ -5986,7 +5986,7 @@ open_and_close_connection(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(opened_timeout) + after 30000 -> ct:fail(opened_timeout) end, ok = close_connection_sync(Connection). @@ -5995,7 +5995,7 @@ wait_for_credit(Sender) -> receive {amqp10_event, {link, Sender, credited}} -> ok - after 5000 -> + after 30000 -> flush("wait_for_credit timed out"), ct:fail(credited_timeout) end. @@ -6009,7 +6009,7 @@ wait_for_link_detach(Link) -> {amqp10_event, {link, Link, {detached, normal}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> + after 30000 -> flush("wait_for_link_detach timed out"), ct:fail({link_detach_timeout, Link}) end. @@ -6023,7 +6023,7 @@ wait_for_session_end(Session) -> {amqp10_event, {session, Session, {ended, _}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> + after 30000 -> flush("wait_for_session_end timed out"), ct:fail({session_end_timeout, Session}) end. @@ -6037,7 +6037,7 @@ wait_for_connection_close(Connection) -> {amqp10_event, {connection, Connection, {closed, normal}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> + after 30000 -> flush("wait_for_connection_close timed out"), ct:fail({connection_close_timeout, Connection}) end. @@ -6049,7 +6049,7 @@ wait_for_settlement(Tag, State) -> receive {amqp10_disposition, {State, Tag}} -> ok - after 5000 -> + after 30000 -> flush("wait_for_settlement timed out"), ct:fail({settled_timeout, Tag}) end. @@ -6060,7 +6060,7 @@ wait_for_accepts(N) -> receive {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N - 1) - after 5000 -> + after 30000 -> ct:fail({missing_accepted, N}) end. @@ -6099,7 +6099,7 @@ receive_messages0(Receiver, N, Acc) -> receive {amqp10_msg, Receiver, Msg} -> receive_messages0(Receiver, N - 1, [Msg | Acc]) - after 5000 -> + after 30000 -> ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) end. @@ -6110,7 +6110,7 @@ count_received_messages0(Receiver, Count) -> receive {amqp10_msg, Receiver, _Msg} -> count_received_messages0(Receiver, Count + 1) - after 1000 -> + after 5000 -> Count end. diff --git a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl index f907e77e0a2..1a3a878ccde 100644 --- a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl +++ b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl @@ -79,7 +79,8 @@ requeue_one_channel_quorum_queue(Config) -> requeue_one_channel(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), Ctag = <<"my consumer tag">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), + {ok, Ch} = amqp_connection:open_channel(Conn), #'queue.declare_ok'{} = amqp_channel:call( Ch, diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 98990c8dc36..a4b72ba778e 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -178,7 +178,7 @@ open_connections_to_limit(Config, Limit) -> Connections. close_all_connections(Connections) -> - [rabbit_ct_client_helpers:close_connection(C) || C <- Connections]. + [catch rabbit_ct_client_helpers:close_connection(C) || C <- Connections]. set_node_limit(Config, Type, Limit) -> rabbit_ct_broker_helpers:rpc(Config, 0, diff --git a/deps/rabbitmq_management/test/clustering_SUITE.erl b/deps/rabbitmq_management/test/clustering_SUITE.erl index 3febd56db0f..0e9039cc367 100644 --- a/deps/rabbitmq_management/test/clustering_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_SUITE.erl @@ -304,10 +304,15 @@ queue_consumer_channel_closed(Config) -> amqp_channel:close(Chan), force_stats(Config), - Res = http_get(Config, "/queues/%2F/some-queue"), - % assert there are no consumer details - [] = maps:get(consumer_details, Res), - <<"some-queue">> = maps:get(name, Res), + ?awaitMatch([], + %% assert there are no consumer details + maps:get(consumer_details, + http_get(Config, "/queues/%2F/some-queue")), + 30000), + ?awaitMatch(<<"some-queue">>, + maps:get(name, + http_get(Config, "/queues/%2F/some-queue")), + 30000), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), ok. @@ -325,10 +330,12 @@ queue(Config) -> basic_get(Chan2, <<"some-queue">>), force_stats(Config), - Res = http_get(Config, "/queues/%2F/some-queue"), % assert single queue is returned - [#{} | _] = maps:get(deliveries, Res), - + ?awaitMatch([#{} | _], + maps:get(deliveries, + http_get(Config, "/queues/%2F/some-queue")), + 30000), + amqp_channel:close(Chan), amqp_channel:close(Chan2), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), @@ -390,9 +397,10 @@ channels_multiple_on_different_nodes(Config) -> force_stats(Config), - Res = http_get(Config, "/channels"), % assert two channels are present - [_,_] = Res, + ?awaitMatch([_,_], + http_get(Config, "/channels"), + 30000), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), @@ -416,9 +424,12 @@ channel_closed(Config) -> force_stats(Config), - Res = http_get(Config, "/channels"), - % assert one channel is present - [_] = Res, + rabbit_ct_helpers:await_condition( + fun() -> + %% assert one channel is present + length(http_get(Config, "/channels")) == 1 + end, + 60000), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), diff --git a/deps/rabbitmq_management/test/clustering_prop_SUITE.erl b/deps/rabbitmq_management/test/clustering_prop_SUITE.erl index 613d84168e7..e006bad9077 100644 --- a/deps/rabbitmq_management/test/clustering_prop_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_prop_SUITE.erl @@ -113,10 +113,11 @@ prop_connection_channel_counts(Config) -> Cons = lists:foldl(fun (Op, Agg) -> execute_op(Config, Op, Agg) end, [], Ops), - force_stats(Config), %% TODO retry a few times Res = retry_for( - fun() -> validate_counts(Config, Cons) end, + fun() -> + force_stats(Config), + validate_counts(Config, Cons) end, 60), cleanup(Cons), rabbit_ct_helpers:await_condition( diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index da08f56e3fb..6b7d7fa8025 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -1188,26 +1188,50 @@ management_plugin_connection(Config) -> Node = atom_to_binary(get_node_config(Config, 0, nodename)), C1 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), - eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + FilterFun = + fun(#{client_properties := #{client_id := CId}}) + when CId == ClientId -> true; + (_) -> false + end, + %% Sometimes connections remain open from other testcases, + %% let's match the one we're looking for + eventually( + ?_assertMatch( + [_], + lists:filter(FilterFun, http_get(Config, "/connections"))), + 1000, 10), [#{client_properties := #{client_id := ClientId}, timeout := KeepaliveSecs, node := Node, - name := ConnectionName}] = http_get(Config, "/connections"), + name := ConnectionName}] = + lists:filter(FilterFun, http_get(Config, "/connections")), process_flag(trap_exit, true), http_delete(Config, "/connections/" ++ binary_to_list(uri_string:quote(ConnectionName)), ?NO_CONTENT), await_exit(C1), - eventually(?_assertEqual([], http_get(Config, "/connections"))), + eventually( + ?_assertMatch( + [], + lists:filter(FilterFun, http_get(Config, "/connections"))), + 1000, 10), eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3), - + C2 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), - eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + eventually( + ?_assertMatch( + [_], + lists:filter(FilterFun, http_get(Config, "/connections"))), + 1000, 10), http_delete(Config, "/connections/username/guest", ?NO_CONTENT), await_exit(C2), - eventually(?_assertEqual([], http_get(Config, "/connections"))), + eventually( + ?_assertMatch( + [], + lists:filter(FilterFun, http_get(Config, "/connections"))), + 1000, 10), eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3). management_plugin_enable(Config) -> @@ -1217,10 +1241,22 @@ management_plugin_enable(Config) -> %% If the (web) MQTT connection is established **before** the management plugin is enabled, %% the management plugin should still list the (web) MQTT connection. - C = connect(?FUNCTION_NAME, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent), ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management), - eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + FilterFun = + fun(#{client_properties := #{client_id := CId}}) + when ClientId == CId -> true; + (_) -> false + end, + %% Sometimes connections remain open from other testcases, + %% let's match the one we're looking for + eventually( + ?_assertMatch( + [_], + lists:filter(FilterFun, http_get(Config, "/connections"))), + 1000, 10), ok = emqtt:disconnect(C).