diff --git a/src/khepri.erl b/src/khepri.erl index 7e43a78b..1f2606dd 100644 --- a/src/khepri.erl +++ b/src/khepri.erl @@ -125,7 +125,7 @@ transaction/1, transaction/2, transaction/3, transaction/4, transaction/5, - wait_for_async_ret/1, wait_for_async_ret/2, + handle_async_ret/1, handle_async_ret/2, %% Bang functions: they return the value directly or throw an error. 'get!'/1, 'get!'/2, 'get!'/3, @@ -2053,8 +2053,8 @@ put(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function can be used to handle the message received from Ra. %% %% Example: %% ``` @@ -2163,8 +2163,8 @@ put_many(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function can be used to handle the message received from Ra. %% %% Example: %% ``` @@ -3293,62 +3293,60 @@ transaction(StoreId, FunOrPath, Args, ReadWrite, Options) -> khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options). %% ------------------------------------------------------------------- -%% wait_for_async_ret(). +%% handle_async_ret(). %% ------------------------------------------------------------------- --spec wait_for_async_ret(Correlation) -> Ret when - Correlation :: ra_server:command_correlation(), - Ret :: khepri:minimal_ret() | - khepri:payload_ret() | - khepri:many_payloads_ret() | - khepri_adv:single_result() | - khepri_adv:many_results() | - khepri_machine:tx_ret(). -%% @doc Waits for an asynchronous call. +-spec handle_async_ret(RaEvent) -> ok when + RaEvent :: ra_server_proc:ra_event(). +%% @doc Handles the Ra event sent for asynchronous call results. %% %% Calling this function is the same as calling -%% `wait_for_async_ret(Correlation)' with the default timeout (see {@link -%% khepri_app:get_default_timeout/0}). +%% `handle_async_ret(StoreId, RaEvent)' with the default store ID (see {@link +%% khepri_cluster:get_default_store_id/0}). %% -%% @see wait_for_async_ret/2. +%% @see handle_async_ret/2. -wait_for_async_ret(Correlation) -> - Timeout = khepri_app:get_default_timeout(), - wait_for_async_ret(Correlation, Timeout). +handle_async_ret(RaEvent) -> + StoreId = khepri_cluster:get_default_store_id(), + handle_async_ret(StoreId, RaEvent). --spec wait_for_async_ret(Correlation, Timeout) -> Ret when - Correlation :: ra_server:command_correlation(), - Timeout :: timeout(), - Ret :: khepri:minimal_ret() | - khepri:payload_ret() | - khepri:many_payloads_ret() | - khepri_adv:single_result() | - khepri_adv:many_results() | - khepri_machine:tx_ret(). -%% @doc Waits for an asynchronous call. -%% -%% This function waits maximum `Timeout' milliseconds (or `infinity') for the -%% result of a previous call where the `async' option was set with a -%% correlation ID. That correlation ID must be passed to this function. -%% -%% @see wait_for_async_ret/2. - -wait_for_async_ret(Correlation, Timeout) -> - receive - {ra_event, _, {applied, [{Correlation, Reply}]}} -> - case Reply of - {exception, _, _, _} = Exception -> - khepri_machine:handle_tx_exception(Exception); - ok -> - Reply; - {ok, _} -> - Reply; - {error, _} -> - Reply - end - after Timeout -> - {error, timeout} - end. +-spec handle_async_ret(StoreId, RaEvent) -> ok when + StoreId :: khepri:store_id(), + RaEvent :: ra_server_proc:ra_event(). +%% @doc Handles the Ra event sent for asynchronous call results. +%% +%% When sending commands with `async' {@link command_options()}, the calling +%% process will receive Ra events with the following structure: +%% +%% `{ra_event, CurrentLeader, {applied, [{Correlation1, Reply1}, ..]}}' +%% +%% or +%% +%% `{ra_event, FromId, {rejected, {not_leader, Leader | undefined, Correlation}}}' +%% +%% The first event acknowledges all commands handled in a batch while the +%% second is sent per-command when commands are sent against a non-leader +%% member. +%% +%% These events should be passed to this function in order to update leader +%% information. This function does not handle retrying rejected commands or +%% return values from applied commands - the caller is responsible for those +%% tasks. +%% +%% @see async_option(). +%% @see ra:pipeline_command/4. + +handle_async_ret( + StoreId, + {ra_event, _CurrentLeader, {applied, _Correlations}}) + when ?IS_KHEPRI_STORE_ID(StoreId) -> + ok; +handle_async_ret( + StoreId, + {ra_event, FromId, {rejected, {not_leader, MaybeLeader, _CorrelationId}}}) + when ?IS_KHEPRI_STORE_ID(StoreId) -> + ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader), + ok. %% ------------------------------------------------------------------- %% Bang functions. diff --git a/src/khepri_adv.erl b/src/khepri_adv.erl index 5813db5d..db71af63 100644 --- a/src/khepri_adv.erl +++ b/src/khepri_adv.erl @@ -360,8 +360,8 @@ put(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% The returned `{ok, NodeProps}' tuple contains a map with the properties and %% payload (if any) of the targeted tree node as they were before the put. @@ -481,8 +481,8 @@ put_many(StoreId, PathPattern, Data) -> %% khepri:command_options()}, {@link khepri:tree_options()} and {@link %% khepri:put_options()}. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` @@ -847,8 +847,8 @@ delete(PathPattern, Options) when is_map(Options) -> %% payload (if any) of the targeted tree node as they were before the delete. %% If the targeted tree node didn't exist, `NodeProps' will be an empty map. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` @@ -951,8 +951,8 @@ delete_many(PathPattern, Options) when is_map(Options) -> %% map containing the properties and payload (if any) of that deleted tree %% node as they were before the delete. %% -%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1} -%% function can be used to receive the message from Ra. +%% When doing an asynchronous update, the {@link handle_async_ret/1} +%% function should be used to handle the message received from Ra. %% %% Example: %% ``` diff --git a/test/async_option.erl b/test/async_option.erl index 9584b2d7..b933ee4b 100644 --- a/test/async_option.erl +++ b/test/async_option.erl @@ -81,10 +81,13 @@ async_with_correlation_in_put_test_() -> khepri:put( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -127,10 +130,13 @@ async_with_correlation_and_priority_in_put_test_() -> khepri:put( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -224,10 +230,13 @@ async_with_correlation_in_delete_test_() -> ok, khepri:delete( ?FUNCTION_NAME, [foo], #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => #{}}}}], + Correlations) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -281,10 +290,14 @@ async_with_correlation_and_priority_in_delete_test_() -> ok, khepri_adv:delete( ?FUNCTION_NAME, [foo], #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {ok, #{[foo] => #{payload_version => 1}}}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual( + [{Correlation, {ok, #{[foo] => + #{payload_version => 1}}}}], + Correlations) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -364,10 +377,11 @@ async_with_correlation_in_transaction_test_() -> ok, khepri:transaction( ?FUNCTION_NAME, Fun, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - ok, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual([{Correlation, ok}], Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -391,10 +405,14 @@ async_with_correlation_in_aborted_transaction_test_() -> ok, khepri:transaction( ?FUNCTION_NAME, Fun, rw, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - {error, abort_reason}, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + [{Correlation, Result}] = Correlations, + ?assertMatch({exception, _, _, _}, Result), + Error = khepri_machine:handle_tx_exception(Result), + ?assertEqual({error, abort_reason}, Error) + end, ?assertEqual( {error, ?khepri_error(node_not_found, #{node_name => foo, @@ -442,10 +460,11 @@ async_with_correlation_and_priority_in_transaction_test_() -> khepri:transaction( ?FUNCTION_NAME, Fun, #{async => {Correlation, low}})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertEqual( - ok, - Ret), + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertEqual([{Correlation, ok}], Correlations) + end, ?assertEqual( {ok, #{payload_version => 1}}, khepri_adv:get(?FUNCTION_NAME, [foo])) @@ -464,14 +483,13 @@ wait_for_async_error_test_() -> khepri:update( ?FUNCTION_NAME, [foo], ?NO_PAYLOAD, #{async => Correlation})), - Ret = khepri:wait_for_async_ret(Correlation), - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - Ret) + receive + {ra_event, _, {applied, Correlations}} = AsyncRet -> + ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet), + ?assertMatch( + [{Correlation, + {error, ?khepri_error(node_not_found, _)}}], + Correlations) + end end) ]}. - -wait_for_async_ret_but_no_async_call_test() -> - ?assertEqual( - {error, timeout}, - khepri:wait_for_async_ret(1, 100)). diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 1eb4ea64..f8ede47e 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -43,7 +43,8 @@ handle_leader_down_on_three_node_cluster_command/1, handle_leader_down_on_three_node_cluster_response/1, can_set_snapshot_interval/1, - projections_are_consistent_on_three_node_cluster/1]). + projections_are_consistent_on_three_node_cluster/1, + async_command_leader_change_in_three_node_cluster/1]). all() -> [can_start_a_single_node, @@ -63,7 +64,8 @@ all() -> handle_leader_down_on_three_node_cluster_command, handle_leader_down_on_three_node_cluster_response, can_set_snapshot_interval, - projections_are_consistent_on_three_node_cluster]. + projections_are_consistent_on_three_node_cluster, + async_command_leader_change_in_three_node_cluster]. groups() -> []. @@ -103,7 +105,8 @@ init_per_testcase(Testcase, Config) Testcase =:= fail_to_join_non_existing_store orelse Testcase =:= handle_leader_down_on_three_node_cluster_command orelse Testcase =:= handle_leader_down_on_three_node_cluster_response orelse - Testcase =:= projections_are_consistent_on_three_node_cluster -> + Testcase =:= projections_are_consistent_on_three_node_cluster orelse + Testcase =:= async_command_leader_change_in_three_node_cluster -> Nodes = start_n_nodes(Testcase, 3), PropsPerNode0 = [begin {ok, _} = rpc:call( @@ -1379,6 +1382,104 @@ wait_for_projection_on_nodes([Node | Rest] = Nodes, ProjectionName) -> wait_for_projection_on_nodes(Rest, ProjectionName) end. +async_command_leader_change_in_three_node_cluster(Config) -> + PropsPerNode = ?config(ra_system_props, Config), + [Node1, Node2, Node3] = Nodes = maps:keys(PropsPerNode), + + %% We assume all nodes are using the same Ra system name & store ID. + #{ra_system := RaSystem} = maps:get(Node1, PropsPerNode), + StoreId = RaSystem, + + ct:pal("Start database + cluster nodes"), + lists:foreach( + fun(Node) -> + ct:pal("- khepri:start() from node ~s", [Node]), + ?assertEqual( + {ok, StoreId}, + rpc:call(Node, khepri, start, [RaSystem, StoreId])) + end, Nodes), + lists:foreach( + fun(Node) -> + ct:pal("- khepri_cluster:join() from node ~s", [Node]), + ?assertEqual( + ok, + rpc:call(Node, khepri_cluster, join, [StoreId, Node3])) + end, [Node1, Node2]), + + LeaderId = get_leader_in_store(StoreId, Nodes), + {StoreId, LeaderNode} = LeaderId, + + ct:pal("Send an async command from the leader node ~s", [LeaderNode]), + ok = erpc:call( + LeaderNode, + fun() -> + %% This member hasn't sent any commands so the leader isn't + %% cached yet. The async call will succeed though because this + %% member is the leader. + ?assertEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId = 1, + Extra = #{async => CorrelationId}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra), + receive + {ra_event, + _FromId, + {applied, + [{CorrelationId, {ok, _}}]}} = AsyncRet -> + ok = khepri:handle_async_ret(StoreId, AsyncRet) + after + 5_000 -> + throw(timeout) + end + end), + + [FollowerNode, _] = Nodes -- [LeaderNode], + ct:pal("Send async commands from a follower node ~s", [FollowerNode]), + ok = erpc:call( + FollowerNode, + fun() -> + %% This member hasn't sent any commands so the leader isn't + %% cached yet. This member is not the leader so the async + %% command will fail. + ?assertEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId1 = 1, + Extra1 = #{async => CorrelationId1}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra1), + receive + {ra_event, + _, + {rejected, + {not_leader, _, CorrelationId1}}} = AsyncRet1 -> + ok = khepri:handle_async_ret(StoreId, AsyncRet1) + after + 1_000 -> + throw(timeout) + end, + + %% `khepri:handle_async_ret/2' updated the cached leader so + %% the async call will now send the command to the leader. + ?assertNotEqual( + undefined, + khepri_cluster:get_cached_leader(StoreId)), + CorrelationId2 = 2, + Extra2 = #{async => CorrelationId2}, + ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra2), + receive + {ra_event, + _, + {applied, + [{CorrelationId2, {ok, _}}]}} = AsyncRet2 -> + ok = khepri:handle_async_ret(StoreId, AsyncRet2) + after + 1_000 -> + throw(timeout) + end + end), + ok. + %% ------------------------------------------------------------------- %% Internal functions %% -------------------------------------------------------------------