Skip to content

Commit

Permalink
rabbit_peer_discovery: Retry RPC calls
Browse files Browse the repository at this point in the history
[Why]
In CI, we observe some timeouts in the Erlang distribution connections
between the temporary hidden node and the nodes it queries. This affects
peer discovery obviously.

[How]
We introduce some query retries to reduce the risk of an incomplete
query.

While here, we move the sorting of queried nodes from the
`query_node_props2/3` last clause (executed in the temporary hidden
node) to the function setting the temporary hidden node and asking for
these queries. This way the debug messages from that sorting are logged
by RabbitMQ out of the box.
  • Loading branch information
dumbbell committed Nov 25, 2024
1 parent 52e410d commit 8a4a684
Showing 1 changed file with 73 additions and 36 deletions.
109 changes: 73 additions & 36 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] ->
[Peer],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
try
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes, ThisNode], 180000)
NodesAndProps1 = peer:call(
Pid,
?MODULE, do_query_node_props,
[Nodes, ThisNode], 180000),
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
NodesAndProps2
after
peer:stop(Pid)
end;
Expand Down Expand Up @@ -563,25 +568,31 @@ maybe_add_tls_arguments(VMArgs) ->
end,
VMArgs2.

do_query_node_props(Nodes, ThisNode) when Nodes =/= [] ->
do_query_node_props(Nodes, FromNode) when Nodes =/= [] ->
%% Make sure all log messages are forwarded from this temporary hidden
%% node to the upstream node, regardless of their level.
_ = logger:set_primary_config(level, debug),

%% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
%% supported version has it.
MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []),
query_node_props1(Nodes, MembersPerNode, [], ThisNode).
MembersPerNode = [try
{ok,
erpc_call(Node, rabbit_nodes, all, [], FromNode)}
catch
Class:Reason ->
{Class, Reason}
end || Node <- Nodes],
query_node_props1(Nodes, MembersPerNode, [], FromNode).

query_node_props1(
[Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps,
ThisNode) ->
FromNode) ->
NodeAndProps = {Node, Members},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ThisNode);
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode);
query_node_props1(
[Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps,
ThisNode) ->
[Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps,
FromNode) ->
%% We consider that an error means the remote node is unreachable or not
%% ready. Therefore, we exclude it from the list of discovered nodes as we
%% won't be able to join it anyway.
Expand All @@ -590,20 +601,21 @@ query_node_props1(
"Peer discovery: node '~ts' excluded from the discovered nodes",
[Node, Error, Node],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ThisNode);
query_node_props1([], [], NodesAndProps, ThisNode) ->
query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode);
query_node_props1([], [], NodesAndProps, FromNode) ->
NodesAndProps1 = lists:reverse(NodesAndProps),
query_node_props2(NodesAndProps1, [], ThisNode).
query_node_props2(NodesAndProps1, [], FromNode).

query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) ->
query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) ->
NodesAndProps2 = try
erpc:call(
erpc_call(
Node, logger, debug,
["Peer discovery: temporary hidden node '~ts' "
"queries properties from node '~ts'",
[node(), Node]]),
StartTime = get_node_start_time(Node, microsecond),
IsReady = is_node_db_ready(Node, ThisNode),
[node(), Node]], FromNode),
StartTime = get_node_start_time(
Node, microsecond, FromNode),
IsReady = is_node_db_ready(Node, FromNode),
NodeAndProps = {Node, Members, StartTime, IsReady},
NodesAndProps1 = [NodeAndProps | NodesAndProps],
NodesAndProps1
Expand All @@ -623,17 +635,17 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) ->
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
NodesAndProps
end,
query_node_props2(Rest, NodesAndProps2, ThisNode);
query_node_props2([], NodesAndProps, _ThisNode) ->
query_node_props2(Rest, NodesAndProps2, FromNode);
query_node_props2([], NodesAndProps, _FromNode) ->
NodesAndProps1 = lists:reverse(NodesAndProps),
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
?assertEqual([], nodes()),
?assert(length(NodesAndProps2) =< length(nodes(hidden))),
NodesAndProps2.
?assert(length(NodesAndProps1) =< length(nodes(hidden))),
NodesAndProps1.

-spec get_node_start_time(Node, Unit) -> StartTime when
-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when
Node :: node(),
Unit :: erlang:time_unit(),
FromNode :: node(),
StartTime :: non_neg_integer().
%% @doc Returns the start time of the given `Node' in `Unit'.
%%
Expand All @@ -653,37 +665,62 @@ query_node_props2([], NodesAndProps, _ThisNode) ->
%%
%% @private

get_node_start_time(Node, Unit) ->
NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]),
TimeOffset = erpc:call(Node, erlang, time_offset, []),
get_node_start_time(Node, Unit, FromNode) ->
NativeStartTime = erpc_call(
Node, erlang, system_info, [start_time], FromNode),
TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode),
SystemStartTime = NativeStartTime + TimeOffset,
StartTime = erpc:call(
StartTime = erpc_call(
Node, erlang, convert_time_unit,
[SystemStartTime, native, Unit]),
[SystemStartTime, native, Unit], FromNode),
StartTime.

-spec is_node_db_ready(Node, ThisNode) -> IsReady when
-spec is_node_db_ready(Node, FromNode) -> IsReady when
Node :: node(),
ThisNode :: node(),
FromNode :: node(),
IsReady :: boolean() | undefined.
%% @doc Returns if the node's DB layer is ready or not.
%%
%% @private

is_node_db_ready(ThisNode, ThisNode) ->
%% The current node is running peer discovery, thus way before we mark the
%% DB layer as ready. Consider it ready in this case, otherwise if the
%% current node is selected, it will loop forever waiting for itself to be
%% ready.
is_node_db_ready(FromNode, FromNode) ->
%% The function is called for rhe current node running peer discovery, thus
%% way before we mark the DB layer as ready. Consider it ready in this
%% case, otherwise if the current node is selected, it will loop forever
%% waiting for itself to be ready.
true;
is_node_db_ready(Node, _ThisNode) ->
is_node_db_ready(Node, FromNode) ->
try
erpc:call(Node, rabbit_db, is_init_finished, [])
erpc_call(Node, rabbit_db, is_init_finished, [], FromNode)
catch
_:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} ->
undefined
end.

erpc_call(Node, Mod, Fun, Args, FromNode) ->
erpc_call(Node, Mod, Fun, Args, FromNode, 10000).

erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 ->
try
erpc:call(Node, Mod, Fun, Args)
catch
error:{erpc, _} = Reason:Stacktrace ->
Peer = node(),
_ = catch erpc:call(
FromNode,
logger, debug,
["Peer discovery: temporary hidden node '~ts' "
"failed to connect to '~ts': ~0p",
[Peer, Node, Reason]]),
Sleep = 1000,
timer:sleep(Sleep),
NewTimeout = Timeout - Sleep,
case NewTimeout >= 0 of
true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout);
false -> erlang:raise(error, Reason, Stacktrace)
end
end.

-spec sort_nodes_and_props(NodesAndProps) ->
SortedNodesAndProps when
NodesAndProps :: [node_and_props()],
Expand Down

0 comments on commit 8a4a684

Please sign in to comment.