Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 109 additions & 149 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
normalize/1,
append_node_prefix/1,
node_prefix/0]).
-export([do_query_node_props/1,
group_leader_proxy/2]).
-export([do_query_node_props/2]).

-ifdef(TEST).
-export([query_node_props/1,
Expand Down Expand Up @@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
%% @private

query_node_props(Nodes) when Nodes =/= [] ->
{Prefix, Suffix} = rabbit_nodes_common:parts(node()),
ThisNode = node(),
{Prefix, Suffix} = rabbit_nodes_common:parts(ThisNode),
PeerName = peer:random_name(Prefix),
%% We go through a temporary hidden node to query all other discovered
%% peers properties, instead of querying them directly.
Expand Down Expand Up @@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] ->
[Peer],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
try
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes], 180000)
NodesAndProps1 = peer:call(
Pid,
?MODULE, do_query_node_props,
[Nodes, ThisNode], 180000),
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
NodesAndProps2
after
peer:stop(Pid)
end;
Expand Down Expand Up @@ -563,80 +568,31 @@ maybe_add_tls_arguments(VMArgs) ->
end,
VMArgs2.

do_query_node_props(Nodes) when Nodes =/= [] ->
do_query_node_props(Nodes, FromNode) when Nodes =/= [] ->
%% Make sure all log messages are forwarded from this temporary hidden
%% node to the upstream node, regardless of their level.
_ = logger:set_primary_config(level, debug),

%% The group leader for all processes on this temporary hidden node is the
%% calling process' group leader on the upstream node.
%%
%% When we use `erpc:call/4' (or the multicall equivalent) to execute code
%% on one of the `Nodes', the remotely executed code will also use the
%% calling process' group leader by default.
%%
%% We use this temporary hidden node to ensure the downstream node will
%% not connected to the upstream node. Therefore, we must change the group
%% leader as well, otherwise any I/O from the downstream node will send a
%% message to the upstream node's group leader and thus open a connection.
%% This would defeat the entire purpose of this temporary hidden node.
%%
%% To avoid this, we start a proxy process which we will use as a group
%% leader. This process will send all messages it receives to the group
%% leader on the upstream node.
%%
%% There is one caveat: the logger (local to the temporary hidden node)
%% forwards log messages to the upstream logger (on the upstream node)
%% only if the group leader of that message is a remote PID. Because we
%% set a local PID, it stops forwarding log messages originating from that
%% temporary hidden node. That's why we use `with_group_leader_proxy/2' to
%% set the group leader to our proxy only around the use of `erpc'.
%%
%% That's a lot just to keep logging working while not reveal the upstream
%% node to the downstream node...
Parent = self(),
UpstreamGroupLeader = erlang:group_leader(),
ProxyGroupLeader = spawn_link(
?MODULE, group_leader_proxy,
[Parent, UpstreamGroupLeader]),

%% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
%% supported version has it.
MembersPerNode = with_group_leader_proxy(
ProxyGroupLeader,
fun() ->
erpc:multicall(Nodes, rabbit_nodes, all, [])
end),
query_node_props1(Nodes, MembersPerNode, [], ProxyGroupLeader).

with_group_leader_proxy(ProxyGroupLeader, Fun) ->
UpstreamGroupLeader = erlang:group_leader(),
try
true = erlang:group_leader(ProxyGroupLeader, self()),
Fun()
after
true = erlang:group_leader(UpstreamGroupLeader, self())
end.

group_leader_proxy(Parent, UpstreamGroupLeader) ->
receive
stop_proxy ->
erlang:unlink(Parent),
Parent ! proxy_stopped;
Message ->
UpstreamGroupLeader ! Message,
group_leader_proxy(Parent, UpstreamGroupLeader)
end.
MembersPerNode = [try
{ok,
erpc_call(Node, rabbit_nodes, all, [], FromNode)}
catch
Class:Reason ->
{Class, Reason}
end || Node <- Nodes],
query_node_props1(Nodes, MembersPerNode, [], FromNode).

query_node_props1(
[Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps,
ProxyGroupLeader) ->
FromNode) ->
NodeAndProps = {Node, Members},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ProxyGroupLeader);
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode);
query_node_props1(
[Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps,
ProxyGroupLeader) ->
[Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps,
FromNode) ->
%% We consider that an error means the remote node is unreachable or not
%% ready. Therefore, we exclude it from the list of discovered nodes as we
%% won't be able to join it anyway.
Expand All @@ -645,55 +601,51 @@ query_node_props1(
"Peer discovery: node '~ts' excluded from the discovered nodes",
[Node, Error, Node],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ProxyGroupLeader);
query_node_props1([], [], NodesAndProps, ProxyGroupLeader) ->
query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode);
query_node_props1([], [], NodesAndProps, FromNode) ->
NodesAndProps1 = lists:reverse(NodesAndProps),
query_node_props2(NodesAndProps1, [], ProxyGroupLeader).

query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) ->
try
erpc:call(
Node, logger, debug,
["Peer discovery: temporary hidden node '~ts' queries properties "
"from node '~ts'", [node(), Node]]),
StartTime = get_node_start_time(Node, microsecond, ProxyGroupLeader),
IsReady = is_node_db_ready(Node, ProxyGroupLeader),
NodeAndProps = {Node, Members, StartTime, IsReady},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
query_node_props2(Rest, NodesAndProps1, ProxyGroupLeader)
catch
_:Error:_ ->
%% If one of the erpc calls we use to get the start time fails,
%% there is something wrong with the remote node because it
%% doesn't depend on RabbitMQ. We exclude it from the discovered
%% nodes.
?LOG_DEBUG(
"Peer discovery: failed to query start time of node '~ts': "
"~0tp~n"
"Peer discovery: node '~ts' excluded from the discovered nodes",
[Node, Error, Node],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
query_node_props2(Rest, NodesAndProps, ProxyGroupLeader)
end;
query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
query_node_props2(NodesAndProps1, [], FromNode).

query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) ->
NodesAndProps2 = try
erpc_call(
Node, logger, debug,
["Peer discovery: temporary hidden node '~ts' "
"queries properties from node '~ts'",
[node(), Node]], FromNode),
StartTime = get_node_start_time(
Node, microsecond, FromNode),
IsReady = is_node_db_ready(Node, FromNode),
NodeAndProps = {Node, Members, StartTime, IsReady},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
NodesAndProps1
catch
_:Error:_ ->
%% If one of the erpc calls we use to get the
%% start time fails, there is something wrong with
%% the remote node because it doesn't depend on
%% RabbitMQ. We exclude it from the discovered
%% nodes.
?LOG_DEBUG(
"Peer discovery: failed to query start time "
"+ DB readyness of node '~ts': ~0tp~n"
"Peer discovery: node '~ts' excluded from the "
"discovered nodes",
[Node, Error, Node],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
NodesAndProps
end,
query_node_props2(Rest, NodesAndProps2, FromNode);
query_node_props2([], NodesAndProps, _FromNode) ->
NodesAndProps1 = lists:reverse(NodesAndProps),
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
%% Wait for the proxy group leader to flush its inbox.
ProxyGroupLeader ! stop_proxy,
receive
proxy_stopped ->
ok
after 120_000 ->
ok
end,
?assertEqual([], nodes()),
?assert(length(NodesAndProps2) =< length(nodes(hidden))),
NodesAndProps2.
?assert(length(NodesAndProps1) =< length(nodes(hidden))),
NodesAndProps1.

-spec get_node_start_time(Node, Unit, ProxyGroupLeader) -> StartTime when
-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when
Node :: node(),
Unit :: erlang:time_unit(),
ProxyGroupLeader :: pid(),
FromNode :: node(),
StartTime :: non_neg_integer().
%% @doc Returns the start time of the given `Node' in `Unit'.
%%
Expand All @@ -713,52 +665,60 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
%%
%% @private

