Make eqc tests more deterministic so they (usually) pass; remove parallel eqc tests.
rjmh committed May 27, 2020
1 parent a4853fe commit f457a2a
Showing 1 changed file with 166 additions and 49 deletions.
eqc/sidejob_eqc.erl
@@ -4,6 +4,77 @@
%%% Created : 13 May 2013 by Ulf Norell
-module(sidejob_eqc).

%% Sidejob is intended to run jobs (of the form call or cast), running
%% at most W jobs in parallel, and returning 'overload' if more than
%% K*W jobs are waiting to be completed. Here W is the number of
%% sidejob workers, and K-1 is the maximum number of jobs that can be
%% waiting for any particular worker. When new jobs are submitted,
%% sidejob looks for an available worker (with fewer than K-1 jobs in
%% its queue), starting with one corresponding to the scheduler number
%% that the caller is running on; it returns overload only if every
%% worker has K-1 waiting jobs at the time sidejob checks.
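%%
%% For example, with W = 4 workers and K = 5, at most 4 jobs run in
%% parallel and at most 20 jobs can be in the system at once. A
%% minimal usage sketch (the resource name, worker module and limits
%% below are made up for illustration; check the sidejob
%% documentation for the exact signatures):
%%
%%   ok = application:ensure_started(sidejob),
%%   {ok, _} = sidejob:new_resource(example_resource, example_worker,
%%                                  20, 4), % K*W = 20 jobs, W = 4 workers
%%   case sidejob:call(example_resource, {work, 42}) of
%%     overload -> overloaded;
%%     Reply    -> Reply
%%   end.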

%% If a job crashes, then the worker that was running it is restarted,
%% but any jobs waiting for that worker are lost (and, in the event of
%% a call job, cause their caller to crash too).

%% Sidejob is inherently non-deterministic. For example, if K*W jobs
%% are running, one is about to finish, and another is about to be
%% submitted, then there is a race between these two events that can
%% lead the new job to be rejected as overload, or not. Even in a
%% non-overload situation, a worker with a full queue which is about
%% to finish a job may be assigned a new job if the finish happens
%% first, or the new job may be assigned to the next worker if the finish
%% happens second. Thus it is impossible to predict reliably which
%% worker a job will be assigned to, and thus which jobs will be
%% discarded when a job crashes.

%% Nevertheless, this model tries to predict such outcomes
%% precisely. As a result, the tests suffer from race conditions, and
%% even the sequential tests have been failing. To address this, the
%% model sleeps after every action, to allow sidejob to complete all
%% the resulting actions. This sleep was originally 1ms, which was not
%% always long enough, leading tests to fail. Now
%% * we sleep for 2ms,
%% * we check to see if the VM is "quiescent" before continuing, and
%% if not, we sleep again,
%% * we retry calls that return results that could be transient
%% ('overload' from call and cast, 'blocked' from get_status)
%% * after a restart of the task supervisor, we wait 10ms (!) because
%% weird stuff happens if we don't
%% This makes tests much more deterministic, at least. Fewer than one
%% test in 300,000 should fail--if they fail more often than that,
%% there is something wrong.
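%%
%% The retry logic is inlined in the operations below, but the idea
%% is roughly the following (a sketch only--retry_transient is not a
%% real function in this module):
%%
%%   retry_transient(F, Transient) ->
%%     R = F(),
%%     case lists:member(R, Transient) of
%%       true  -> wait_until_quiescent(), F();
%%       false -> R
%%     end.
%%
%% e.g. retry_transient(fun() -> get_status(WPid) end, [blocked]).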

%% The disadvantages of this approach are:
%% * It is still possible, if much less likely, that a test fails when
%% nothing is wrong.
%% * This model cannot test rapid sequences of events, and so risks
%% missing some bugs, because it must wait for quiescence after
%% every operation.
%% * It does not make sense to run parallel tests with this model.

%% Three ways in which testing could be improved are:
%% 1. Use PULSE, not for parallel testing, but to run sequential
%% tests, because PULSE can guarantee quiescence before proceeding
%% to the next operation, without sleeping in reality. This could
%% make tests very much faster to run.
%% 2. Create a different model that tolerates non-determinism,
%% instead checking global properties such as that no more than
%% W jobs are in progress simultaneously, that jobs are rejected
%% as overload iff K*W jobs are currently in the system, and that *at
%% least* ceiling(N/K) jobs are actually running when N jobs are
%% in the system (see the sketch after this list). Such a model
%% could be used to test sidejob with
%% rapidly arriving events, and so might find race conditions that
%% this model misses. It could also potentially run much faster,
%% and thus find bugs that are simply too rare to find in a
%% realistic time with a model that sleeps frequently.
%% 3. Make the 'intensity' parameter of the sidejob supervisor
%% configurable--at present it is always 10, which means that a
%% supervisor restart only happens after ten jobs crash. This
%% makes tests that fail in this situation long, and as a result
%% they shrink very slowly.
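%%
%% For point 2, the checked property might look roughly like this
%% (a sketch only--w/1, k/1, jobs_running/1 and jobs_in_system/1 are
%% hypothetical helpers, not part of this module):
%%
%%   global_invariant(S) ->
%%     Running  = jobs_running(S),
%%     InSystem = jobs_in_system(S),
%%     Running =< w(S) andalso Running >= ceil(InSystem / k(S)).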

-include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eqc/include/eqc.hrl").
-ifdef(PULSE).
@@ -12,7 +83,7 @@
-endif.

-export([initial_state/0]).
-export([prop_seq/0, prop_par/0]).
-export([prop_seq/0]).
-export([work/2, finish_work/1, crash/1, get_element/2, get_status/1]).
-export([new_resource_command/1,
new_resource_pre/1, new_resource_next/3, new_resource_post/3]).
@@ -62,13 +133,14 @@ new_resource_post(_, _, V) ->

%% -- work
work(Cmd, Scheduler) ->
wait_until_quiescent(),
{Worker, Status} = work0(Cmd, Scheduler),
case Status of
overload ->
%%overload ->
%% Temporary overload is not necessarily a problem--there
%% may be workers stopping/dying/being replaced.
wait_until_quiescent(),
work0(Cmd, Scheduler);
%% wait_until_quiescent(),
%%work0(Cmd, Scheduler);
_ ->
{Worker, Status}
end.
@@ -147,6 +219,7 @@ get_status_next(S, V, [WPid]) ->
{finished, _} -> stopped;
blocked -> blocked;
zombie -> zombie;
crashed -> crashed;
working -> {working, {call, ?MODULE, get_element, [2, V]}}
end,
set_worker_status(S, WPid, NewStatus).
@@ -156,6 +229,7 @@ get_status_post(S, [WPid], R) ->
{finished, Res} -> eq(R, Res);
blocked -> eq(R, blocked);
zombie -> eq(R, blocked);
crashed -> eq(R, crashed);
working ->
case R of
{working, Pid} when is_pid(Pid) -> true;
@@ -166,7 +240,8 @@ get_status_post(S, [WPid], R) ->
%% -- finish
finish_work(bad_element) -> ok;
finish_work(Pid) ->
Pid ! finish.
Pid ! finish,
wait_until_quiescent().

finish_work_args(S) ->
[elements(working_workers(S))].
@@ -189,12 +264,13 @@ finish_work_next(S, _, [Pid]) ->
%% -- crash
crash(bad_element) -> ok;
crash(Pid) ->
Pid ! crash.
Pid ! crash,
wait_until_quiescent().

crash_args(S) ->
[elements(working_workers(S))].

crash_pre(S) ->
working_workers(S) /= [].

crash_pre(S, [Pid]) ->
@@ -209,15 +285,31 @@ crash_next(S, _, [Pid]) ->
false -> kill_queue(S2, W#worker.queue)
end.

crash_post(#state{ restarts=Restarts }, [_Pid], _) ->
%% This is a truly horrible hack!
%% At the restart limit, the sidejob supervisor is restarted,
%% which takes longer, and we see non-deterministic effects. In
%% *sequential* tests, the post-condition is called directly after
%% the call to crash, and we can tell from the dynamic state
%% whether or not the restart limit was reached. If so, we give
%% sidejob a bit more time, to avoid concomitant errors.
[begin status_keeper ! supervisor_restart,
timer:sleep(10)
end || Restarts==?RESTART_LIMIT],
true.

kill_queue(S, Q) ->
Kill =
fun(W=#worker{ queue = Q1, status = blocked }) when Q1 == Q ->
W#worker{ queue = zombie };
fun(W=#worker{ queue = Q1, status = blocked, cmd = Cmd }) when Q1 == Q ->
W#worker{ queue = zombie, status = case Cmd of call->crashed; cast->zombie end };
(W) -> W end,
S#state{ workers = lists:map(Kill, S#state.workers) }.

kill_all_queues(S) ->
Kill = fun(W) -> W#worker{ queue = zombie } end,
Kill = fun(W=#worker{ status = Status, cmd = Cmd }) when Status /= {finished,done} ->
W#worker{ queue = zombie,
status = case Cmd of call->crashed; cast->zombie end };
(W) -> W end,
S#state{ workers = lists:map(Kill, S#state.workers) }.

%% -- Helpers ----------------------------------------------------------------
@@ -301,7 +393,8 @@ worker() ->
overload -> overload;
ok ->
receive
{started, Ref, Pid} -> {working, Pid}
{started, Ref, Pid} ->
{working, Pid}
end
end,
From ! {self(), Res}
@@ -321,27 +414,48 @@ status_keeper(State) ->
receive
{start_worker, From, Cmd, Scheduler} ->
Worker = spawn_opt(fun worker/0, [{scheduler, Scheduler}]),
monitor(process,Worker),
Worker ! {Cmd, self()},
From ! {start_worker, Worker},
status_keeper([{worker, Worker, []} | State]);
status_keeper([{worker, Worker, [], Cmd} | State]);
{Worker, Status} when is_pid(Worker) ->
{worker, Worker, OldStatus} = lists:keyfind(Worker, 2, State),
status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, OldStatus ++ [Status]}));
{worker, Worker, OldStatus, Cmd} = lists:keyfind(Worker, 2, State),
status_keeper(lists:keystore(Worker, 2, State,
{worker, Worker, OldStatus ++ [Status], Cmd}));
{'DOWN',_,process,Worker,Reason} ->
[self() ! {Worker,crashed} || Reason/=normal],
status_keeper(State);
{get_status, From, Worker} ->
case lists:keyfind(Worker, 2, State) of
{worker, Worker, [Status | NewStatus]} ->
{worker, Worker, [Status | NewStatus0], Cmd} ->
NewStatus = case Status of crashed -> [crashed]; _ -> NewStatus0 end,
From ! {Worker, Status},
status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, NewStatus}));
status_keeper(lists:keystore(Worker, 2, State,
{worker, Worker, NewStatus, Cmd}));
_ ->
From ! {Worker, blocked},
status_keeper(State)
end
end;
supervisor_restart ->
%% all workers crash; pending status messages must be discarded
flush_all_messages(),
status_keeper([{worker,Worker,
[case Msg of
{working,_} when Cmd==call -> crashed;
{working,_} when Cmd==cast -> blocked;
_ -> Msg
end || Msg <- Msgs],
Cmd}
|| {worker,Worker,Msgs,Cmd} <- State])
end.

flush_all_messages() ->
receive _ -> flush_all_messages() after 0 -> ok end.
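
%% In summary: start_worker spawns a worker pinned to the given
%% scheduler and records it; {Worker, Status} messages append to that
%% worker's list of pending statuses; get_status pops the next
%% pending status (answering 'blocked' if there is none, and keeping
%% 'crashed' sticky); a 'DOWN' for an abnormal exit records a crash;
%% and supervisor_restart rewrites pending {working, _} statuses to
%% 'crashed' for calls and 'blocked' for casts.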

%% -- Property ---------------------------------------------------------------

prop_seq() ->
?FORALL(Repetitions,?SHRINK(1,[1000]),
?FORALL(Repetitions,?SHRINK(1,[100]),
?FORALL(Cmds, commands(?MODULE),
?ALWAYS(Repetitions,
?TIMEOUT(?TIMEOUT,
@@ -355,33 +469,36 @@ prop_seq() ->
R == ok))
end))))).

prop_par() ->
?FORALL(Cmds, parallel_commands(?MODULE),
?TIMEOUT(?TIMEOUT,
% ?SOMETIMES(4,
begin
cleanup(),
HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
kill_all_pids({SeqH, ParH}),
aggregate(command_names(Cmds),
pretty_commands(?MODULE, Cmds, HSR,
R == ok))
end)).

-ifdef(PULSE).
prop_pulse() ->
?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
fun() -> erlang:system_flag(schedulers_online, N) end end,
?FORALL(Cmds, parallel_commands(?MODULE),
?PULSE(HSR={_, _, R},
begin
cleanup(),
run_parallel_commands(?MODULE, Cmds)
end,
aggregate(command_names(Cmds),
pretty_commands(?MODULE, Cmds, HSR,
R == ok))))).
-endif.
%% Because these tests try to wait for quiescence after each
%% operation, it is not really meaningful to run parallel tests.

%% prop_par() ->
%% ?FORALL(Cmds, parallel_commands(?MODULE),
%% ?TIMEOUT(?TIMEOUT,
%% % ?SOMETIMES(4,
%% begin
%% cleanup(),
%% HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
%% kill_all_pids({SeqH, ParH}),
%% aggregate(command_names(Cmds),
%% pretty_commands(?MODULE, Cmds, HSR,
%% R == ok))
%% end)).
%%
%% -ifdef(PULSE).
%% prop_pulse() ->
%% ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
%% fun() -> erlang:system_flag(schedulers_online, N) end end,
%% ?FORALL(Cmds, parallel_commands(?MODULE),
%% ?PULSE(HSR={_, _, R},
%% begin
%% cleanup(),
%% run_parallel_commands(?MODULE, Cmds)
%% end,
%% aggregate(command_names(Cmds),
%% pretty_commands(?MODULE, Cmds, HSR,
%% R == ok))))).
%% -endif.

kill_all_pids(Pid) when is_pid(Pid) -> exit(Pid, kill);
kill_all_pids([H|T]) -> kill_all_pids(H), kill_all_pids(T);
@@ -417,7 +534,8 @@ pulse_instrument(File) ->
Mod.
-endif.

%% Wait for quiescence: needed to observe that sidejob is NOT starting a task.
%% Wait for quiescence: to get deterministic testing, we need to let
%% sidejob finish what it is doing.

busy_processes() ->
[Pid || Pid <- processes(),
@@ -429,12 +547,11 @@ quiescent() ->
busy_processes() == [self()].

wait_until_quiescent() ->
timer:sleep(1),
timer:sleep(2),
case quiescent() of
true ->
ok;
false ->
io:format("Not quiescent\n"),
timer:sleep(1),
%% This happens regularly
wait_until_quiescent()
end.
