Skip to content

Commit df7b065

Browse files
Merge pull request #14606 from rabbitmq/md/peer-disc-cleanup-error
Skip peer discovery cleanup when backend returns error
2 parents 88abb96 + 2d4f19c commit df7b065

File tree

3 files changed

+222
-37
lines changed

3 files changed

+222
-37
lines changed

deps/rabbitmq_ct_helpers/include/rabbit_assert.hrl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
{line, ?LINE},
1919
{expression, (??Expr)},
2020
{pattern, (??Guard)},
21-
{value, __V}]})
21+
{value, __V},
22+
{timeout, Timeout},
23+
{polling_interval, PollingInterval}]})
2224
end
2325
end
2426
end)(erlang:monotonic_time(millisecond) + Timeout))

deps/rabbitmq_peer_discovery_common/src/rabbit_peer_discovery_cleanup.erl

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -240,19 +240,29 @@ maybe_cleanup(State, UnreachableNodes) ->
240240
?LOG_DEBUG(
241241
"Peer discovery: cleanup discovered unreachable nodes: ~tp",
242242
[UnreachableNodes]),
243-
case lists:subtract(as_list(UnreachableNodes), as_list(service_discovery_nodes())) of
244-
[] ->
245-
?LOG_DEBUG(
246-
"Peer discovery: all unreachable nodes are still "
247-
"registered with the discovery backend ~tp",
248-
[rabbit_peer_discovery:backend()],
249-
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
250-
ok;
251-
Nodes ->
252-
?LOG_DEBUG(
253-
"Peer discovery: unreachable nodes are not registered "
254-
"with the discovery backend ~tp", [Nodes]),
255-
maybe_remove_nodes(Nodes, State#state.warn_only)
243+
Module = rabbit_peer_discovery:backend(),
244+
case rabbit_peer_discovery:normalize(Module:list_nodes()) of
245+
{ok, {OneOrMultipleNodes, _Type}} ->
246+
DiscoveredNodes = as_list(OneOrMultipleNodes),
247+
case lists:subtract(UnreachableNodes, DiscoveredNodes) of
248+
[] ->
249+
?LOG_DEBUG(
250+
"Peer discovery: all unreachable nodes are still "
251+
"registered with the discovery backend ~tp",
252+
[rabbit_peer_discovery:backend()],
253+
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
254+
ok;
255+
Nodes ->
256+
?LOG_DEBUG(
257+
"Peer discovery: unreachable nodes are not registered "
258+
"with the discovery backend ~tp", [Nodes]),
259+
maybe_remove_nodes(Nodes, State#state.warn_only)
260+
end;
261+
{error, Reason} ->
262+
?LOG_INFO(
263+
"Peer discovery cleanup: ~tp returned error ~tp",
264+
[Module, Reason]),
265+
ok
256266
end.
257267

258268
%%--------------------------------------------------------------------
@@ -288,26 +298,3 @@ maybe_remove_nodes([Node | Nodes], false) ->
288298
-spec unreachable_nodes() -> [node()].
289299
unreachable_nodes() ->
290300
rabbit_nodes:list_unreachable().
291-
292-
%%--------------------------------------------------------------------
293-
%% @private
294-
%% @doc Return the nodes that the service discovery backend knows about
295-
%% @spec service_discovery_nodes() -> [node()]
296-
%% @end
297-
%%--------------------------------------------------------------------
298-
-spec service_discovery_nodes() -> [node()].
299-
service_discovery_nodes() ->
300-
Module = rabbit_peer_discovery:backend(),
301-
case rabbit_peer_discovery:normalize(Module:list_nodes()) of
302-
{ok, {OneOrMultipleNodes, _Type}} ->
303-
Nodes = as_list(OneOrMultipleNodes),
304-
?LOG_DEBUG(
305-
"Peer discovery cleanup: ~tp returned ~tp",
306-
[Module, Nodes]),
307-
Nodes;
308-
{error, Reason} ->
309-
?LOG_DEBUG(
310-
"Peer discovery cleanup: ~tp returned error ~tp",
311-
[Module, Reason]),
312-
[]
313-
end.
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(peer_discovery_cleanup_SUITE).
9+
10+
-compile([export_all, nowarn_export_all]).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("eunit/include/eunit.hrl").
14+
-include_lib("amqp_client/include/amqp_client.hrl").
15+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
16+
17+
all() ->
18+
[
19+
{group, all_tests}
20+
].
21+
22+
groups() ->
23+
[
24+
{all_tests, [], all_tests()}
25+
].
26+
27+
all_tests() ->
28+
[
29+
cleanup_queues,
30+
backend_errors_do_not_trigger_cleanup
31+
].
32+
33+
%% -------------------------------------------------------------------
34+
%% Testsuite setup/teardown.
35+
%% -------------------------------------------------------------------
36+
37+
38+
init_per_suite(Config) ->
39+
rabbit_ct_helpers:log_environment(),
40+
rabbit_ct_helpers:run_setup_steps(
41+
Config,
42+
[fun rabbit_ct_broker_helpers:configure_dist_proxy/1]).
43+
44+
end_per_suite(Config) ->
45+
rabbit_ct_helpers:run_teardown_steps(Config).
46+
47+
init_per_group(_Group, Config) ->
48+
Config.
49+
50+
end_per_group(_Group, Config) ->
51+
Config.
52+
53+
init_per_testcase(Testcase, Config) ->
54+
Hostname = <<(atom_to_binary(Testcase, utf8))/binary,
55+
("." ?MODULE_STRING ".local")>>,
56+
Env = [{rabbit,
57+
[{cluster_formation,
58+
[{peer_discovery_backend, rabbit_peer_discovery_dns},
59+
{peer_discovery_dns, [{hostname, Hostname}]},
60+
%% Enable cleanup but set the interval high. Test cases should
61+
%% call rabbit_peer_discovery_cleanup:check_cluster/0 to force
62+
%% cleanup evaluation.
63+
{node_cleanup, [{cleanup_interval, 3600},
64+
{cleanup_only_log_warning, false}]}]}]}],
65+
Config1 = rabbit_ct_helpers:merge_app_env(Config, Env),
66+
Config2 = rabbit_ct_helpers:set_config(Config1,
67+
[
68+
{rmq_nodes_count, 3},
69+
{rmq_nodes_clustered, false},
70+
{rmq_nodename_suffix, Testcase},
71+
{net_ticktime, 5}
72+
]),
73+
rabbit_ct_helpers:testcase_started(Config, Testcase),
74+
rabbit_ct_helpers:run_steps(Config2,
75+
rabbit_ct_broker_helpers:setup_steps() ++
76+
[fun setup_meck/1,
77+
fun mock_list_nodes/1,
78+
fun rabbit_ct_broker_helpers:cluster_nodes/1] ++
79+
rabbit_ct_client_helpers:setup_steps()).
80+
81+
end_per_testcase(Testcase, Config) ->
82+
rabbit_ct_helpers:run_steps(Config,
83+
rabbit_ct_client_helpers:teardown_steps() ++
84+
rabbit_ct_broker_helpers:teardown_steps()),
85+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
86+
87+
%% ---------------------------------------------------------------------------
88+
%% Test Cases
89+
%% ---------------------------------------------------------------------------
90+
91+
cleanup_queues(Config) ->
92+
%% Happy path: unreachable nodes not recognized by the backend are cleaned
93+
%% up.
94+
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
95+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
96+
97+
QQ = <<"quorum-queue">>,
98+
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
99+
100+
%% Remove node C from peer discovery responses.
101+
mock_list_nodes(Config, {ok, [A, B]}),
102+
%% Make node C unreachable.
103+
rabbit_ct_broker_helpers:block_traffic_between(A, C),
104+
rabbit_ct_broker_helpers:block_traffic_between(B, C),
105+
Ts1 = erlang:system_time(millisecond),
106+
?awaitMatch([C],
107+
rabbit_ct_broker_helpers:rpc(Config, A,
108+
rabbit_nodes,
109+
list_unreachable, []),
110+
30_000, 1_000),
111+
ct:log(?LOW_IMPORTANCE, "Node C became unreachable in ~bms",
112+
[erlang:system_time(millisecond) - Ts1]),
113+
114+
ok = rabbit_ct_broker_helpers:rpc(Config, A,
115+
rabbit_peer_discovery_cleanup,
116+
check_cluster, []),
117+
118+
%% Node C should be removed from the quorum queue members.
119+
?assertEqual(
120+
lists:sort([A, B]),
121+
begin
122+
Info = rpc:call(A, rabbit_quorum_queue, infos,
123+
[rabbit_misc:r(<<"/">>, queue, QQ)]),
124+
lists:sort(proplists:get_value(members, Info))
125+
end),
126+
127+
%% Cleanup.
128+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
129+
130+
backend_errors_do_not_trigger_cleanup(Config) ->
131+
%% The backend could have some transient failures. While the backend is
132+
%% not giving reliable peer information, skip cleanup.
133+
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
134+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
135+
136+
QQ = <<"quorum-queue">>,
137+
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
138+
139+
%% Have the backend return an error.
140+
mock_list_nodes(Config, {error, "...internal server error..."}),
141+
%% Make node C unreachable.
142+
rabbit_ct_broker_helpers:block_traffic_between(A, C),
143+
rabbit_ct_broker_helpers:block_traffic_between(B, C),
144+
Ts1 = erlang:system_time(millisecond),
145+
?awaitMatch([C],
146+
rabbit_ct_broker_helpers:rpc(Config, A,
147+
rabbit_nodes,
148+
list_unreachable, []),
149+
30_000, 1_000),
150+
ct:log(?LOW_IMPORTANCE, "Node C became unreachable in ~bms",
151+
[erlang:system_time(millisecond) - Ts1]),
152+
153+
ok = rabbit_ct_broker_helpers:rpc(Config, A,
154+
rabbit_peer_discovery_cleanup,
155+
check_cluster, []),
156+
157+
%% Node C should remain in the quorum queue members.
158+
?assertEqual(
159+
lists:sort([A, B, C]),
160+
begin
161+
Info = rpc:call(A, rabbit_quorum_queue, infos,
162+
[rabbit_misc:r(<<"/">>, queue, QQ)]),
163+
lists:sort(proplists:get_value(members, Info))
164+
end),
165+
166+
%% Cleanup.
167+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
168+
169+
%%%
170+
%%% Implementation
171+
%%%
172+
173+
setup_meck(Config) ->
174+
rabbit_ct_broker_helpers:setup_meck(Config),
175+
rabbit_ct_broker_helpers:rpc_all(Config, meck, new,
176+
[rabbit_peer_discovery_dns,
177+
[no_link, passthrough]]),
178+
Config.
179+
180+
mock_list_nodes(Config) ->
181+
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
182+
mock_list_nodes(Config, {ok, Nodes}).
183+
184+
mock_list_nodes(Config, Response) ->
185+
rabbit_ct_broker_helpers:rpc_all(Config, meck, expect,
186+
[rabbit_peer_discovery_dns,
187+
list_nodes, 0, Response]),
188+
Config.
189+
190+
declare_queue(Ch, QueueName, Args)
191+
when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
192+
#'queue.declare_ok'{} = amqp_channel:call(
193+
Ch, #'queue.declare'{
194+
queue = QueueName,
195+
durable = true,
196+
arguments = Args}).

0 commit comments

Comments
 (0)