get_node_start_time(Node, Unit, ProxyGroupLeader) ->
with_group_leader_proxy(
ProxyGroupLeader,
fun() ->
NativeStartTime = erpc:call(
Node, erlang, system_info, [start_time]),
TimeOffset = erpc:call(Node, erlang, time_offset, []),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erpc:call(
Node, erlang, convert_time_unit,
[SystemStartTime, native, Unit]),
StartTime
end).

-spec is_node_db_ready(Node, ProxyGroupLeader) -> IsReady when
get_node_start_time(Node, Unit, FromNode) ->
NativeStartTime = erpc_call(
Node, erlang, system_info, [start_time], FromNode),
TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erpc_call(
Node, erlang, convert_time_unit,
[SystemStartTime, native, Unit], FromNode),
StartTime.

-spec is_node_db_ready(Node, FromNode) -> IsReady when
Node :: node(),
ProxyGroupLeader :: pid(),
FromNode :: node(),
IsReady :: boolean() | undefined.
%% @doc Returns if the node's DB layer is ready or not.
%%
%% @private

is_node_db_ready(Node, ProxyGroupLeader) ->
%% This code is running from a temporary hidden node. We derive the real
%% node interested in the properties from the group leader.
UpstreamGroupLeader = erlang:group_leader(),
ThisNode = node(UpstreamGroupLeader),
case Node of
ThisNode ->
%% The current node is running peer discovery, thus way before we
%% mark the DB layer as ready. Consider it ready in this case,
%% otherwise if the current node is selected, it will loop forever
%% waiting for itself to be ready.
true;
_ ->
with_group_leader_proxy(
ProxyGroupLeader,
fun() ->
try
erpc:call(Node, rabbit_db, is_init_finished, [])
catch
_:{exception, undef,
[{rabbit_db, is_init_finished, _, _} | _]} ->
undefined
end
end)
is_node_db_ready(FromNode, FromNode) ->
%% The function is called for rhe current node running peer discovery, thus
%% way before we mark the DB layer as ready. Consider it ready in this
%% case, otherwise if the current node is selected, it will loop forever
%% waiting for itself to be ready.
true;
is_node_db_ready(Node, FromNode) ->
try
erpc_call(Node, rabbit_db, is_init_finished, [], FromNode)
catch
_:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} ->
undefined
end.

erpc_call(Node, Mod, Fun, Args, FromNode) ->
erpc_call(Node, Mod, Fun, Args, FromNode, 10000).

erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 ->
try
erpc:call(Node, Mod, Fun, Args)
catch
error:{erpc, _} = Reason:Stacktrace ->
Peer = node(),
_ = catch erpc:call(
FromNode,
logger, debug,
["Peer discovery: temporary hidden node '~ts' "
"failed to connect to '~ts': ~0p",
[Peer, Node, Reason]]),
Sleep = 1000,
timer:sleep(Sleep),
NewTimeout = Timeout - Sleep,
case NewTimeout >= 0 of
true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout);
false -> erlang:raise(error, Reason, Stacktrace)
end
end.

-spec sort_nodes_and_props(NodesAndProps) ->
Expand Down
Loading