Skip to content

Commit

Permalink
Remove khepri:wait_for_async_ret/1,2 in favor of handle_async_ret/2
Browse files Browse the repository at this point in the history
See the parent commit: `wait_for_async_ret/1,2` used a selective receive
to find whether a given correlation ID was applied. The `ra_event`
emitted when a batch is applied gives all correlation and result pairs
applied in the batch though, making it impossible to reliably find a
single correlation ID. We should instead have the caller perform the
`receive` to find the applied/rejected event and handle all of the
correlation IDs found within.

So this change removes `wait_for_async_ret/1,2` in favor of the
`handle_async_ret/2` function added in the parent commit.
  • Loading branch information
the-mikedavis committed Oct 31, 2023
1 parent 0362e0d commit 401f44b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 76 deletions.
67 changes: 4 additions & 63 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@
transaction/5,

handle_async_ret/1, handle_async_ret/2,
wait_for_async_ret/1, wait_for_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
'get!'/1, 'get!'/2, 'get!'/3,
Expand Down Expand Up @@ -2054,8 +2053,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -2164,8 +2163,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function can be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -3349,64 +3348,6 @@ handle_async_ret(
ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader),
ok.

%% -------------------------------------------------------------------
%% wait_for_async_ret().
%% -------------------------------------------------------------------

-spec wait_for_async_ret(Correlation) -> Ret when
Correlation :: ra_server:command_correlation(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
%%
%% Calling this function is the same as calling
%% `wait_for_async_ret(Correlation)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).
%%
%% @see wait_for_async_ret/2.

wait_for_async_ret(Correlation) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_async_ret(Correlation, Timeout).

-spec wait_for_async_ret(Correlation, Timeout) -> Ret when
Correlation :: ra_server:command_correlation(),
Timeout :: timeout(),
Ret :: khepri:minimal_ret() |
khepri:payload_ret() |
khepri:many_payloads_ret() |
khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_machine:tx_ret().
%% @doc Waits for an asynchronous call.
%%
%% This function waits maximum `Timeout' milliseconds (or `infinity') for the
%% result of a previous call where the `async' option was set with a
%% correlation ID. That correlation ID must be passed to this function.
%%
%% @see wait_for_async_ret/2.

wait_for_async_ret(Correlation, Timeout) ->
receive
{ra_event, _, {applied, [{Correlation, Reply}]}} ->
case Reply of
{exception, _, _, _} = Exception ->
khepri_machine:handle_tx_exception(Exception);
ok ->
Reply;
{ok, _} ->
Reply;
{error, _} ->
Reply
end
after Timeout ->
{error, timeout}
end.

%% -------------------------------------------------------------------
%% Bang functions.
%% -------------------------------------------------------------------
Expand Down
16 changes: 8 additions & 8 deletions src/khepri_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ put(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% The returned `{ok, NodeProps}' tuple contains a map with the properties and
%% payload (if any) of the targeted tree node as they were before the put.
Expand Down Expand Up @@ -481,8 +481,8 @@ put_many(StoreId, PathPattern, Data) ->
%% khepri:command_options()}, {@link khepri:tree_options()} and {@link
%% khepri:put_options()}.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -847,8 +847,8 @@ delete(PathPattern, Options) when is_map(Options) ->
%% payload (if any) of the targeted tree node as they were before the delete.
%% If the targeted tree node didn't exist, `NodeProps' will be an empty map.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down Expand Up @@ -951,8 +951,8 @@ delete_many(PathPattern, Options) when is_map(Options) ->
%% map containing the properties and payload (if any) of that deleted tree
%% node as they were before the delete.
%%
%% When doing an asynchronous update, the {@link khepri:wait_for_async_ret/1}
%% function can be used to receive the message from Ra.
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% Example:
%% ```
Expand Down
5 changes: 0 additions & 5 deletions test/async_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,3 @@ wait_for_async_error_test_() ->
end
end)
]}.

wait_for_async_ret_but_no_async_call_test() ->
?assertEqual(
{error, timeout},
khepri:wait_for_async_ret(1, 100)).

0 comments on commit 401f44b

Please sign in to comment.