Skip to content

Commit

Permalink
Merge pull request #229 from rabbitmq/md-handle-async-ret
Browse files Browse the repository at this point in the history
Replace `wait_for_async_ret` with `handle_async_ret` for handling async Ra events
  • Loading branch information
dumbbell authored Oct 31, 2023
2 parents f8a969d + a25e695 commit f843e4a
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 102 deletions.
106 changes: 52 additions & 54 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
transaction/1, transaction/2, transaction/3, transaction/4,
transaction/5,

wait_for_async_ret/1, wait_for_async_ret/2,
handle_async_ret/1, handle_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
'get!'/1, 'get!'/2, 'get!'/3,
Expand Down Expand Up @@ -2053,8 +2053,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -2163,8 +2163,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -3293,62 +3293,60 @@ transaction(StoreId, FunOrPath, Args, ReadWrite, Options) ->
khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options).

%% -------------------------------------------------------------------
%% wait_for_async_ret().
%% handle_async_ret().
%% -------------------------------------------------------------------

-spec wait_for_async_ret(Correlation) -> Ret when
Correlation :: ra_server:command_correlation(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
-spec handle_async_ret(RaEvent) -> ok when
RaEvent :: ra_server_proc:ra_event().
%% @doc Handles the Ra event sent for asynchronous call results.
%%
%% Calling this function is the same as calling
%% `wait_for_async_ret(Correlation)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).
%% `handle_async_ret(StoreId, RaEvent)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see wait_for_async_ret/2.
%% @see handle_async_ret/2.

wait_for_async_ret(Correlation) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_async_ret(Correlation, Timeout).
handle_async_ret(RaEvent) ->
StoreId = khepri_cluster:get_default_store_id(),
handle_async_ret(StoreId, RaEvent).

-spec wait_for_async_ret(Correlation, Timeout) -> Ret when
Correlation :: ra_server:command_correlation(),
Timeout :: timeout(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
%%
%% This function waits maximum `Timeout' milliseconds (or `infinity') for the
%% result of a previous call where the `async' option was set with a
%% correlation ID. That correlation ID must be passed to this function.
%%
%% @see wait_for_async_ret/2.

wait_for_async_ret(Correlation, Timeout) ->
receive
{ra_event, _, {applied, [{Correlation, Reply}]}} ->
case Reply of
{exception, _, _, _} = Exception ->
khepri_machine:handle_tx_exception(Exception);
ok ->
Reply;
{ok, _} ->
Reply;
{error, _} ->
Reply
end
after Timeout ->
{error, timeout}
end.
-spec handle_async_ret(StoreId, RaEvent) -> ok when
StoreId :: khepri:store_id(),
RaEvent :: ra_server_proc:ra_event().
%% @doc Handles the Ra event sent for asynchronous call results.
%%
%% When sending commands with `async' {@link command_options()}, the calling
%% process will receive Ra events with the following structure:
%%
%% `{ra_event, CurrentLeader, {applied, [{Correlation1, Reply1}, ..]}}'
%%
%% or
%%
%% `{ra_event, FromId, {rejected, {not_leader, Leader | undefined, Correlation}}}'
%%
%% The first event acknowledges all commands handled in a batch while the
%% second is sent per-command when commands are sent against a non-leader
%% member.
%%
%% These events should be passed to this function in order to update leader
%% information. This function does not handle retrying rejected commands or
%% return values from applied commands - the caller is responsible for those
%% tasks.
%%
%% @see async_option().
%% @see ra:pipeline_command/4.

handle_async_ret(
StoreId,
{ra_event, _CurrentLeader, {applied, _Correlations}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok;
handle_async_ret(
StoreId,
{ra_event, FromId, {rejected, {not_leader, MaybeLeader, _CorrelationId}}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader),
ok.

%% -------------------------------------------------------------------
%% Bang functions.
Expand Down
16 changes: 8 additions & 8 deletions src/khepri_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% The returned `{ok, NodeProps}' tuple contains a map with the properties and
%% payload (if any) of the targeted tree node as they were before the put.
Expand Down Expand Up @@ -481,8 +481,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -847,8 +847,8 @@ delete(PathPattern, Options) when is_map(Options) ->
%% payload (if any) of the targeted tree node as they were before the delete.
%% If the targeted tree node didn't exist, `NodeProps' will be an empty map.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -951,8 +951,8 @@ delete_many(PathPattern, Options) when is_map(Options) ->
%% map containing the properties and payload (if any) of that deleted tree
%% node as they were before the delete.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down
92 changes: 55 additions & 37 deletions test/async_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ async_with_correlation_in_put_test_() ->
khepri:put(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand Down Expand Up @@ -127,10 +130,13 @@ async_with_correlation_and_priority_in_put_test_() ->
khepri:put(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand Down Expand Up @@ -224,10 +230,13 @@ async_with_correlation_in_delete_test_() ->
ok,
khepri:delete(
?FUNCTION_NAME, [foo], #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] => #{}}}}],
Correlations)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -281,10 +290,14 @@ async_with_correlation_and_priority_in_delete_test_() ->
ok,
khepri_adv:delete(
?FUNCTION_NAME, [foo], #{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{ok, #{[foo] => #{payload_version => 1}}},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual(
[{Correlation, {ok, #{[foo] =>
#{payload_version => 1}}}}],
Correlations)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -364,10 +377,11 @@ async_with_correlation_in_transaction_test_() ->
ok,
khepri:transaction(
?FUNCTION_NAME, Fun, #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
ok,
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual([{Correlation, ok}], Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand All @@ -391,10 +405,14 @@ async_with_correlation_in_aborted_transaction_test_() ->
ok,
khepri:transaction(
?FUNCTION_NAME, Fun, rw, #{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
{error, abort_reason},
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
[{Correlation, Result}] = Correlations,
?assertMatch({exception, _, _, _}, Result),
Error = khepri_machine:handle_tx_exception(Result),
?assertEqual({error, abort_reason}, Error)
end,
?assertEqual(
{error,
?khepri_error(node_not_found, #{node_name => foo,
Expand Down Expand Up @@ -442,10 +460,11 @@ async_with_correlation_and_priority_in_transaction_test_() ->
khepri:transaction(
?FUNCTION_NAME, Fun,
#{async => {Correlation, low}})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertEqual(
ok,
Ret),
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertEqual([{Correlation, ok}], Correlations)
end,
?assertEqual(
{ok, #{payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))
Expand All @@ -464,14 +483,13 @@ wait_for_async_error_test_() ->
khepri:update(
?FUNCTION_NAME, [foo], ?NO_PAYLOAD,
#{async => Correlation})),
Ret = khepri:wait_for_async_ret(Correlation),
?assertMatch(
{error, ?khepri_error(node_not_found, _)},
Ret)
receive
{ra_event, _, {applied, Correlations}} = AsyncRet ->
ok = khepri:handle_async_ret(?FUNCTION_NAME, AsyncRet),
?assertMatch(
[{Correlation,
{error, ?khepri_error(node_not_found, _)}}],
Correlations)
end
end)
]}.

wait_for_async_ret_but_no_async_call_test() ->
?assertEqual(
{error, timeout},
khepri:wait_for_async_ret(1, 100)).
Loading

0 comments on commit f843e4a

Please sign in to comment.