Skip to content

Commit 3293a24

Browse files
Reconcile QQ node dead during delete and redeclare
Co-authored-by: Péter Gömöri <[email protected]>
1 parent a09383d commit 3293a24

File tree

10 files changed

+282
-57
lines changed

10 files changed

+282
-57
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
% exclusive_owner
3131
get_exclusive_owner/1,
3232
get_leader_node/1,
33-
get_nodes/1,
3433
% name (#resource)
3534
get_name/1,
3635
set_name/2,
@@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
425424
get_leader_node(#amqqueue{pid = none}) -> none;
426425
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).
427426

428-
-spec get_nodes(amqqueue_v2()) -> [node(),...].
429-
430-
get_nodes(Q) ->
431-
case amqqueue:get_type_state(Q) of
432-
#{nodes := Nodes} ->
433-
Nodes;
434-
_ ->
435-
[get_leader_node(Q)]
436-
end.
437427

438428
% operator_policy
439429

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
471471
{Leader :: node() | none, Replicas :: [node(),...]}.
472472
queue_topology(Q) ->
473473
Leader = amqqueue:get_leader_node(Q),
474-
Replicas = amqqueue:get_nodes(Q),
474+
Replicas = rabbit_amqqueue:get_nodes(Q),
475475
{Leader, Replicas}.
476476

477477
decode_exchange({map, KVList}) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
2424
emit_info_all/5, list_local/1, info_local/1,
2525
emit_info_local/4, emit_info_down/4]).
26+
-export([get_nodes/1]).
2627
-export([count/0]).
2728
-export([list_down/1, list_down/2, list_all/1,
2829
count/1, list_names/0, list_names/1, list_local_names/0,
@@ -1226,6 +1227,12 @@ list() ->
12261227
count() ->
12271228
rabbit_db_queue:count().
12281229

1230+
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...].
1231+
1232+
get_nodes(Q) ->
1233+
[{members, Nodes}] = info(Q, [members]),
1234+
Nodes.
1235+
12291236
-spec list_names() -> [name()].
12301237

12311238
list_names() ->
@@ -2035,12 +2042,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
20352042
).
20362043

20372044
get_quorum_nodes(Q) ->
2038-
case amqqueue:get_type_state(Q) of
2039-
#{nodes := Nodes} ->
2040-
Nodes;
2041-
_ ->
2042-
[]
2043-
end.
2045+
rabbit_amqqueue:get_nodes(Q).
20442046

20452047
-spec prepend_extra_bcc(Qs) ->
20462048
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,10 @@
219219
depends_on => ['rabbitmq_4.1.0'],
220220
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
221221
}}).
222+
223+
-rabbit_feature_flag(
224+
{'track_qq_members_uids',
225+
#{desc => "Track queue members UIDs in the metadata store",
226+
stability => stable,
227+
depends_on => []
228+
}}).

