Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deps/rabbitmq_ct_helpers/include/rabbit_assert.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
{line, ?LINE},
{expression, (??Expr)},
{pattern, (??Guard)},
{value, __V}]})
{value, __V},
{timeout, Timeout},
{polling_interval, PollingInterval}]})
end
end
end)(erlang:monotonic_time(millisecond) + Timeout))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,29 @@ maybe_cleanup(State, UnreachableNodes) ->
?LOG_DEBUG(
"Peer discovery: cleanup discovered unreachable nodes: ~tp",
[UnreachableNodes]),
case lists:subtract(as_list(UnreachableNodes), as_list(service_discovery_nodes())) of
[] ->
?LOG_DEBUG(
"Peer discovery: all unreachable nodes are still "
"registered with the discovery backend ~tp",
[rabbit_peer_discovery:backend()],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
Nodes ->
?LOG_DEBUG(
"Peer discovery: unreachable nodes are not registered "
"with the discovery backend ~tp", [Nodes]),
maybe_remove_nodes(Nodes, State#state.warn_only)
Module = rabbit_peer_discovery:backend(),
case rabbit_peer_discovery:normalize(Module:list_nodes()) of
{ok, {OneOrMultipleNodes, _Type}} ->
DiscoveredNodes = as_list(OneOrMultipleNodes),
case lists:subtract(UnreachableNodes, DiscoveredNodes) of
[] ->
?LOG_DEBUG(
"Peer discovery: all unreachable nodes are still "
"registered with the discovery backend ~tp",
[rabbit_peer_discovery:backend()],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
Nodes ->
?LOG_DEBUG(
"Peer discovery: unreachable nodes are not registered "
"with the discovery backend ~tp", [Nodes]),
maybe_remove_nodes(Nodes, State#state.warn_only)
end;
{error, Reason} ->
?LOG_INFO(
"Peer discovery cleanup: ~tp returned error ~tp",
[Module, Reason]),
ok
end.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -288,26 +298,3 @@ maybe_remove_nodes([Node | Nodes], false) ->
%% Returns whatever rabbit_nodes:list_unreachable/0 currently reports.
-spec unreachable_nodes() -> [node()].
unreachable_nodes() ->
    Unreachable = rabbit_nodes:list_unreachable(),
    Unreachable.

%%--------------------------------------------------------------------
%% @private
%% @doc Ask the configured peer discovery backend for the nodes it
%% knows about. The backend's answer is passed through
%% rabbit_peer_discovery:normalize/1. If that yields an error, the error
%% is logged at debug level and an empty list is returned, so callers
%% cannot tell "backend failed" apart from "backend knows no nodes".
%% @end
%%--------------------------------------------------------------------
-spec service_discovery_nodes() -> [node()].
service_discovery_nodes() ->
    Backend = rabbit_peer_discovery:backend(),
    Normalized = rabbit_peer_discovery:normalize(Backend:list_nodes()),
    case Normalized of
        {error, Reason} ->
            ?LOG_DEBUG(
               "Peer discovery cleanup: ~tp returned error ~tp",
               [Backend, Reason]),
            [];
        {ok, {Discovered, _Type}} ->
            KnownNodes = as_list(Discovered),
            ?LOG_DEBUG(
               "Peer discovery cleanup: ~tp returned ~tp",
               [Backend, KnownNodes]),
            KnownNodes
    end.
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(peer_discovery_cleanup_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

%% Common Test entry point: the suite runs a single group, `all_tests'.
all() ->
    GroupNames = [all_tests],
    [{group, Name} || Name <- GroupNames].

%% Common Test group definitions: one group named `all_tests', declared
%% with an empty property list, containing the cases from all_tests/0.
groups() ->
    Properties = [],
    Cases = all_tests(),
    [{all_tests, Properties, Cases}].

%% Names of the test cases that belong to the `all_tests' group.
all_tests() ->
    [cleanup_queues, backend_errors_do_not_trigger_cleanup].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------


%% Suite-level setup: log the environment, then run the suite setup steps
%% with the distribution proxy configuration step added.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    ExtraSteps = [fun rabbit_ct_broker_helpers:configure_dist_proxy/1],
    rabbit_ct_helpers:run_setup_steps(Config, ExtraSteps).

%% Suite-level teardown: run the helper's teardown steps and return
%% whatever they return.
end_per_suite(Config) ->
    TeardownResult = rabbit_ct_helpers:run_teardown_steps(Config),
    TeardownResult.

%% No group-specific setup is needed; the configuration is returned as is.
init_per_group(_GroupName, Config) ->
    Config.

%% No group-specific teardown is needed; the configuration is returned as is.
end_per_group(_GroupName, Config) ->
    Config.

%% Per-testcase setup.
%%
%% Builds the `rabbit' application environment that selects the DNS peer
%% discovery backend (with a hostname derived from the testcase and module
%% names) and enables node cleanup, requests three initially unclustered
%% nodes, and then runs the broker setup steps, the meck setup, the
%% default `list_nodes' mock, node clustering and the client setup steps,
%% in that order.
init_per_testcase(Testcase, Config) ->
    TestcaseBin = atom_to_binary(Testcase, utf8),
    Hostname = <<TestcaseBin/binary, "." ?MODULE_STRING ".local">>,
    %% Enable cleanup but set the interval high. Test cases should
    %% call rabbit_peer_discovery_cleanup:check_cluster/0 to force
    %% cleanup evaluation.
    NodeCleanup = [{cleanup_interval, 3600},
                   {cleanup_only_log_warning, false}],
    ClusterFormation = [{peer_discovery_backend, rabbit_peer_discovery_dns},
                        {peer_discovery_dns, [{hostname, Hostname}]},
                        {node_cleanup, NodeCleanup}],
    Env = [{rabbit, [{cluster_formation, ClusterFormation}]}],
    Config1 = rabbit_ct_helpers:merge_app_env(Config, Env),
    NodeSettings = [{rmq_nodes_count, 3},
                    {rmq_nodes_clustered, false},
                    {rmq_nodename_suffix, Testcase},
                    {net_ticktime, 5}],
    Config2 = rabbit_ct_helpers:set_config(Config1, NodeSettings),
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    %% The mocks are installed before cluster_nodes/1 runs.
    MockAndClusterSteps = [fun setup_meck/1,
                           fun mock_list_nodes/1,
                           fun rabbit_ct_broker_helpers:cluster_nodes/1],
    Steps = rabbit_ct_broker_helpers:setup_steps() ++
            MockAndClusterSteps ++
            rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(Config2, Steps).

%% Per-testcase teardown: run the client teardown steps followed by the
%% broker teardown steps, then mark the testcase as finished.
end_per_testcase(Testcase, Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% ---------------------------------------------------------------------------
%% Test Cases
%% ---------------------------------------------------------------------------

%% Happy path: unreachable nodes not recognized by the backend are cleaned
%% up.
%%
%% Declares a quorum queue, drops node C from the mocked backend response,
%% blocks traffic between C and the other two nodes, forces a cleanup
%% check on node A and asserts that the queue members are exactly A and B.
cleanup_queues(Config) ->
    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),

    QQ = <<"quorum-queue">>,
    declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),

    %% Remove node C from peer discovery responses.
    mock_list_nodes(Config, {ok, [A, B]}),
    %% Make node C unreachable.
    rabbit_ct_broker_helpers:block_traffic_between(A, C),
    rabbit_ct_broker_helpers:block_traffic_between(B, C),
    %% Elapsed time is measured on the monotonic clock so that wall-clock
    %% adjustments cannot distort (or negate) the logged duration.
    Ts1 = erlang:monotonic_time(millisecond),
    ?awaitMatch([C],
                rabbit_ct_broker_helpers:rpc(Config, A,
                                             rabbit_nodes,
                                             list_unreachable, []),
                30_000, 1_000),
    ct:log(?LOW_IMPORTANCE, "Node C became unreachable in ~bms",
           [erlang:monotonic_time(millisecond) - Ts1]),

    ok = rabbit_ct_broker_helpers:rpc(Config, A,
                                      rabbit_peer_discovery_cleanup,
                                      check_cluster, []),

    %% Node C should be removed from the quorum queue members.
    %% The RPC goes through the same helper as the calls above rather than
    %% raw rpc:call/4, whose `{badrpc, _}' result would only surface as an
    %% obscure failure inside proplists:get_value/2.
    ?assertEqual(
       lists:sort([A, B]),
       begin
           Info = rabbit_ct_broker_helpers:rpc(
                    Config, A,
                    rabbit_quorum_queue, infos,
                    [rabbit_misc:r(<<"/">>, queue, QQ)]),
           lists:sort(proplists:get_value(members, Info))
       end),

    %% Cleanup.
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%% The backend could have some transient failures. While the backend is
%% not giving reliable peer information, skip cleanup.
%%
%% Declares a quorum queue, makes the mocked backend return an error,
%% blocks traffic between C and the other two nodes, forces a cleanup
%% check on node A and asserts that the queue members are still A, B and C.
backend_errors_do_not_trigger_cleanup(Config) ->
    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),

    QQ = <<"quorum-queue">>,
    declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),

    %% Have the backend return an error.
    mock_list_nodes(Config, {error, "...internal server error..."}),
    %% Make node C unreachable.
    rabbit_ct_broker_helpers:block_traffic_between(A, C),
    rabbit_ct_broker_helpers:block_traffic_between(B, C),
    %% Elapsed time is measured on the monotonic clock so that wall-clock
    %% adjustments cannot distort (or negate) the logged duration.
    Ts1 = erlang:monotonic_time(millisecond),
    ?awaitMatch([C],
                rabbit_ct_broker_helpers:rpc(Config, A,
                                             rabbit_nodes,
                                             list_unreachable, []),
                30_000, 1_000),
    ct:log(?LOW_IMPORTANCE, "Node C became unreachable in ~bms",
           [erlang:monotonic_time(millisecond) - Ts1]),

    ok = rabbit_ct_broker_helpers:rpc(Config, A,
                                      rabbit_peer_discovery_cleanup,
                                      check_cluster, []),

    %% Node C should remain in the quorum queue members.
    %% The RPC goes through the same helper as the calls above rather than
    %% raw rpc:call/4, whose `{badrpc, _}' result would only surface as an
    %% obscure failure inside proplists:get_value/2.
    ?assertEqual(
       lists:sort([A, B, C]),
       begin
           Info = rabbit_ct_broker_helpers:rpc(
                    Config, A,
                    rabbit_quorum_queue, infos,
                    [rabbit_misc:r(<<"/">>, queue, QQ)]),
           lists:sort(proplists:get_value(members, Info))
       end),

    %% Cleanup.
    ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

%%%
%%% Implementation
%%%

%% Setup step: run the broker helper's meck setup, then create a meck mock
%% of rabbit_peer_discovery_dns on all nodes with the `no_link' and
%% `passthrough' options. Returns Config unchanged.
setup_meck(Config) ->
    rabbit_ct_broker_helpers:setup_meck(Config),
    MeckNewArgs = [rabbit_peer_discovery_dns, [no_link, passthrough]],
    rabbit_ct_broker_helpers:rpc_all(Config, meck, new, MeckNewArgs),
    Config.

%% Setup step: make the mocked backend report every node name found in
%% Config as `{ok, Nodes}'.
mock_list_nodes(Config) ->
    AllNodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    DefaultResponse = {ok, AllNodes},
    mock_list_nodes(Config, DefaultResponse).

%% On all nodes, set a meck expectation so that
%% rabbit_peer_discovery_dns:list_nodes/0 returns Response.
%% Returns Config unchanged.
mock_list_nodes(Config, Response) ->
    ExpectArgs = [rabbit_peer_discovery_dns, list_nodes, 0, Response],
    rabbit_ct_broker_helpers:rpc_all(Config, meck, expect, ExpectArgs),
    Config.

%% Declare a durable queue named QueueName with the given arguments on
%% channel Ch. Crashes with `badmatch' unless the reply is a
%% `queue.declare_ok' record; returns that record.
declare_queue(Ch, QueueName, Args)
  when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
    Declare = #'queue.declare'{queue = QueueName,
                               durable = true,
                               arguments = Args},
    #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
Loading