Skip to content

Commit

Permalink
khepri_machine: Use ra:key_metrics/2 instead of ra:member_overview/2
Browse files Browse the repository at this point in the history
[Why]
`ra:member_overview/2` is a very expensive call.

[How]
We only need the last index and the current term from the leader, and
`ra:key_metrics/2` provides these two pieces of information as well.

The difference is huge: in my benchmark, the query rate goes from 15
queries per second to 100k queries per second. This result was obtained
in combination with a related change in Ra; see rabbitmq/ra#462.
  • Loading branch information
dumbbell committed Aug 14, 2024
1 parent 503a29a commit c484a0c
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1043,12 +1043,15 @@ add_applied_condition2(StoreId, Options, Timeout) ->
end.

add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed. We also
%% double-check it is still the leader; if it is not, we recurse.
%% We query the leader to know the last index it committed in which term.
%%
%% We pay attention to its state because a map is still returned even if
%% the Ra server is stopped.
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:member_overview(LeaderId, Timeout) of
{ok, Overview, LeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
try ra:key_metrics(LeaderId, Timeout) of
#{last_index := LastIndex, term := Term, state := State}
when State =/= noproc andalso State =/= unknown ->
NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
Expand All @@ -1057,26 +1060,21 @@ add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%%
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
#{log := #{last_index := LastIndex},
current_term := CurrentTerm} = Overview,
Condition = {applied, {LastIndex, CurrentTerm}},
Condition = {applied, {LastIndex, Term}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout},
timeout => NewTimeout1},
{ok, Options1};
{ok, _Overview, NewLeaderId} ->
_ ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
{timeout, _LeaderId} ->
add_applied_condition1(StoreId, Options, NewTimeout)
catch
error:{erpc, timeout} ->
{error, timeout};
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
error:{erpc, noconnection} ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout);
Error ->
Error
NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout2)
end.

-spec get_timeout(Options) -> Timeout when
Expand Down

0 comments on commit c484a0c

Please sign in to comment.