deps/rabbit/src/rabbit_queue_location.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
143143
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
144144
Queues = GetQueues(),
145145
Counters = lists:foldl(fun(Q, Acc) ->
146-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
146+
Nodes = rabbit_amqqueue:get_nodes(Q),
147147
lists:foldl(fun(N, A)
148148
when is_map_key(N, A) ->
149149
maps:update_with(N, fun(C) -> C+1 end, A);

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 104 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,17 @@ start_cluster(Q) ->
270270
{LeaderNode, FollowerNodes} =
271271
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
272272
LeaderId = {RaName, LeaderNode},
273+
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
274+
|| Node <- [LeaderNode | FollowerNodes]]),
273275
NewQ0 = amqqueue:set_pid(Q, LeaderId),
274-
NewQ1 = amqqueue:set_type_state(NewQ0,
275-
#{nodes => [LeaderNode | FollowerNodes]}),
276+
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
277+
false ->
278+
amqqueue:set_type_state(NewQ0,
279+
#{nodes => [LeaderNode | FollowerNodes]});
280+
true ->
281+
amqqueue:set_type_state(NewQ0,
282+
#{nodes => UIDs})
283+
end,
276284

277285
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
278286
rabbit_fifo, version, [],
@@ -717,7 +725,7 @@ repair_amqqueue_nodes(Q0) ->
717725
{Name, _} = amqqueue:get_pid(Q0),
718726
Members = ra_leaderboard:lookup_members(Name),
719727
RaNodes = [N || {_, N} <- Members],
720-
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
728+
Nodes = get_nodes(Q0),
721729
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
722730
true ->
723731
%% up to date
@@ -726,7 +734,18 @@ repair_amqqueue_nodes(Q0) ->
726734
%% update amqqueue record
727735
Fun = fun (Q) ->
728736
TS0 = amqqueue:get_type_state(Q),
729-
TS = TS0#{nodes => RaNodes},
737+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
738+
andalso has_uuid_tracking(TS0)
739+
of
740+
false ->
741+
TS0#{nodes => RaNodes};
742+
true ->
743+
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
744+
[?RA_SYSTEM, Name],
745+
?RPC_TIMEOUT)}
746+
|| N <- RaNodes]),
747+
TS0#{nodes => RaUids}
748+
end,
730749
amqqueue:set_type_state(Q, TS)
731750
end,
732751
_ = rabbit_amqqueue:update(QName, Fun),
@@ -790,6 +809,23 @@ recover(_Vhost, Queues) ->
790809
ServerId = {Name, node()},
791810
QName = amqqueue:get_name(Q0),
792811
MutConf = make_mutable_config(Q0),
812+
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
813+
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
814+
QTypeState = case Nodes of
815+
List when is_list(List) ->
816+
%% Queue is not aware of node to uid mapping, do nothing
817+
QTypeState0;
818+
#{node() := RaUId} ->
819+
%% Queue is aware and uid for current node is correct, do nothing
820+
QTypeState0;
821+
_ ->
822+
%% Queue is aware but either current node has no UId or it
823+
%% does not match the one returned by ra_directory, regen uid
824+
maybe_delete_data_dir(RaUId),
825+
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
826+
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
827+
end,
828+
Q = amqqueue:set_type_state(Q0, QTypeState),
793829
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
794830
ok ->
795831
% queue was restarted, good
@@ -802,7 +838,7 @@ recover(_Vhost, Queues) ->
802838
[rabbit_misc:rs(QName), Err1]),
803839
% queue was never started on this node
804840
% so needs to be started from scratch.
805-
case start_server(make_ra_conf(Q0, ServerId)) of
841+
case start_server(make_ra_conf(Q, ServerId)) of
806842
ok -> ok;
807843
Err2 ->
808844
?LOG_WARNING("recover: quorum queue ~w could not"
@@ -824,8 +860,7 @@ recover(_Vhost, Queues) ->
824860
%% present in the rabbit_queue table and not just in
825861
%% rabbit_durable_queue
826862
%% So many code paths are dependent on this.
827-
ok = rabbit_db_queue:set_dirty(Q0),
828-
Q = Q0,
863+
ok = rabbit_db_queue:set_dirty(Q),
829864
case Res of
830865
ok ->
831866
{[Q | R0], F0};
@@ -1208,12 +1243,17 @@ cleanup_data_dir() ->
12081243
maybe_delete_data_dir(UId) ->
12091244
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
12101245
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
1211-
{ok, Config} = ra_log:read_config(Dir),
1212-
case maps:get(machine, Config) of
1213-
{module, rabbit_fifo, _} ->
1214-
ra_lib:recursive_delete(Dir);
1215-
_ ->
1216-
ok
1246+
case filelib:is_dir(Dir) of
1247+
false ->
1248+
ok;
1249+
true ->
1250+
{ok, Config} = ra_log:read_config(Dir),
1251+
case maps:get(machine, Config) of
1252+
{module, rabbit_fifo, _} ->
1253+
ra_lib:recursive_delete(Dir);
1254+
_ ->
1255+
ok
1256+
end
12171257
end.
12181258

12191259
policy_changed(Q) ->
@@ -1378,16 +1418,29 @@ add_member(Q, Node, Membership) ->
13781418
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).
13791419

13801420

