Merge remote-tracking branch 'quviq/develop-3.0' into develop-3.0
martinsumner committed May 29, 2020
2 parents 3b3bca8 + f457a2a commit d5e8f84
Showing 2 changed files with 213 additions and 58 deletions.
266 changes: 212 additions & 54 deletions eqc/sidejob_eqc.erl
@@ -4,6 +4,77 @@
%%% Created : 13 May 2013 by Ulf Norell
-module(sidejob_eqc).

%% Sidejob is intended to run jobs (submitted as a call or a cast),
%% running at most W jobs in parallel, and returning 'overload' once
%% K*W jobs are in the system waiting to be completed. Here W is the
%% number of sidejob workers, and K-1 is the maximum number of jobs
%% that can be waiting for any particular worker. When a new job is
%% submitted, sidejob looks for an available worker (one with fewer
%% than K-1 jobs in its queue), starting with the worker corresponding
%% to the scheduler number that the caller is running on; it returns
%% 'overload' only if every worker has K-1 waiting jobs at the time
%% sidejob checks.
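%%
%% For orientation, a rough sketch of the interface under test
%% (sidejob's public API; the resource name, worker module and the
%% capacity K*W = 40 used below are illustrative):
%%
%%   {ok, _} = sidejob:new_resource(my_resource, my_worker, 40),
%%   case sidejob:call(my_resource, Job) of
%%       overload -> rejected;     %% K*W jobs already in the system
%%       Reply    -> Reply         %% the job was accepted and ran
%%   end.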

%% If a job crashes, then the worker that was running it is restarted,
%% but any jobs waiting for that worker are lost (and, in the event of
%% a call job, cause their caller to crash too).

%% Sidejob is inherently non-deterministic. For example, if K*W jobs
%% are in the system, one is about to finish, and another is about to
%% be submitted, then there is a race between these two events that
%% can lead the new job to be rejected as overload, or not. Even in a
%% non-overload situation, a new job may be assigned to a worker with
%% a full queue that is about to finish a job, if the finish happens
%% first, or to the next worker, if the finish happens second. Thus it
%% is impossible to predict reliably which worker a job will be
%% assigned to, and hence which jobs will be discarded when a job
%% crashes.

%% Nevertheless, this model tries to predict such outcomes
%% precisely. As a result, the tests suffer from race conditions, and
%% even the sequential tests have been failing. To address this, the
%% model sleeps after every action, to allow sidejob to complete all
%% the resulting actions. This sleep was originally 1ms, which was not
%% always long enough, leading tests to fail. Now
%% * we sleep for 2ms,
%% * we check whether the VM is "quiescent" before continuing, and if
%%   not, we sleep again (see wait_until_quiescent/0 at the end of
%%   this file),
%% * we retry calls that return results that could be transient
%%   ('overload' from call and cast, 'blocked' from get_status),
%% * after a restart of the task supervisor, we wait 10ms (!) because
%%   weird stuff happens if we don't.
%% This makes tests much more deterministic, at least. Fewer than one
%% test in 300,000 should fail--if tests fail more often than that,
%% there is something wrong.

%% The disadvantages of this approach are:
%% * It is still possible, if much less likely, that a test fails when
%%   nothing is wrong.
%% * This model cannot test rapid sequences of events, and so risks
%% missing some bugs, because it must wait for quiescence after
%% every operation.
%% * It does not make sense to run parallel tests with this model.

%% Three ways in which testing could be improved are:
%% 1. Use PULSE, not for parallel testing, but to run sequential
%% tests, because PULSE can guarantee quiescence before proceeding
%% to the next operation, without sleeping in reality. This could
%% make tests very much faster to run.
%% 2. Create a different model that tolerates non-determinism,
%%    instead checking global properties such as: that no more than
%%    W jobs are in progress simultaneously, that jobs are rejected
%%    as overload iff K*W jobs are currently in the system, and that
%%    *at least* ceiling(N/K) jobs are actually running when N jobs
%%    are in the system. Such a model could be used to test sidejob
%%    with rapidly arriving events, and so might find race conditions
%%    that this model misses. It could also potentially run much
%%    faster, and thus find bugs that are simply too rare to find in
%%    a realistic time with a model that sleeps frequently. (A sketch
%%    of such an invariant follows this list.)
%% 3. Make the 'intensity' parameter of the sidejob supervisor
%%    configurable--at present it is always 10, which means that a
%%    supervisor restart only happens after ten jobs crash. This
%%    makes tests that fail in this situation long, and as a result
%%    they shrink very slowly.
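%%
%% For (2), a minimal sketch of such a global invariant, assuming
%% hypothetical observations of the number of jobs in the system (N),
%% the number actually running (Running), and whether the latest
%% submission was rejected (Rejected) -- none of which the current
%% model collects:
%%
%%   global_invariant(N, W, K, Running, Rejected) ->
%%       conjunction(
%%         [ {bounded,  Running =< W},
%%           {overload, Rejected == (N >= K*W)},
%%           {progress, Running >= (N + K - 1) div K} ]).
%%
%% For example, with W = 4 workers and K = 5, having N = 12 jobs in
%% the system must keep at least ceiling(12/5) = 3 workers busy.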

-include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eqc/include/eqc.hrl").
-ifdef(PULSE).
@@ -12,10 +83,7 @@
-endif.

-export([initial_state/0]).
-export([prop_seq/0]).
-export([work/2, finish_work/1, crash/1, get_element/2, get_status/1]).
-export([new_resource_command/1,
new_resource_pre/1, new_resource_next/3, new_resource_post/3]).
@@ -27,8 +95,9 @@

-import(eqc_statem, [tag/2]).

-compile(nowarn_unused_function).

-define(RESOURCE, resource).
-define(SLEEP, 1).
-define(TIMEOUT, 5000).
-define(RESTART_LIMIT, 10).

@@ -66,9 +135,21 @@ new_resource_post(_, _, V) ->

%% -- work
work(Cmd, Scheduler) ->
wait_until_quiescent(),
{Worker, Status} = work0(Cmd, Scheduler),
case Status of
%%overload ->
%% Temporary overload is not necessarily a problem--there
%% may be workers stopping/dying/being replaced.
%% wait_until_quiescent(),
%%work0(Cmd, Scheduler);
_ ->
{Worker, Status}
end.

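%% work0/2 submits the job through the status_keeper proxy (which
%% spawns the worker on the requested scheduler and records its status
%% reports) and returns the worker paired with its first observed
%% status.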
work0(Cmd, Scheduler) ->
status_keeper ! {start_worker, self(), Cmd, Scheduler},
Worker = receive {start_worker, Worker0} -> Worker0 end,
timer:sleep(?SLEEP),
{Worker, get_status(Worker)}.

-ifdef(PULSE).
@@ -108,6 +189,15 @@ work_post(S, [Cmd, Sched], {Pid, Status}) ->

%% -- get_status
get_status(Worker) ->
case get_status0(Worker) of
blocked ->
%% May just not have started yet
wait_until_quiescent(),
get_status0(Worker);
R -> R
end.

get_status0(Worker) ->
status_keeper ! {get_status, self(), Worker},
receive {Worker, R} -> R
end.
@@ -131,6 +221,7 @@ get_status_next(S, V, [WPid]) ->
{finished, _} -> stopped;
blocked -> blocked;
zombie -> zombie;
crashed -> crashed;
working -> {working, {call, ?MODULE, get_element, [2, V]}}
end,
set_worker_status(S, WPid, NewStatus).
@@ -140,6 +231,7 @@ get_status_post(S, [WPid], R) ->
{finished, Res} -> eq(R, Res);
blocked -> eq(R, blocked);
zombie -> eq(R, blocked);
crashed -> eq(R, crashed);
working ->
case R of
{working, Pid} when is_pid(Pid) -> true;
@@ -151,7 +243,7 @@ get_status_post(S, [WPid], R) ->
finish_work(bad_element) -> ok;
finish_work(Pid) ->
Pid ! finish,
wait_until_quiescent().

finish_work_args(S) ->
[elements(working_workers(S))].
@@ -175,12 +267,12 @@ finish_work_next(S, _, [Pid]) ->
crash(bad_element) -> ok;
crash(Pid) ->
Pid ! crash,
wait_until_quiescent().

crash_args(S) ->
[elements(working_workers(S))].

crash_pre(S) ->
working_workers(S) /= [].

crash_pre(S, [Pid]) ->
@@ -195,15 +287,31 @@ crash_next(S, _, [Pid]) ->
false -> kill_queue(S2, W#worker.queue)
end.

crash_post(#state{ restarts=Restarts }, [_Pid], _) ->
%% This is a truly horrible hack!
%% At the restart limit, the sidejob supervisor is restarted,
%% which takes longer, and we see non-deterministic effects. In
%% *sequential* tests, the post-condition is called directly after
%% the call to crash, and we can tell from the dynamic state
%% whether or not the restart limit was reached. If so, we give
%% sidejob a bit more time, to avoid concomitant errors.
[begin status_keeper ! supervisor_restart,
timer:sleep(10)
end || Restarts==?RESTART_LIMIT],
true.

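%% Model the loss of a crashed worker's queue: a queued 'call' job
%% takes its caller down with it (status 'crashed'), while a queued
%% 'cast' job silently disappears (status 'zombie').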
kill_queue(S, Q) ->
Kill =
fun(W=#worker{ queue = Q1, status = blocked, cmd = Cmd }) when Q1 == Q ->
W#worker{ queue = zombie, status = case Cmd of call->crashed; cast->zombie end };
(W) -> W end,
S#state{ workers = lists:map(Kill, S#state.workers) }.

kill_all_queues(S) ->
Kill = fun(W=#worker{ status = Status, cmd = Cmd }) when Status /= {finished,done} ->
W#worker{ queue = zombie,
status = case Cmd of call->crashed; cast->zombie end };
(W) -> W end,
S#state{ workers = lists:map(Kill, S#state.workers) }.

%% -- Helpers ----------------------------------------------------------------
@@ -287,7 +395,8 @@ worker() ->
overload -> overload;
ok ->
receive
{started, Ref, Pid} ->
{working, Pid}
end
end,
From ! {self(), Res}
@@ -297,74 +406,101 @@ worker() ->
%% When running with parallel_commands we need a proxy process that holds the
%% statuses of the workers.
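%% Protocol, as implemented below: {start_worker, From, Cmd, Scheduler}
%% spawns and monitors a new worker; {Worker, Status} records a status
%% report; a 'DOWN' with an abnormal reason is recorded as 'crashed';
%% {get_status, From, Worker} pops the next recorded status ('crashed'
%% is sticky), answering 'blocked' if there is none; and
%% 'supervisor_restart' rewrites pending reports to reflect that every
%% running job was killed.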
start_status_keeper() ->
case whereis(status_keeper) of
undefined -> ok;
Pid -> unregister(status_keeper), exit(Pid,kill)
end,
register(status_keeper, spawn(fun() -> status_keeper([]) end)).

status_keeper(State) ->
receive
{start_worker, From, Cmd, Scheduler} ->
Worker = spawn_opt(fun worker/0, [{scheduler, Scheduler}]),
monitor(process,Worker),
Worker ! {Cmd, self()},
timer:sleep(?SLEEP),
From ! {start_worker, Worker},
status_keeper([{worker, Worker, [], Cmd} | State]);
{Worker, Status} when is_pid(Worker) ->
{worker, Worker, OldStatus, Cmd} = lists:keyfind(Worker, 2, State),
status_keeper(lists:keystore(Worker, 2, State,
{worker, Worker, OldStatus ++ [Status], Cmd}));
{'DOWN',_,process,Worker,Reason} ->
[self() ! {Worker,crashed} || Reason/=normal],
status_keeper(State);
{get_status, From, Worker} ->
case lists:keyfind(Worker, 2, State) of
{worker, Worker, [Status | NewStatus0], Cmd} ->
NewStatus = case Status of crashed -> [crashed]; _ -> NewStatus0 end,
From ! {Worker, Status},
status_keeper(lists:keystore(Worker, 2, State,
{worker, Worker, NewStatus, Cmd}));
_ ->
From ! {Worker, blocked},
status_keeper(State)
end;
supervisor_restart ->
%% all workers crash; pending status messages must be discarded
flush_all_messages(),
status_keeper([{worker,Worker,
[case Msg of
{working,_} when Cmd==call -> crashed;
{working,_} when Cmd==cast -> blocked;
_ -> Msg
end || Msg <- Msgs],
Cmd}
|| {worker,Worker,Msgs,Cmd} <- State])
end.

flush_all_messages() ->
receive _ -> flush_all_messages() after 0 -> ok end.

%% -- Property ---------------------------------------------------------------

prop_seq() ->
?FORALL(Repetitions,?SHRINK(1,[100]),
?FORALL(Cmds, commands(?MODULE),
?ALWAYS(Repetitions,
?TIMEOUT(?TIMEOUT,
?SOMETIMES(1,%10,
begin
cleanup(),
HSR={_, S, R} = run_commands(?MODULE, Cmds),
[ exit(Pid, kill) || #worker{ pid = Pid } <- S#state.workers, is_pid(Pid) ],
aggregate(command_names(Cmds),
pretty_commands(?MODULE, Cmds, HSR,
R == ok))
end))))).
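%% With a standard Quviq EQC installation, the property can be run from
%% an Erlang shell with, e.g. (the test count is illustrative):
%%
%%   eqc:quickcheck(eqc:numtests(1000, sidejob_eqc:prop_seq())).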

%% Because these tests try to wait for quiescence after each
%% operation, it is not really meaningful to run parallel tests.

%% prop_par() ->
%% ?FORALL(Cmds, parallel_commands(?MODULE),
%% ?TIMEOUT(?TIMEOUT,
%% % ?SOMETIMES(4,
%% begin
%% cleanup(),
%% HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
%% kill_all_pids({SeqH, ParH}),
%% aggregate(command_names(Cmds),
%% pretty_commands(?MODULE, Cmds, HSR,
%% R == ok))
%% end)).
%%
%% -ifdef(PULSE).
%% prop_pulse() ->
%% ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
%% fun() -> erlang:system_flag(schedulers_online, N) end end,
%% ?FORALL(Cmds, parallel_commands(?MODULE),
%% ?PULSE(HSR={_, _, R},
%% begin
%% cleanup(),
%% run_parallel_commands(?MODULE, Cmds)
%% end,
%% aggregate(command_names(Cmds),
%% pretty_commands(?MODULE, Cmds, HSR,
%% R == ok))))).
%% -endif.

kill_all_pids(Pid) when is_pid(Pid) -> exit(Pid, kill);
kill_all_pids([H|T]) -> kill_all_pids(H), kill_all_pids(T);
@@ -399,3 +535,25 @@ pulse_instrument(File) ->
code:load_file(Mod),
Mod.
-endif.

%% Wait for quiescence: to get deterministic testing, we need to let
%% sidejob finish what it is doing.

busy_processes() ->
[Pid || Pid <- processes(),
{status,Status} <- [erlang:process_info(Pid,status)],
Status /= waiting,
Status /= suspended].

quiescent() ->
busy_processes() == [self()].

wait_until_quiescent() ->
timer:sleep(2),
case quiescent() of
true ->
ok;
false ->
%% This happens regularly
wait_until_quiescent()
end.