From 5c3361ab9ba28c6317ee74329c254f03fcde3610 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 16 Oct 2023 10:17:39 +0200 Subject: [PATCH 1/2] Add khepri:handle_async_ret/2 for handling async Ra events This is similar to `wait_for_async_ret/1,2` except that the caller is expected to `receive` the Ra event. This is necessary because the Ra event sent when a batch of commands is applied includes a list of correlation and result pairs, making it impossible to use a selective receive to look for a single correlation ID. We could alternatively map the `ra_event` message into some other value like `{ok, Correlations} | {error, CorrelationId}` but this isn't much easier to work with than the `ra_event`s themselves and would be brittle for changes to `ra_event`. Instead we leave it to the caller to receive and handle the events. This function is then responsible only for calling internal functions to update the cached leader for the given store ID. --- src/khepri.erl | 57 ++++++++++++++++++++++ test/async_option.erl | 87 +++++++++++++++++++++------------ test/cluster_SUITE.erl | 107 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 216 insertions(+), 35 deletions(-) diff --git a/src/khepri.erl b/src/khepri.erl index 7e43a78b..22c58fb9 100644 --- a/src/khepri.erl +++ b/src/khepri.erl @@ -125,6 +125,7 @@ transaction/1, transaction/2, transaction/3, transaction/4, transaction/5, + handle_async_ret/1, handle_async_ret/2, wait_for_async_ret/1, wait_for_async_ret/2, %% Bang functions: they return the value directly or throw an error. @@ -3292,6 +3293,62 @@ transaction(FunOrPath, Args, ReadWrite, Options) transaction(StoreId, FunOrPath, Args, ReadWrite, Options) -> khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options). +%% ------------------------------------------------------------------- +%% handle_async_ret(). +%% ------------------------------------------------------------------- + +-spec handle_async_ret(RaEvent) -> ok when + RaEvent :: ra_server_proc:ra_event(). +%% @doc Handles the Ra event sent for asynchronous call results. +%% +%% Calling this function is the same as calling +%% `handle_async_ret(StoreId, RaEvent)' with the default store ID (see {@link +%% khepri_cluster:get_default_store_id/0}). +%% +%% @see handle_async_ret/2. + +handle_async_ret(RaEvent) -> + StoreId = khepri_cluster:get_default_store_id(), + handle_async_ret(StoreId, RaEvent). + +-spec handle_async_ret(StoreId, RaEvent) -> ok when + StoreId :: khepri:store_id(), + RaEvent :: ra_server_proc:ra_event(). +%% @doc Handles the Ra event sent for asynchronous call results. +%% +%% When sending commands with `async' {@link command_options()}, the calling +%% process will receive Ra events with the following structure: +%% +%% `{ra_event, CurrentLeader, {applied, [{Correlation1, Reply1}, ..]}}' +%% +%% or +%% +%% `{ra_event, FromId, {rejected, {not_leader, Leader | undefined, Correlation}}}' +%% +%% The first event acknowledges all commands handled in a batch while the +%% second is sent per-command when commands are sent against a non-leader +%% member. +%% +%% These events should be passed to this function in order to update leader +%% information. This function does not handle retrying rejected commands or +%% return values from applied commands - the caller is responsible for those +%% tasks. +%% +%% @see async_option(). +%% @see ra:pipeline_command/4. + +handle_async_ret( + StoreId, + {ra_event, _CurrentLeader, {applied, _Correlations}}) + when ?IS_KHEPRI_STORE_ID(StoreId) -> + ok; +handle_async_ret( + StoreId, + {ra_event, FromId, {rejected, {not_leader, MaybeLeader, _CorrelationId}}}) + when ?IS_KHEPRI_STORE_ID(StoreId) -> + ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader), + ok. + %% ------------------------------------------------------------------- %% wait_for_async_ret(). %% ------------------------------------------------------------------- diff --git a/test/async_option.erl b/test/async_option.erl index 9584b2d7..1a3e1110 100644 --- a/test/async_option.erl +++ b/test/async_option.erl @@ -81,10 +81,13 @@ async_with_correlation_in_put_test_() -> khepri:put( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -127,10 +130,13 @@ async_with_correlation_and_priority_in_put_test_() -> khepri:put( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -224,10 +230,13 @@ async_with_correlation_in_delete_test_() -> ok, khepri:delete( ?FUNCTION_NAME, [foo], #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -281,10 +290,14 @@ async_with_correlation_and_priority_in_delete_test_() -> ok, khepri_adv:delete( ?FUNCTION_NAME, [foo], #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{payload_version => 1}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => + #{payload_version => 1}}}}], + Correlations) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -364,10 +377,11 @@ async_with_correlation_in_transaction_test_() -> ok, khepri:transaction( ?FUNCTION_NAME, Fun, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - ok, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual([{Correlation, ok}], Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -391,10 +405,14 @@ async_with_correlation_in_aborted_transaction_test_() -> ok, khepri:transaction( ?FUNCTION_NAME, Fun, rw, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {error, abort_reason}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + [{Correlation, Result}] = Correlations, + ?assertMatch({exception, _, _, _}, Result), + Error = khepri_machine:handle_tx_exception(Result), + ?assertEqual({error, abort_reason}, Error) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -442,10 +460,11 @@ async_with_correlation_and_priority_in_transaction_test_() -> khepri:transaction( ?FUNCTION_NAME, Fun, #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - ok, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual([{Correlation, ok}], Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -464,10 +483,14 @@ wait_for_async_error_test_() -> khepri:update( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - Ret) + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertMatch( + [{Correlation, + {error, ?khepri_error(node_not_found, _)}}], + Correlations) + end end) ]}. diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 1eb4ea64..f8ede47e 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -43,7 +43,8 @@ handle_leader_down_on_three_node_cluster_command/1, handle_leader_down_on_three_node_cluster_response/1, can_set_snapshot_interval/1, - projections_are_consistent_on_three_node_cluster/1]). + projections_are_consistent_on_three_node_cluster/1, + async_command_leader_change_in_three_node_cluster/1]). all() -> [can_start_a_single_node, @@ -63,7 +64,8 @@ all() -> handle_leader_down_on_three_node_cluster_command, handle_leader_down_on_three_node_cluster_response, can_set_snapshot_interval, - projections_are_consistent_on_three_node_cluster]. + projections_are_consistent_on_three_node_cluster, + async_command_leader_change_in_three_node_cluster]. groups() -> []. @@ -103,7 +105,8 @@ init_per_testcase(Testcase, Config) Testcase =:= fail_to_join_non_existing_store orelse Testcase =:= handle_leader_down_on_three_node_cluster_command orelse Testcase =:= handle_leader_down_on_three_node_cluster_response orelse - Testcase =:= projections_are_consistent_on_three_node_cluster -> + Testcase =:= projections_are_consistent_on_three_node_cluster orelse + Testcase =:= async_command_leader_change_in_three_node_cluster -> Nodes = start_n_nodes(Testcase, 3), PropsPerNode0 = [begin {ok, _} = rpc:call( @@ -1379,6 +1382,104 @@ wait_for_projection_on_nodes([Node | Rest] = Nodes, ProjectionName) -> wait_for_projection_on_nodes(Rest, ProjectionName) end. +async_command_leader_change_in_three_node_cluster(Config) -> + PropsPerNode = ?config(ra_system_props, Config), + [Node1, Node2, Node3] = Nodes = maps:keys(PropsPerNode), + + %% We assume all nodes are using the same Ra system name & store ID. + #{ra_system := RaSystem} = maps:get(Node1, PropsPerNode), + StoreId = RaSystem, + + ct:pal("Start database + cluster nodes"), + lists:foreach( + fun(Node) -> + ct:pal("- khepri:start() from node ~s", [Node]), + ?assertEqual( + {ok, StoreId}, + rpc:call(Node, khepri, start, [RaSystem, StoreId])) + end, Nodes), + lists:foreach( + fun(Node) -> + ct:pal("- khepri_cluster:join() from node ~s", [Node]), + ?assertEqual( + ok, + rpc:call(Node, khepri_cluster, join, [StoreId, Node3])) + end, [Node1, Node2]), + + LeaderId = get_leader_in_store(StoreId, Nodes), + {StoreId, LeaderNode} = LeaderId, + + ct:pal("Send an async command from the leader node ~s", [LeaderNode]), + ok = erpc:call( + LeaderNode, + fun() -> + %% This member hasn't sent any commands so the leader isn't + %% cached yet. The async call will succeed though because this + %% member is the leader. + ?assertEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId = 1, + Extra = #{async => CorrelationId}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra), + receive + {ra_event, + _FromId, + {applied, + [{CorrelationId, {ok, _}}]}} = AsyncRet -> + ok = khepri:handle_async_ret(StoreId, AsyncRet) + after + 5_000 -> + throw(timeout) + end + end), + + [FollowerNode, _] = Nodes -- [LeaderNode], + ct:pal("Send async commands from a follower node ~s", [FollowerNode]), + ok = erpc:call( + FollowerNode, + fun() -> + %% This member hasn't sent any commands so the leader isn't + %% cached yet. This member is not the leader so the async + %% command will fail. + ?assertEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId1 = 1, + Extra1 = #{async => CorrelationId1}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra1), + receive + {ra_event, + _, + {rejected, + {not_leader, _, CorrelationId1}}} = AsyncRet1 -> + ok = khepri:handle_async_ret(StoreId, AsyncRet1) + after + 1_000 -> + throw(timeout) + end, + + %% `khepri:handle_async_ret/2' updated the cached leader so + %% the async call will now send the command to the leader. + ?assertNotEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId2 = 2, + Extra2 = #{async => CorrelationId2}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra2), + receive + {ra_event, + _, + {applied, + [{CorrelationId2, {ok, _}}]}} = AsyncRet2 -> + ok = khepri:handle_async_ret(StoreId, AsyncRet2) + after + 1_000 -> + throw(timeout) + end + end), + ok. + %% ------------------------------------------------------------------- %% Internal functions %% ------------------------------------------------------------------- From a25e695efb3e75c280be42a5c45908e25ce53101 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 16 Oct 2023 10:17:59 +0200 Subject: [PATCH 2/2] Remove khepri:wait_for_async_ret/1,2 in favor of handle_async_ret/2 See the parent commit: `wait_for_async_ret/1,2` used a selective receive to find whether a given correlation ID was applied. The `ra_event` emitted when a batch is applied gives all correlation and result pairs applied in the batch though, making it impossible to reliably find a single correlation ID. We should instead have the caller perform the `receive` to find the applied/rejected event and handle all of the correlation IDs found within. So this change removes `wait_for_async_ret/1,2` in favor of the `handle_async_ret/2` function added in the parent commit. --- src/khepri.erl | 67 +++---------------------------------------- src/khepri_adv.erl | 16 +++++------ test/async_option.erl | 5 ---- 3 files changed, 12 insertions(+), 76 deletions(-) diff --git a/src/khepri.erl b/src/khepri.erl index 22c58fb9..1f2606dd 100644 --- a/src/khepri.erl +++ b/src/khepri.erl @@ -126,7 +126,6 @@ transaction/5, handle_async_ret/1, handle_async_ret/2, - wait_for_async_ret/1, wait_for_async_ret/2, %% Bang functions: they return the value directly or throw an error. 'get!'/1, 'get!'/2, 'get!'/3, @@ -2054,8 +2053,8 @@ put(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function can be used to handle the message received from Ra. %% %% Example: %% ``` @@ -2164,8 +2163,8 @@ put_many(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function can be used to handle the message received from Ra. %% %% Example: %% ``` @@ -3349,64 +3348,6 @@ handle_async_ret( ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader), ok. -%% ------------------------------------------------------------------- -%% wait_for_async_ret(). -%% ------------------------------------------------------------------- - --spec wait_for_async_ret(Correlation) -> Ret when - Correlation :: ra_server:command_correlation(), - Ret :: khepri:minimal_ret() | - khepri:payload_ret() | - khepri:many_payloads_ret() | - khepri_adv:single_result() | - khepri_adv:many_results() | - khepri_machine:tx_ret(). -%% @doc Waits for an asynchronous call. -%% -%% Calling this function is the same as calling -%% `wait_for_async_ret(Correlation)' with the default timeout (see {@link -%% khepri_app:get_default_timeout/0}). -%% -%% @see wait_for_async_ret/2. - -wait_for_async_ret(Correlation) -> - Timeout = khepri_app:get_default_timeout(), - wait_for_async_ret(Correlation, Timeout). - --spec wait_for_async_ret(Correlation, Timeout) -> Ret when - Correlation :: ra_server:command_correlation(), - Timeout :: timeout(), - Ret :: khepri:minimal_ret() | - khepri:payload_ret() | - khepri:many_payloads_ret() | - khepri_adv:single_result() | - khepri_adv:many_results() | - khepri_machine:tx_ret(). -%% @doc Waits for an asynchronous call. -%% -%% This function waits maximum `Timeout' milliseconds (or `infinity') for the -%% result of a previous call where the `async' option was set with a -%% correlation ID. That correlation ID must be passed to this function. -%% -%% @see wait_for_async_ret/2. - -wait_for_async_ret(Correlation, Timeout) -> - receive - {ra_event, _, {applied, [{Correlation, Reply}]}} -> - case Reply of - {exception, _, _, _} = Exception -> - khepri_machine:handle_tx_exception(Exception); - ok -> - Reply; - {ok, _} -> - Reply; - {error, _} -> - Reply - end - after Timeout -> - {error, timeout} - end. - %% ------------------------------------------------------------------- %% Bang functions. %% ------------------------------------------------------------------- diff --git a/src/khepri_adv.erl b/src/khepri_adv.erl index 5813db5d..db71af63 100644 --- a/src/khepri_adv.erl +++ b/src/khepri_adv.erl @@ -360,8 +360,8 @@ put(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% The returned `{ok, NodeProps}' tuple contains a map with the properties and %% payload (if any) of the targeted tree node as they were before the put. @@ -481,8 +481,8 @@ put_many(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` @@ -847,8 +847,8 @@ delete(PathPattern, Options) when is_map(Options) -> %% payload (if any) of the targeted tree node as they were before the delete. %% If the targeted tree node didn't exist, `NodeProps' will be an empty map. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` @@ -951,8 +951,8 @@ delete_many(PathPattern, Options) when is_map(Options) -> %% map containing the properties and payload (if any) of that deleted tree %% node as they were before the delete. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` diff --git a/test/async_option.erl b/test/async_option.erl index 1a3e1110..b933ee4b 100644 --- a/test/async_option.erl +++ b/test/async_option.erl @@ -493,8 +493,3 @@ wait_for_async_error_test_() -> end end) ]}. - -wait_for_async_ret_but_no_async_call_test() -> - ?assertEqual( - {error, timeout}, - khepri:wait_for_async_ret(1, 100)).