diff --git a/eqc/sidejob_eqc.erl b/eqc/sidejob_eqc.erl
index ebf9777..3c9e1a9 100644
--- a/eqc/sidejob_eqc.erl
+++ b/eqc/sidejob_eqc.erl
@@ -4,6 +4,77 @@
 %%% Created : 13 May 2013 by Ulf Norell
 -module(sidejob_eqc).

+%% Sidejob is intended to run jobs (of the form call or cast), running
+%% at most W jobs in parallel, and returning 'overload' if more than
+%% K*W jobs are waiting to be completed. Here W is the number of
+%% sidejob workers, and K-1 is the maximum number of jobs that can be
+%% waiting for any particular worker. When new jobs are submitted,
+%% sidejob looks for an available worker (with fewer than K-1 jobs in
+%% its queue), starting with one corresponding to the scheduler number
+%% that the caller is running on; it returns overload only if every
+%% worker has K-1 waiting jobs at the time sidejob checks.
+
+%% If a job crashes, then the worker that was running it is restarted,
+%% but any jobs waiting for that worker are lost (and, in the event of
+%% a call job, cause their caller to crash too).
+
+%% Sidejob is inherently non-deterministic. For example, if K*W jobs
+%% are running, one is about to finish, and another is about to be
+%% submitted, then there is a race between these two events that can
+%% lead the new job to be rejected as overload, or not. Even in a
+%% non-overload situation, a worker with a full queue which is about
+%% to finish a job may be assigned a new job if the finish happens
+%% first, or the job may go to the next worker if the finish happens
+%% second. Thus it is impossible to predict reliably which worker a
+%% job will be assigned to, and thus which jobs will be discarded
+%% when a job crashes.
+
+%% Nevertheless, this model tries to predict such outcomes precisely.
+%% As a result, the tests suffer from race conditions, and even the
+%% sequential tests have been failing. To address this, the model
+%% sleeps after every action, to allow sidejob to complete all the
+%% resulting actions. This sleep was originally 1ms, which was not
+%% always long enough, leading tests to fail. Now
+%%  * we sleep for 2ms,
+%%  * we check to see if the VM is "quiescent" before continuing, and
+%%    if not, we sleep again,
+%%  * we retry calls that return results that could be transient
+%%    ('overload' from call and cast, 'blocked' from get_status),
+%%  * after a restart of the task supervisor, we wait 10ms (!) because
+%%    weird stuff happens if we don't.
+%% This makes tests much more deterministic, at least. Fewer than one
+%% test in 300,000 should fail--if they fail more often than that,
+%% there is something wrong.
+
+%% The disadvantages of this approach are:
+%%  * It is still possible, if much less likely, that a test fails
+%%    when nothing is wrong.
+%%  * This model cannot test rapid sequences of events, and so risks
+%%    missing some bugs, because it must wait for quiescence after
+%%    every operation.
+%%  * It does not make sense to run parallel tests with this model.
+
+%% Three ways in which testing could be improved are:
+%% 1. Use PULSE, not for parallel testing, but to run sequential
+%%    tests, because PULSE can guarantee quiescence before proceeding
+%%    to the next operation, without sleeping in reality. This could
+%%    make tests very much faster to run.
+%% 2. Create a different model that tolerates non-determinism,
+%%    instead checking global properties such as that no more than
+%%    W jobs are in progress simultaneously, that jobs are rejected
+%%    as overload iff K*W jobs are currently in the system, and that
+%%    *at least* ceiling(N/K) jobs are actually running when N jobs
+%%    are in the system. Such a model could be used to test sidejob
+%%    with rapidly arriving events, and so might find race conditions
+%%    that this model misses. It could also potentially run much
+%%    faster, and thus find bugs that are simply too rare to find in
+%%    a realistic time with a model that sleeps frequently.
+%% 3. Make the 'intensity' parameter of the sidejob supervisor
+%%    configurable--at present it is always 10, which means that a
+%%    supervisor restart only happens after ten jobs crash. This
+%%    makes tests that fail in this situation long, and as a result
+%%    they shrink very slowly.
+
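+%% A worked example of the arithmetic above (the numbers are
+%% illustrative, not taken from the tests): with W = 8 workers and
+%% K = 5, each worker may hold one running job plus up to K-1 = 4
+%% waiting jobs, so at most K*W = 40 jobs can be in the system at
+%% once, and a job submitted when all 40 slots are occupied is
+%% rejected as 'overload'. Similarly, with N = 12 jobs in the system,
+%% at least ceiling(12/5) = 3 workers must hold a job, and since each
+%% occupied worker is running exactly one job, at least 3 jobs are
+%% actually running.
+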
 -include_lib("eqc/include/eqc_statem.hrl").
 -include_lib("eqc/include/eqc.hrl").
 -ifdef(PULSE).
@@ -12,10 +83,7 @@
 -endif.

 -export([initial_state/0]).
--export([
-        prop_seq/0 %,
-        % prop_par/0
-       ]).
+-export([prop_seq/0]).
 -export([work/2, finish_work/1, crash/1, get_element/2, get_status/1]).
 -export([new_resource_command/1, new_resource_pre/1, new_resource_next/3,
          new_resource_post/3]).
@@ -27,8 +95,9 @@
 -import(eqc_statem, [tag/2]).

+-compile(nowarn_unused_function).
+
 -define(RESOURCE, resource).
--define(SLEEP, 1).
 -define(TIMEOUT, 5000).
 -define(RESTART_LIMIT, 10).
@@ -66,9 +135,21 @@ new_resource_post(_, _, V) ->

 %% -- work
 work(Cmd, Scheduler) ->
+  wait_until_quiescent(),
+  {Worker, Status} = work0(Cmd, Scheduler),
+  case Status of
+    %% overload ->
+    %%   Temporary overload is not necessarily a problem--there
+    %%   may be workers stopping/dying/being replaced.
+    %%   wait_until_quiescent(),
+    %%   work0(Cmd, Scheduler);
+    _ ->
+      {Worker, Status}
+  end.
+
+work0(Cmd, Scheduler) ->
   status_keeper ! {start_worker, self(), Cmd, Scheduler},
   Worker = receive {start_worker, Worker0} -> Worker0 end,
-  timer:sleep(?SLEEP),
   {Worker, get_status(Worker)}.

 -ifdef(PULSE).
@@ -108,6 +189,15 @@ work_post(S, [Cmd, Sched], {Pid, Status}) ->

 %% -- get_status
 get_status(Worker) ->
+  case get_status0(Worker) of
+    blocked ->
+      %% May just not have started yet
+      wait_until_quiescent(),
+      get_status0(Worker);
+    R -> R
+  end.
+
+get_status0(Worker) ->
   status_keeper ! {get_status, self(), Worker},
   receive {Worker, R} -> R end.
@@ -131,6 +221,7 @@ get_status_next(S, V, [WPid]) ->
       {finished, _} -> stopped;
       blocked -> blocked;
       zombie -> zombie;
+      crashed -> crashed;
       working -> {working, {call, ?MODULE, get_element, [2, V]}}
     end,
   set_worker_status(S, WPid, NewStatus).
@@ -140,6 +231,7 @@ get_status_post(S, [WPid], R) ->
     {finished, Res} -> eq(R, Res);
     blocked -> eq(R, blocked);
     zombie -> eq(R, blocked);
+    crashed -> eq(R, crashed);
     working ->
       case R of
         {working, Pid} when is_pid(Pid) -> true;
@@ -151,7 +243,7 @@ get_status_post(S, [WPid], R) ->
 finish_work(bad_element) -> ok;
 finish_work(Pid) ->
   Pid ! finish,
-  timer:sleep(?SLEEP).
+  wait_until_quiescent().

 finish_work_args(S) ->
   [elements(working_workers(S))].
@@ -175,12 +267,12 @@ finish_work_next(S, _, [Pid]) ->
 crash(bad_element) -> ok;
 crash(Pid) ->
   Pid ! crash,
-  timer:sleep(?SLEEP).
+  wait_until_quiescent().

 crash_args(S) ->
   [elements(working_workers(S))].

-crash_pre(S) -> 
+crash_pre(S) ->
   working_workers(S) /= [].

 crash_pre(S, [Pid]) ->
@@ -195,15 +287,31 @@ crash_next(S, _, [Pid]) ->
     false -> kill_queue(S2, W#worker.queue)
   end.

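+%% Crash semantics, as encoded by kill_queue/2 and kill_all_queues/1
+%% below: when a job crashes, the jobs queued behind it on the same
+%% worker are lost; a queued call job then reports 'crashed' to its
+%% caller, while a queued cast job disappears silently, which the
+%% model records as 'zombie'.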
+crash_post(#state{ restarts = Restarts }, [_Pid], _) ->
+  %% This is a truly horrible hack!
+  %% At the restart limit, the sidejob supervisor is restarted,
+  %% which takes longer, and we see non-deterministic effects. In
+  %% *sequential* tests, the post-condition is called directly after
+  %% the call to crash, and we can tell from the dynamic state
+  %% whether or not the restart limit was reached. If so, we give
+  %% sidejob a bit more time, to avoid concomitant errors.
+  [begin status_keeper ! supervisor_restart,
+         timer:sleep(10)
+   end || Restarts == ?RESTART_LIMIT],
+  true.
+
 kill_queue(S, Q) ->
   Kill =
-    fun(W=#worker{ queue = Q1, status = blocked }) when Q1 == Q ->
-        W#worker{ queue = zombie };
+    fun(W=#worker{ queue = Q1, status = blocked, cmd = Cmd }) when Q1 == Q ->
+        W#worker{ queue = zombie, status = case Cmd of call -> crashed; cast -> zombie end };
       (W) -> W end,
   S#state{ workers = lists:map(Kill, S#state.workers) }.

 kill_all_queues(S) ->
-  Kill = fun(W) -> W#worker{ queue = zombie } end,
+  Kill = fun(W=#worker{ status = Status, cmd = Cmd }) when Status /= {finished, done} ->
+             W#worker{ queue = zombie,
+                       status = case Cmd of call -> crashed; cast -> zombie end };
+            (W) -> W end,
   S#state{ workers = lists:map(Kill, S#state.workers) }.

 %% -- Helpers ----------------------------------------------------------------
@@ -287,7 +395,8 @@ worker() ->
             overload -> overload;
             ok ->
               receive
-                {started, Ref, Pid} -> {working, Pid}
+                {started, Ref, Pid} ->
+                  {working, Pid}
               end
           end,
     From ! {self(), Res}
@@ -297,38 +406,62 @@ worker() ->

 %% When running with parallel_commands we need a proxy process that holds the
 %% statuses of the workers.
 start_status_keeper() ->
-  catch erlang:exit(whereis(status_keeper), kill),
-  timer:sleep(?SLEEP),
+  case whereis(status_keeper) of
+    undefined -> ok;
+    Pid -> unregister(status_keeper), exit(Pid, kill)
+  end,
   register(status_keeper, spawn(fun() -> status_keeper([]) end)).

 status_keeper(State) ->
   receive
     {start_worker, From, Cmd, Scheduler} ->
       Worker = spawn_opt(fun worker/0, [{scheduler, Scheduler}]),
+      monitor(process, Worker),
       Worker ! {Cmd, self()},
-      timer:sleep(?SLEEP),
       From ! {start_worker, Worker},
-      status_keeper([{worker, Worker, []} | State]);
+      status_keeper([{worker, Worker, [], Cmd} | State]);
     {Worker, Status} when is_pid(Worker) ->
-      {worker, Worker, OldStatus} = lists:keyfind(Worker, 2, State),
-      status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, OldStatus ++ [Status]}));
+      {worker, Worker, OldStatus, Cmd} = lists:keyfind(Worker, 2, State),
+      status_keeper(lists:keystore(Worker, 2, State,
+                                   {worker, Worker, OldStatus ++ [Status], Cmd}));
+    {'DOWN', _, process, Worker, Reason} ->
+      [self() ! {Worker, crashed} || Reason /= normal],
+      status_keeper(State);
     {get_status, From, Worker} ->
       case lists:keyfind(Worker, 2, State) of
-        {worker, Worker, [Status | NewStatus]} ->
+        {worker, Worker, [Status | NewStatus0], Cmd} ->
+          NewStatus = case Status of crashed -> [crashed]; _ -> NewStatus0 end,
           From ! {Worker, Status},
-          status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, NewStatus}));
+          status_keeper(lists:keystore(Worker, 2, State,
+                                       {worker, Worker, NewStatus, Cmd}));
         _ ->
-          From ! {Worker, blocked},
-          status_keeper(State)
-      end
+          From ! {Worker, blocked},
+          status_keeper(State)
+      end;
+    supervisor_restart ->
+      %% all workers crash; pending status messages must be discarded
+      flush_all_messages(),
+      status_keeper([{worker, Worker,
+                      [case Msg of
+                         {working, _} when Cmd == call -> crashed;
+                         {working, _} when Cmd == cast -> blocked;
+                         _ -> Msg
+                       end || Msg <- Msgs],
+                      Cmd}
+                     || {worker, Worker, Msgs, Cmd} <- State])
   end.

+flush_all_messages() ->
+  receive _ -> flush_all_messages() after 0 -> ok end.
+
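+%% The proxy protocol, for reference (this sketch just restates what
+%% work0/2 and get_status0/1 above already do):
+%%
+%%   status_keeper ! {get_status, self(), Worker},
+%%   receive {Worker, Status} -> Status end
+%%
+%% The keeper replies with the oldest status recorded for Worker,
+%% with 'blocked' if none has been recorded yet, and with 'crashed'
+%% once the worker's abnormal 'DOWN' has been seen ('crashed' is
+%% sticky: it is never consumed).
+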
 %% -- Property ----------------------------------------------------------------

 prop_seq() ->
+  ?FORALL(Repetitions, ?SHRINK(1, [100]),
   ?FORALL(Cmds, commands(?MODULE),
+  ?ALWAYS(Repetitions,
   ?TIMEOUT(?TIMEOUT,
-  ?SOMETIMES(10,
+  ?SOMETIMES(1, %10,
   begin
     cleanup(),
     HSR={_, S, R} = run_commands(?MODULE, Cmds),
@@ -336,35 +469,38 @@
     aggregate(command_names(Cmds),
     pretty_commands(?MODULE, Cmds, HSR,
       R == ok))
-  end))).
-
-% prop_par() ->
-%   ?FORALL(Cmds, parallel_commands(?MODULE),
-%   ?TIMEOUT(?TIMEOUT,
-%   % ?SOMETIMES(4,
-%   begin
-%     cleanup(),
-%     HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
-%     kill_all_pids({SeqH, ParH}),
-%     aggregate(command_names(Cmds),
-%     pretty_commands(?MODULE, Cmds, HSR,
-%       R == ok))
-%   end)).
-
--ifdef(PULSE).
-prop_pulse() ->
-  ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
-                  fun() -> erlang:system_flag(schedulers_online, N) end end,
-  ?FORALL(Cmds, parallel_commands(?MODULE),
-  ?PULSE(HSR={_, _, R},
-    begin
-      cleanup(),
-      run_parallel_commands(?MODULE, Cmds)
-    end,
-    aggregate(command_names(Cmds),
-    pretty_commands(?MODULE, Cmds, HSR,
-      R == ok))))).
--endif.
+  end))))).
+
+%% Because these tests try to wait for quiescence after each
+%% operation, it is not really meaningful to run parallel tests.
+
+%% prop_par() ->
+%%   ?FORALL(Cmds, parallel_commands(?MODULE),
+%%   ?TIMEOUT(?TIMEOUT,
+%%   % ?SOMETIMES(4,
+%%   begin
+%%     cleanup(),
+%%     HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
+%%     kill_all_pids({SeqH, ParH}),
+%%     aggregate(command_names(Cmds),
+%%     pretty_commands(?MODULE, Cmds, HSR,
+%%       R == ok))
+%%   end)).
+%%
+%% -ifdef(PULSE).
+%% prop_pulse() ->
+%%   ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
+%%                   fun() -> erlang:system_flag(schedulers_online, N) end end,
+%%   ?FORALL(Cmds, parallel_commands(?MODULE),
+%%   ?PULSE(HSR={_, _, R},
+%%     begin
+%%       cleanup(),
+%%       run_parallel_commands(?MODULE, Cmds)
+%%     end,
+%%     aggregate(command_names(Cmds),
+%%     pretty_commands(?MODULE, Cmds, HSR,
+%%       R == ok))))).
+%% -endif.

 kill_all_pids(Pid) when is_pid(Pid) -> exit(Pid, kill);
 kill_all_pids([H|T])                -> kill_all_pids(H), kill_all_pids(T);
@@ -399,3 +535,25 @@ pulse_instrument(File) ->
   code:load_file(Mod),
   Mod.
 -endif.
+
+%% Wait for quiescence: to get deterministic testing, we need to let
+%% sidejob finish what it is doing.
+
+busy_processes() ->
+  [Pid || Pid <- processes(),
+          {status, Status} <- [erlang:process_info(Pid, status)],
+          Status /= waiting,
+          Status /= suspended].
+
+quiescent() ->
+  busy_processes() == [self()].
+
+wait_until_quiescent() ->
+  timer:sleep(2),
+  case quiescent() of
+    true ->
+      ok;
+    false ->
+      %% This happens regularly
+      wait_until_quiescent()
+  end.
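+
+%% Note: erlang:process_info(Pid, status) returns one of exiting,
+%% garbage_collecting, waiting, running, runnable or suspended.
+%% busy_processes/0 counts any process that is not waiting or
+%% suspended as busy, and quiescent/0 allows exactly one busy
+%% process: the caller itself, which is necessarily running.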
diff --git a/eqc/supervisor_eqc.erl b/eqc/supervisor_eqc.erl
index d6c6707..88bffd7 100644
--- a/eqc/supervisor_eqc.erl
+++ b/eqc/supervisor_eqc.erl
@@ -1,6 +1,6 @@
 %%% File : supervisor_eqc.erl
 %%% Author : Ulf Norell
-%%% Description : 
+%%% Description :
 %%% Created : 15 May 2013 by Ulf Norell
 -module(supervisor_eqc).

@@ -41,9 +41,6 @@
 -define(TIMEOUT, 5000).
 -define(RESTART_LIMIT, 10).

--define(QC_OUT(P),
-        eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-
 initial_state() -> #state{}.