Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the fence mechanism performances #283

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 115 additions & 36 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,12 @@ do_process_sync_command(StoreId, Command, Options) ->
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined -> LeaderId;
undefined -> RaServer
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
LeaderId;
undefined ->
sending_sync_command_locally(StoreId),
RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, _LeaderId} ->
Expand Down Expand Up @@ -903,15 +907,18 @@ process_async_command(
StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
undefined ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority)
end.

Expand Down Expand Up @@ -983,6 +990,7 @@ process_query(StoreId, QueryFun, Options) ->
end.

process_query1(StoreId, QueryFun, Options) ->
sending_query_locally(StoreId),
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
Expand All @@ -1009,9 +1017,10 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%%
%% We first send an arbitrary query to the local Ra server. This is to
%% make sure that previously submitted pipelined commands were processed
%% by that server.
%% If the last message from the calling process to the local Ra server was
%% an async command or if it never sent a command yet, we first send an
%% arbitrary query to the local Ra server. This is to make sure that
%% previously submitted pipelined commands were processed by that server.
%%
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
Expand All @@ -1020,35 +1029,45 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% We can't have this guaranty for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handle the redirect to the leader.
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
InternalOptions = #{favor => low_latency,
timeout => Timeout},
case process_query(StoreId, QueryFun, InternalOptions) of
case can_skip_fence_preliminary_query(StoreId) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
add_applied_condition2(StoreId, Options, Timeout);
false ->
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
end
end.

add_applied_condition2(StoreId, Options, Timeout) ->
%% After the previous local query, there is a great chance that the leader
%% was cached, though not 100% guarantied.
%% After the previous local query or sync command if there was one, there
%% is a great chance that the leader was cached, though not 100%
%% guarantied.
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
%% If the leader is unknown, executing a preliminary query should
%% tell us who the leader is.
ask_fence_preliminary_query(StoreId),
add_applied_condition1(StoreId, Options, Timeout)
end.

add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed. We also
%% double-check it is still the leader; if it is not, we recurse.
%% We query the leader to know the last index it committed in which term.
%%
%% We pay attention to its state because a map is still returned even if
%% the Ra server is stopped.
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:member_overview(LeaderId, Timeout) of
{ok, Overview, LeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
try ra:key_metrics(LeaderId, Timeout) of
#{last_index := LastIndex, term := Term, state := State}
when State =/= noproc andalso State =/= unknown ->
NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
Expand All @@ -1057,26 +1076,21 @@ add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%%
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
#{log := #{last_index := LastIndex},
current_term := CurrentTerm} = Overview,
Condition = {applied, {LastIndex, CurrentTerm}},
Condition = {applied, {LastIndex, Term}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout},
timeout => NewTimeout1},
{ok, Options1};
{ok, _Overview, NewLeaderId} ->
_ ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
{timeout, _LeaderId} ->
add_applied_condition1(StoreId, Options, NewTimeout)
catch
error:{erpc, timeout} ->
{error, timeout};
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
error:{erpc, noconnection} ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout);
Error ->
Error
NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout2)
end.

-spec get_timeout(Options) -> Timeout when
Expand All @@ -1096,6 +1110,71 @@ get_timeout(_) -> khepri_app:get_default_timeout().
clear_cache(_StoreId) ->
ok.

-define(CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
{?MODULE, can_skip_fence_preliminary_query, StoreId}).

-spec sending_sync_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a synchronous command is about to be sent locally.
%%
%% After that, we know we don't need a fence preliminary query.

sending_sync_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:put(Key, true),
ok.

-spec sending_query_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a query is about to be executed locally.
%%
%% After that, we know we don't need a fence preliminary query.

sending_query_locally(StoreId) ->
%% Same behavior as a local sync command.
sending_sync_command_locally(StoreId).

-spec sending_async_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that an asynchronous command is about to be sent locally.

sending_async_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:erase(Key),
ok.

-spec sending_command_remotely(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a command is about to be sent to a remote store.

sending_command_remotely(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).

-spec ask_fence_preliminary_query(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Explicitly requests that a call to {@link
%% can_skip_fence_preliminary_query/1} returns `true'.

ask_fence_preliminary_query(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).

-spec can_skip_fence_preliminary_query(StoreId) -> LastMsgWasSync when
StoreId :: khepri:store_id(),
LastMsgWasSync :: boolean().
%% @doc Indicates if the calling process sent a synchronous command or a query
%% before this call.
%%
%% @returns `true' if the calling process sent a synchrorous command or a query
%% to the given store before this call, `false' if the calling process never
%% sent anything to the given store, if the last message was an asynchrorous
%% command, or if the last message was sent to a remote store.

can_skip_fence_preliminary_query(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
erlang:get(Key) =:= true.

%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------
Expand Down