Skip to content

Commit

Permalink
Some cleanups in the parallel code (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostis authored May 18, 2021
1 parent 6302ea0 commit 1daf130
Showing 1 changed file with 30 additions and 44 deletions.
74 changes: 30 additions & 44 deletions src/proper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1440,9 +1440,10 @@ perform(NumTests, Test, Opts) ->
perform(Passed, NumTests, Test, Opts) ->
Size = size_at_nth_test(Passed, Opts),
put('$size', Size),
%% When working on parallelizing PropEr initially we used to hit too easily
%% the default maximum number of tries that PropEr had, so when running on parallel
%% it has a higher than usual max number of tries.The number was picked after testing locally
%% When working on parallelizing PropEr initially we used to hit
%% too easily the default maximum number of tries that PropEr had,
%% so when running on parallel it has a higher than usual max
%% number of tries. The number was picked after testing locally
%% with different values.
perform(Passed, NumTests, 3 * ?MAX_TRIES_FACTOR * NumTests, Test, none, none, Opts).

Expand Down Expand Up @@ -1597,14 +1598,12 @@ perform_search(Steps, NumSteps, TriesLeft, Target, DTest,
Error
end.


-spec add_samples([sample()], [sample()] | 'none') -> [sample()].
add_samples(MoreSamples, none) ->
MoreSamples;
add_samples(MoreSamples, Samples) ->
[M ++ S || {M, S} <- proper_arith:safe_zip(MoreSamples, Samples)].


%% Evaluated only for its side-effects.
-spec gen_and_print_samples(proper_types:raw_type(),
proper_gen:size(), proper_gen:size()) -> 'ok'.
Expand Down Expand Up @@ -2190,29 +2189,24 @@ apply_skip(Args, Prop) ->
%% Output functions
%%-----------------------------------------------------------------------------

-spec aggregate_imm_result(list(pid()), imm_result()) -> imm_result().
-spec aggregate_imm_result([pid()], imm_result()) -> imm_result().
aggregate_imm_result([], ImmResult) ->
ImmResult;
aggregate_imm_result(WorkerList, #pass{performed = Passed, samples = Samples} = ImmResult) ->
Id = get('$property_id'),
receive
%% if we haven't received anything yet we use the first pass we get
{worker_msg, #pass{} = Received, From, Id} when Passed == undefined ->
{worker_msg, #pass{} = Received, From, Id} when Passed =:= undefined ->
aggregate_imm_result(WorkerList -- [From], Received);
%% from that moment on, we accumulate the count of passed tests
{worker_msg, #pass{performed = PassedRcvd, samples = SamplesRcvd}, From, Id}
when Samples == [none] ->
NewImmResult = ImmResult#pass{performed = Passed + PassedRcvd,
samples = SamplesRcvd},
aggregate_imm_result(WorkerList -- [From], NewImmResult);
{worker_msg, #pass{performed = PassedRcvd, samples = SamplesRcvd}, From, Id} ->
NewImmResult = ImmResult#pass{performed = Passed + PassedRcvd,
samples = Samples ++ SamplesRcvd},
aggregate_imm_result(WorkerList -- [From], NewImmResult);
{worker_msg, #fail{performed = FailedOn} = Received, From, Id} ->
lists:foreach(fun(P) ->
P ! {worker_msg, {failed_test, self()}, Id} end,
WorkerList -- [From]),
P ! {worker_msg, {failed_test, self()}, Id}
end, WorkerList -- [From]),
Performed = lists:foldl(fun(Worker, Acc) ->
receive
{worker_msg, {performed, undefined, Id}} -> Acc;
Expand Down Expand Up @@ -2386,20 +2380,19 @@ default_strategy_fun() ->
1 -> 0;
_ -> 1
end,
Seq = lists:seq(1, NumWorkers),
lists:map(fun(X) ->
L2 = lists:seq(X - 1, NumTests - Decr, NumWorkers),
{_Start, _NumTests} = {hd(L2), lists:last(L2)}
end, Seq)
[begin
L2 = lists:seq(X - 1, NumTests - Decr, NumWorkers),
{hd(L2), lists:last(L2)} % {_Start, _NumTests}
end || X <- lists:seq(1, NumWorkers)]
end.

%% @private
-spec update_worker_node_ref({node(), {already_running, boolean()}}) -> list(node()).
-spec update_worker_node_ref({node(), {already_running, boolean()}}) -> [node()].
update_worker_node_ref(NodeName) ->
NewMap = case get(worker_nodes) of
undefined -> [NodeName];
Map -> [NodeName|Map]
end,
undefined -> [NodeName];
Map -> [NodeName|Map]
end,
put(worker_nodes, NewMap).

%% @private
Expand Down Expand Up @@ -2457,27 +2450,23 @@ maybe_load_binary(Nodes, Module) ->
%% @private
-spec ensure_code_loaded([node()]) -> 'ok'.
ensure_code_loaded(Nodes) ->
%% we get all the files that need to be loaded from the current directory
%% get all the files that need to be loaded from the current directory
Files = filelib:wildcard("**/*.beam"),
%% but we only care about the filename, without the .beam extension
Modules = [erlang:list_to_atom(filename:basename(File, ".beam")) || File <- Files],

%% call the functions needed to ensure that all modules are available on the nodes
Modules = [list_to_atom(filename:basename(File, ".beam")) || File <- Files],
%% ensure that all modules are available on the nodes
lists:foreach(fun(Module) -> maybe_load_binary(Nodes, Module) end, Modules),
lists:foreach(fun(P) -> rpc:multicall(Nodes, code, add_patha, [P]) end, code:get_path()),
lists:foreach(fun(P) -> rpc:multicall(Nodes, code, add_patha, [P]) end,
code:get_path()),
_ = rpc:multicall(Nodes, code, ensure_modules_loaded, [Modules]),
ok.

%% @private
%% @doc Starts multiple (NumNodes) remote nodes.
-spec start_nodes(non_neg_integer()) -> list(node()).
-spec start_nodes(non_neg_integer()) -> [node()].
start_nodes(NumNodes) ->
StartNodeFun =
fun(N) ->
SlaveName = list_to_atom("proper_slave_" ++ integer_to_list(N)),
_ = start_node(SlaveName)
end,
lists:map(StartNodeFun, lists:seq(1, NumNodes)).
[start_node(list_to_atom("proper_slave_" ++ integer_to_list(N)))
|| N <- lists:seq(1, NumNodes)].

%% @private
%% @doc Stops all the registered (started) nodes.
Expand All @@ -2486,23 +2475,20 @@ stop_nodes() ->
case get(worker_nodes) of
undefined -> ok;
Nodes ->
NodesToStop = lists:filter(fun({_N, {already_running, Bool}}) -> not Bool end, Nodes),
lists:foreach(fun({Node, _}) -> slave:stop(Node) end, NodesToStop),
StopFun = fun({Node, {already_running, false}}) -> slave:stop(Node);
({_Node, {already_running, true}}) -> ok
end,
lists:foreach(StopFun, Nodes),
_ = net_kernel:stop(),
erase(worker_nodes),
ok
end.

%% @private
%% @doc Unlinks and kills all the workers.
-spec kill_workers(list(pid())) -> ok.
-spec kill_workers([pid()]) -> ok.
kill_workers(WorkerList) ->
UnlinkAndKill =
fun(P) ->
unlink(P),
exit(P, kill)
end,
lists:foreach(UnlinkAndKill, WorkerList).
lists:foreach(fun(P) -> unlink(P), exit(P, kill) end, WorkerList).

%%-----------------------------------------------------------------------------
%% Stats printing functions
Expand Down

0 comments on commit 1daf130

Please sign in to comment.