1381-
do_add_member(Q, Node, Membership, Timeout)
1382-
when ?is_amqqueue(Q) andalso
1383-
?amqqueue_is_quorum(Q) andalso
1421+
do_add_member(Q0, Node, Membership, Timeout)
1422+
when ?is_amqqueue(Q0) andalso
1423+
?amqqueue_is_quorum(Q0) andalso
13841424
is_atom(Node) ->
1385-
{RaName, _} = amqqueue:get_pid(Q),
1386-
QName = amqqueue:get_name(Q),
1425+
{RaName, _} = amqqueue:get_pid(Q0),
1426+
QName = amqqueue:get_name(Q0),
13871427
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13881428
ServerId = {RaName, Node},
1389-
Members = members(Q),
1390-
1429+
Members = members(Q0),
1430+
QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0),
1431+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1432+
QTypeState = case Nodes of
1433+
L when is_list(L) ->
1434+
%% Queue is not aware of node to uid mapping, just add the new node
1435+
QTypeState0#{nodes => lists:usort([Node | Nodes])};
1436+
#{Node := _} ->
1437+
%% Queue is aware and uid for targeted node exists, do nothing
1438+
QTypeState0;
1439+
_ ->
1440+
%% Queue is aware but current node has no UId, regen uid
1441+
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
1442+
end,
1443+
Q = amqqueue:set_type_state(Q0, QTypeState),
13911444
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
13921445
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13931446
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1403,8 +1456,12 @@ do_add_member(Q, Node, Membership, Timeout)
14031456
{ok, {RaIndex, RaTerm}, Leader} ->
14041457
Fun = fun(Q1) ->
14051458
Q2 = update_type_state(
1406-
Q1, fun(#{nodes := Nodes} = Ts) ->
1407-
Ts#{nodes => lists:usort([Node | Nodes])}
1459+
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
1460+
Ts#{nodes => lists:usort([Node | NodesList])};
1461+
(#{nodes := #{Node := _}} = Ts) ->
1462+
Ts;
1463+
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
1464+
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
14081465
end),
14091466
amqqueue:set_pid(Q2, Leader)
14101467
end,
@@ -1477,8 +1534,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14771534
Fun = fun(Q1) ->
14781535
update_type_state(
14791536
Q1,
1480-
fun(#{nodes := Nodes} = Ts) ->
1481-
Ts#{nodes => lists:delete(Node, Nodes)}
1537+
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
1538+
Ts#{nodes => lists:delete(Node, Nodes)};
1539+
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
1540+
Ts#{nodes => maps:remove(Node, Nodes)}
14821541
end)
14831542
end,
14841543
_ = rabbit_amqqueue:update(QName, Fun),
@@ -1999,7 +2058,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
19992058
#resource{name = QNameBin} = QName,
20002059
RaMachine = ra_machine(Q),
20012060
[{ClusterName, _} | _] = Members = members(Q),
2002-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
2061+
{_, Node} = ServerId,
2062+
UId = case amqqueue:get_type_state(Q) of
2063+
#{nodes := #{Node := Id}} ->
2064+
Id;
2065+
_ ->
2066+
%% Queue was declared on an older version of RabbitMQ
2067+
%% and does not have the node to uid mappings
2068+
ra:new_uid(ra_lib:to_binary(ClusterName))
2069+
end,
20032070
FName = rabbit_misc:rs(QName),
20042071
Formatter = {?MODULE, format_ra_event, [QName]},
20052072
LogCfg = #{uid => UId,
@@ -2031,7 +2098,12 @@ make_mutable_config(Q) ->
20312098

20322099
get_nodes(Q) when ?is_amqqueue(Q) ->
20332100
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2034-
Nodes.
2101+
case Nodes of
2102+
List when is_list(List) ->
2103+
List;
2104+
Map when is_map(Map) ->
2105+
maps:keys(Map)
2106+
end.
20352107

20362108
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
20372109
ErlangNodes = [node() | nodes()],
@@ -2138,7 +2210,7 @@ force_checkpoint_on_queue(QName) ->
21382210
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21392211
{RaName, _} = amqqueue:get_pid(Q),
21402212
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2141-
Nodes = amqqueue:get_nodes(Q),
2213+
Nodes = rabbit_amqqueue:get_nodes(Q),
21422214
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
21432215
|| Node <- Nodes],
21442216
ok;
@@ -2408,3 +2480,8 @@ queue_vm_stats_sups() ->
24082480
queue_vm_ets() ->
24092481
{[quorum_ets],
24102482
[[ra_log_ets]]}.
2483+
2484+
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2485+
true;
2486+
has_uuid_tracking(_QTypeState) ->
2487+
false.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ stop() ->
153153

154154
new_stream(Q, LeaderNode)
155155
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
156-
#{name := StreamId,
157-
nodes := Nodes} = amqqueue:get_type_state(Q),
156+
#{name := StreamId} = amqqueue:get_type_state(Q),
157+
Nodes = rabbit_amqqueue:get_nodes(Q),
158158
%% assertion leader is in nodes configuration
159159
true = lists:member(LeaderNode, Nodes),
160160
process_command({new_stream, StreamId,

0 commit comments

Comments
 (0)