Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace wait_for_async_ret with handle_async_ret for handling async Ra events #229

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 52 additions & 54 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
transaction/1, transaction/2, transaction/3, transaction/4,
transaction/5,

wait_for_async_ret/1, wait_for_async_ret/2,
handle_async_ret/1, handle_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
'get!'/1, 'get!'/2, 'get!'/3,
Expand Down Expand Up @@ -2053,8 +2053,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -2163,8 +2163,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -3293,62 +3293,60 @@ transaction(StoreId, FunOrPath, Args, ReadWrite, Options) ->
khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options).

%% -------------------------------------------------------------------
%% wait_for_async_ret().
%% handle_async_ret().
%% -------------------------------------------------------------------

-spec wait_for_async_ret(Correlation) -> Ret when
Correlation :: ra_server:command_correlation(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
-spec handle_async_ret(RaEvent) -> ok when
RaEvent :: ra_server_proc:ra_event().
%% @doc Handles the Ra event sent for asynchronous call results.
%%
%% Calling this function is the same as calling
%% `wait_for_async_ret(Correlation)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).
%% `handle_async_ret(StoreId, RaEvent)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see wait_for_async_ret/2.
%% @see handle_async_ret/2.

wait_for_async_ret(Correlation) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_async_ret(Correlation, Timeout).
handle_async_ret(RaEvent) ->
StoreId = khepri_cluster:get_default_store_id(),
handle_async_ret(StoreId, RaEvent).

-spec wait_for_async_ret(Correlation, Timeout) -> Ret when
Correlation :: ra_server:command_correlation(),
Timeout :: timeout(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
%%
%% This function waits maximum `Timeout' milliseconds (or `infinity') for the
%% result of a previous call where the `async' option was set with a
%% correlation ID. That correlation ID must be passed to this function.
%%
%% @see wait_for_async_ret/2.

wait_for_async_ret(Correlation, Timeout) ->
receive
{ra_event, _, {applied, [{Correlation, Reply}]}} ->
case Reply of
{exception, _, _, _} = Exception ->
khepri_machine:handle_tx_exception(Exception);
ok ->
Reply;
{ok, _} ->
Reply;
{error, _} ->
Reply
end
after Timeout ->
{error, timeout}
end.
-spec handle_async_ret(StoreId, RaEvent) -> ok when
StoreId :: khepri:store_id(),
RaEvent :: ra_server_proc:ra_event().
the-mikedavis marked this conversation as resolved.
Show resolved Hide resolved
%% @doc Handles the Ra event sent for asynchronous call results.
%%
%% When sending commands with `async' {@link command_options()}, the calling
%% process will receive Ra events with the following structure:
%%
%% `{ra_event, CurrentLeader, {applied, [{Correlation1, Reply1}, ..]}}'
%%
%% or
%%
%% `{ra_event, FromId, {rejected, {not_leader, Leader | undefined, Correlation}}}'
%%
%% The first event acknowledges all commands handled in a batch while the
%% second is sent per-command when commands are sent against a non-leader
%% member.
%%
%% These events should be passed to this function in order to update leader
%% information. This function does not handle retrying rejected commands or
%% return values from applied commands - the caller is responsible for those
%% tasks.
%%
%% @see async_option().
%% @see ra:pipeline_command/4.

handle_async_ret(
StoreId,
{ra_event, _CurrentLeader, {applied, _Correlations}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok;
handle_async_ret(
StoreId,
{ra_event, FromId, {rejected, {not_leader, MaybeLeader, _CorrelationId}}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader),
ok.

%% -------------------------------------------------------------------
%% Bang functions.
Expand Down
16 changes: 8 additions & 8 deletions src/khepri_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% The returned `{ok, NodeProps}' tuple contains a map with the properties and
%% payload (if any) of the targeted tree node as they were before the put.
Expand Down Expand Up @@ -481,8 +481,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -847,8 +847,8 @@ delete(PathPattern, Options) when is_map(Options) ->
%% payload (if any) of the targeted tree node as they were before the delete.
%% If the targeted tree node didn't exist, `NodeProps' will be an empty map.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -951,8 +951,8 @@ delete_many(PathPattern, Options) when is_map(Options) ->
%% map containing the properties and payload (if any) of that deleted tree
%% node as they were before the delete.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down
92 changes: 55 additions & 37 deletions test/async_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ async_with_correlation_in_put_test_() ->
khepri:put(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand Down Expand Up @@ -127,10 +130,13 @@ async_with_correlation_and_priority_in_put_test_() ->
khepri:put(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand Down Expand Up @@ -224,10 +230,13 @@ async_with_correlation_in_delete_test_() ->
ok,
khepri:delete(
?FUNCTION_NAME, [foo], #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -281,10 +290,14 @@ async_with_correlation_and_priority_in_delete_test_() ->
ok,
khepri_adv:delete(
?FUNCTION_NAME, [foo], #{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{payload_version => 1}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] =>
#{payload_version => 1}}}}],
Correlations)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -364,10 +377,11 @@ async_with_correlation_in_transaction_test_() ->
ok,
khepri:transaction(
?FUNCTION_NAME, Fun, #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
ok,
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual([{Correlation, ok}], Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand All @@ -391,10 +405,14 @@ async_with_correlation_in_aborted_transaction_test_() ->
ok,
khepri:transaction(
?FUNCTION_NAME, Fun, rw, #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{error, abort_reason},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
[{Correlation, Result}] = Correlations,
?assertMatch({exception, _, _, _}, Result),
Error = khepri_machine:handle_tx_exception(Result),
?assertEqual({error, abort_reason}, Error)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -442,10 +460,11 @@ async_with_correlation_and_priority_in_transaction_test_() ->
khepri:transaction(
?FUNCTION_NAME, Fun,
#{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
ok,
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual([{Correlation, ok}], Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand All @@ -464,14 +483,13 @@ wait_for_async_error_test_() ->
khepri:update(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertMatch(
{error, ?khepri_error(node_not_found, _)},
Ret)
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertMatch(
[{Correlation,
{error, ?khepri_error(node_not_found, _)}}],
Correlations)
end
end)
]}.

wait_for_async_ret_but_no_async_call_test() ->
?assertEqual(
{error, timeout},
khepri:wait_for_async_ret(1, 100)).
Loading