diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index bbfd4619..70f693d9 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -49,6 +49,8 @@
%%
Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%%
+%% Moved the expiration of dedups to the `tick' aux effect (see {@link
+%% handle_aux/5}). This also introduces a new command `#drop_dedups{}'.
%%
%%
%%
@@ -119,7 +121,8 @@
get_dedups/1]).
-ifdef(TEST).
--export([make_virgin_state/1,
+-export([do_process_sync_command/3,
+ make_virgin_state/1,
convert_state/3,
set_tree/2]).
-endif.
@@ -1277,6 +1280,37 @@ handle_aux(
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, IntState};
+handle_aux(leader, cast, tick, AuxState, IntState) ->
+ %% Expiring dedups in the tick handler is only available on versions 2
+ %% and greater. In versions 0 and 1, expiration of dedups is done in
+ %% `drop_expired_dedups/2'. This proved to be quite expensive when handling
+ %% a very large batch of transactions at once, so this expiration step was
+ %% moved to the `tick' handler in version 2.
+ case ra_aux:effective_machine_version(IntState) of
+ N when N >= 2 ->
+ State = ra_aux:machine_state(IntState),
+ Timestamp = erlang:system_time(millisecond),
+ Dedups = get_dedups(State),
+ RefsToDrop = maps:fold(
+ fun(CommandRef, {_Reply, Expiry}, Acc) ->
+ case Expiry =< Timestamp of
+ true ->
+ [CommandRef | Acc];
+ false ->
+ Acc
+ end
+ end, [], Dedups),
+ Effects = case RefsToDrop of
+ [] ->
+ [];
+ _ ->
+ DropDedups = #drop_dedups{refs = RefsToDrop},
+ [{append, DropDedups}]
+ end,
+ {no_reply, AuxState, IntState, Effects};
+ _ ->
+ {no_reply, AuxState, IntState}
+ end;
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.
@@ -1475,6 +1509,20 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
+apply(
+ #{machine_version := MacVer} = Meta,
+ #drop_dedups{refs = RefsToDrop},
+ State) when MacVer >= 2 ->
+ %% `#drop_dedups{}' is emitted by the `handle_aux/5' clause for the `tick'
+ %% effect to periodically drop dedups that have expired. This expiration
+ %% was originally done in `post_apply/2' via `drop_expired_dedups/2' until
+ %% machine version 2. Note that `drop_expired_dedups/2' is used until a
+ %% cluster reaches an effective machine version of 2 or higher.
+ Dedups = get_dedups(State),
+ Dedups1 = maps:without(RefsToDrop, Dedups),
+ State1 = set_dedups(State, Dedups1),
+ Ret = {State1, ok},
+ post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
@@ -1556,18 +1604,38 @@ reset_applied_command_count(State) ->
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
+%% Removes any dedups from the `dedups' field in state that have expired
+%% according to the timestamp in the handled command.
+%%
+%% This function is a no-op in any other version than version 1. This proved to
+%% be expensive to execute as part of `apply/3' so dedup expiration moved to
+%% the `handle_aux/5' for `tick' which is executed periodically. See that
+%% function clause above for more information.
+%%
%% @private
drop_expired_dedups(
{State, Result, SideEffects},
- #{system_time := Timestamp}) ->
+ #{system_time := Timestamp,
+ machine_version := MacVer}) when MacVer =< 1 ->
Dedups = get_dedups(State),
+ %% Historical note: `maps:filter/2' can be surprisingly expensive when
+ %% used in a tight loop like `apply/3' depending on how many elements are
+ %% retained. As of Erlang/OTP 27, the BIF which implements `maps:filter/2'
+ %% collects any key-value pairs for which the predicate returns `true' into
+ %% a list, sorts/dedups the list and then creates a new map. This is slow
+ %% if the filter function always returns `true'. In cases like this where
+ %% the common usage is to retain most elements, `maps:fold/3' plus a `case'
+ %% expression and `maps:remove/2' is likely to be less expensive.
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
- {State1, Result, SideEffects}.
+ {State1, Result, SideEffects};
+drop_expired_dedups({State, Result, SideEffects}, _Meta) ->
+ %% No-op on versions 2 and higher.
+ {State, Result, SideEffects}.
%% @private
diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl
index 24765001..0e6bc605 100644
--- a/src/khepri_machine.hrl
+++ b/src/khepri_machine.hrl
@@ -64,6 +64,13 @@
-record(dedup_ack, {ref :: reference()}).
+-record(drop_dedups, {refs :: [reference()]}).
+%% A command introduced in machine version 2 which is meant to drop expired
+%% dedups.
+%%
+%% This is emitted internally by the `handle_aux/5' callback clause which
+%% handles the `tick' Ra aux effect.
+
%% Old commands, kept for backward-compatibility.
-record(unregister_projection, {name :: khepri_projection:name()}).
diff --git a/test/protect_against_dups_option.erl b/test/protect_against_dups_option.erl
index 85c3daf0..196755ee 100644
--- a/test/protect_against_dups_option.erl
+++ b/test/protect_against_dups_option.erl
@@ -127,62 +127,79 @@ dedup_and_dedup_ack_test() ->
?assertEqual(ok, Ret2),
?assertEqual([], SE2).
-dedup_expiry_test() ->
- S00 = khepri_machine:init(?MACH_PARAMS()),
- S0 = khepri_machine:convert_state(S00, 0, 1),
-
- Command = #put{path = [foo],
- payload = khepri_payload:data(value),
- options = #{expect_specific_node => true,
- props_to_return => [payload,
- payload_version]}},
- CommandRef = make_ref(),
- Delay = 2000,
- Expiry = erlang:system_time(millisecond) + Delay,
- DedupCommand = #dedup{ref = CommandRef,
- expiry = Expiry,
- command = Command},
- {S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0),
- ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}},
-
- Dedups1 = khepri_machine:get_dedups(S1),
- ?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1),
-
- Root1 = khepri_machine:get_root(S1),
- ?assertEqual(
- #node{
- props =
- #{payload_version => 1,
- child_list_version => 2},
- child_nodes =
- #{foo =>
- #node{
- props = ?INIT_NODE_PROPS,
- payload = khepri_payload:data(value)}}},
- Root1),
- ?assertEqual(ExpectedRet, Ret1),
- ?assertEqual([], SE1),
-
- timer:sleep(Delay + 1000),
-
- %% The put command is idempotent, so not really ideal to test
- %% deduplication. Instead, we mess up with the state and silently restore
- %% the initial empty tree. If the dedup mechanism works, the returned
- %% state shouldn't have the `foo' node either because it didn't process
- %% the command.
- PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)),
- {S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1),
-
- %% The dedups entry was dropped at the end of apply because it expired.
- Dedups2 = khepri_machine:get_dedups(S2),
- ?assertEqual(#{}, Dedups2),
-
- Root0 = khepri_machine:get_root(S0),
- Root2 = khepri_machine:get_root(S2),
- ?assertEqual(Root0, Root2),
-
- ?assertEqual(ExpectedRet, Ret2),
- ?assertEqual([], SE2).
+dedup_expiry_test_() ->
+ TickTimeout = 200,
+ Config = #{tick_timeout => TickTimeout},
+ StoredProcPath = [sproc],
+ Path = [stock, wood, <<"oak">>],
+ Command = #tx{'fun' = StoredProcPath, args = []},
+ CommandRef = make_ref(),
+ Expiry = erlang:system_time(millisecond),
+ DedupCommand = #dedup{ref = CommandRef,
+ command = Command,
+ expiry = Expiry},
+ {setup,
+ fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end,
+ fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
+ [{inorder,
+ [{"Store a procedure for later use",
+ ?_assertEqual(
+ ok,
+ khepri:put(
+ ?FUNCTION_NAME, StoredProcPath,
+ fun() ->
+ {ok, N} = khepri_tx:get(Path),
+ khepri_tx:put(Path, N + 1)
+ end))},
+
+ {"Store initial data",
+ ?_assertEqual(
+ ok,
+ khepri:put(?FUNCTION_NAME, Path, 1))},
+
+ {"Trigger the transaction",
+ ?_assertEqual(
+ ok,
+ khepri_machine:do_process_sync_command(
+ ?FUNCTION_NAME, DedupCommand, #{}))},
+
+ {"The transaction was applied and the data is incremented",
+ ?_assertEqual(
+ {ok, 2},
+ khepri:get(?FUNCTION_NAME, Path))},
+
+ {"Trigger the transaction again before the dedup can be expired",
+ ?_assertEqual(
+ ok,
+ khepri_machine:do_process_sync_command(
+ ?FUNCTION_NAME, DedupCommand, #{}))},
+
+ {"The transaction was deduplicated and the data is unchanged",
+ ?_assertEqual(
+ {ok, 2},
+ khepri:get(?FUNCTION_NAME, Path))},
+
+ {"Sleep and send the same transaction command",
+ ?_test(
+ begin
+ %% Sleep a little extra for the sake of slow CI runners.
+ %% During this sleep time the machine will receive Ra's
+ %% periodic `tick' aux effect which will trigger expiration of
+ %% dedups.
+ SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4),
+ timer:sleep(SleepTime),
+ %% The dedup should be expired so this duplicate command should
+ %% be handled and the data should be incremented.
+ ?assertEqual(
+ ok,
+ khepri_machine:do_process_sync_command(
+ ?FUNCTION_NAME, DedupCommand, #{}))
+ end)},
+
+ {"The transaction was applied again and the data is incremented",
+ ?_assertEqual(
+ {ok, 3},
+ khepri:get(?FUNCTION_NAME, Path))}]}]}.
dedup_ack_after_no_dedup_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),