From 74cae88c17932c68146f48e69c4937221b7241d7 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 10 Dec 2022 14:37:02 +0000 Subject: [PATCH] Mas i370 patch d (#816) * Remove old tags * Bodges to bypass test The riak_repl eqc tests have been failing on production releases for a significant time (probably since the start of the 3.0 branch). There is limited bandwidth available to understand if this is related to erroneous test behaviour, or erroneous code behaviour. Also the failing tests seem to be of limited obvious value: - The repl_leader eqc test tests the riak_repl_leader code (which is now effectively unused) and not the riak_repl2_leader code which is actually controlling leadership elections since R1.3. - The rt_source_eqc is largely testing code specifically generated to support the test scenario itself. --- .hgignore | 5 - .hgtags | 4 - ebin/.empty_for_hg | 0 eqc/repl_leader_eqc.erl | 624 ------------------------------------- rebar.config | 4 +- test/rt_source_eqc.erl | 615 ------------------------------------ test/rt_source_helpers.erl | 68 ---- 7 files changed, 2 insertions(+), 1318 deletions(-) delete mode 100644 .hgignore delete mode 100644 .hgtags delete mode 100644 ebin/.empty_for_hg delete mode 100644 eqc/repl_leader_eqc.erl delete mode 100644 test/rt_source_eqc.erl diff --git a/.hgignore b/.hgignore deleted file mode 100644 index 9fa67369..00000000 --- a/.hgignore +++ /dev/null @@ -1,5 +0,0 @@ -syntax regex -^ebin/.*.beam -.*~ -^.eunit/* -deps diff --git a/.hgtags b/.hgtags deleted file mode 100644 index 06f3b21b..00000000 --- a/.hgtags +++ /dev/null @@ -1,4 +0,0 @@ -ecd300bd87e35e52b1ee462830e66c05b1c315b4 riak_repl-0.13.0rc3 -dfcff7d31e9e83ea5f80b54543c5321b89d5814f riak_repl-0.13.0rc4 -ec9466b011933c4f78b18ae60e24e80df1259a54 riak_repl-0.13.0rc7 -fd9c0ba603c96fae87e9a563b0252efef398ff2c riak_repl-0.13.0 diff --git a/ebin/.empty_for_hg b/ebin/.empty_for_hg deleted file mode 100644 index e69de29b..00000000 diff --git a/eqc/repl_leader_eqc.erl b/eqc/repl_leader_eqc.erl deleted file mode 100644 index 63dd0570..00000000 --- a/eqc/repl_leader_eqc.erl +++ /dev/null @@ -1,624 +0,0 @@ -%% Riak EnterpriseDS -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. - -%% -%% Replication leader EQC -%% -%% This quickcheck test exercises leadership elections. It attempts -%% to simulate nodes going up and down with different candidates -%% for leaders in a way similar to ring gossips. The test state -%% keeps a list of nodes and for each whether node should be a -%% candidate (a listener) or a worker (a non-listener). Nodes -%% have their type toggled at random and the list is updated -%% at random. -%% -%% The test properties make sure that nodes with the same list -%% of candidates/workers elect the same leader. Nodes are -%% created using slave() which causes some interactions between -%% cover and generates some warning messages -%% - --module(repl_leader_eqc). - --include_lib("eqc/include/eqc.hrl"). --include_lib("eqc/include/eqc_fsm.hrl"). --include_lib("eunit/include/eunit.hrl"). - --compile([export_all, nowarn_export_all]). - --record(replnode, {node, - running=false, - type=worker, - candidates=[], - workers=[]}). --record(state, { replnodes=[] }). % {name, stopped | running} - --define(TEST_TIMEOUT, 30 * 60). --define(MAX_NODES, 5). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). - --define(DBG(Fmt,Args),ok). -%%-define(DBG(Fmt,Args),io:format(user, Fmt, Args)). - -qc_test_() -> - %% try and clean the repl controller before cover ever runs - %code:purge(riak_repl_controller), - %code:delete(riak_repl_controller), - ?DBG("Cover modules:\n~p\n", [cover:modules()]), - Prop = ?QC_OUT(eqc:numtests(40, prop_main())), - case testcase() of - [] -> - {timeout, ?TEST_TIMEOUT, fun() -> ?assert(eqc:quickcheck(Prop)) end}; - Testcase -> - {timeout, ?TEST_TIMEOUT, fun() -> ?assert(eqc:check(Prop, Testcase)) end} - end. - -testcase() -> - []. - -prop_main() -> - ?FORALL(Cmds, commands(?MODULE), - begin - %% Setup - ?DBG("\n=== Starting ===\n", []), - os:cmd("rm -rf repl_leader_eqc_data"), - maybe_start_net_kernel(), - code:ensure_loaded(riak_repl_leader), - code:ensure_loaded(riak_repl_leader_helper), - %% Run tests - start_slave_driver(), - try - {H, {_State, _StateData}, Res} = run_commands(?MODULE,Cmds), - ?WHENFAIL(begin - io:format(user, "Test Failed\n~p\n", - [zip(state_names(H),command_names(Cmds))]), - io:format(user, "State: ~p\nStateData: ~p\nRes: ~p\n", - [_State, _StateData, Res]) - end, - %% Generate statistics - aggregate(zip(state_names(H),command_names(Cmds)), Res == ok)) - after - stop_slave_driver(), - net_kernel:stop() - end - end). - -% -%% ==================================================================== -%% eqc_fsm callbacks -%% ==================================================================== - -initial_state() -> - running. - -initial_state_data() -> - #state{}. - -running(S) -> - [{running, {call, ?MODULE, new_node, []}}, - {running, {call, ?MODULE, start_repl, [g_stopped_replnode(S)]}}, - {running, {call, ?MODULE, stop_repl, [g_running_node(S)]}}, - {running, {call, ?MODULE, toggle_type, [g_node(S)]}}, - %% {running, {call, ?MODULE, ping, [g_node(S)]}}, - {running, {call, ?MODULE, set_candidates, [g_running_node(S), - candidates(S), - workers(S), - S]}}, - {running, {call, ?MODULE, check_leaders, [S]}}, - {running, {call, ?MODULE, add_receiver, [g_running_node(S)]}}]. - -weight(_,_,{call,_,toggle_type,_}) -> 10; -weight(_,_,{call,_,stop_repl,_}) -> 10; -weight(_,_,_) -> 100. - -next_state_data(_From, _To, S, _Res, {call, ?MODULE, new_node, []}) -> - Node = make_node(length(S#state.replnodes)+1), - add_replnode(Node, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, start_repl, [ReplNode]}) -> - upd_replnode(ReplNode#replnode{running = true}, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, stop_repl, [Node]}) -> - ReplNode = get_replnode(Node, S), - upd_replnode(ReplNode#replnode{running = false}, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, toggle_type, [Node]}) -> - ReplNode = get_replnode(Node, S), - case ReplNode#replnode.type of - worker-> - UpdReplNode=ReplNode#replnode{type = candidate}; - candidate -> - UpdReplNode=ReplNode#replnode{type = worker} - end, - upd_replnode(UpdReplNode, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, set_candidates, - [Node, Candidates, Workers, S]}) -> - ReplNode = get_replnode(Node, S), - upd_replnode(ReplNode#replnode{candidates = lists:sort(Candidates), - workers = lists:sort(Workers)}, - S); -next_state_data(_From, _To, S, _Res, _Call) -> - S. - -precondition(_From, _To, S, {call, ?MODULE, new_node, []}) -> - length(S#state.replnodes) < ?MAX_NODES; -precondition(_From, _To, S, {call, ?MODULE, start_repl, [ReplNode]}) -> - lists:member(ReplNode, S#state.replnodes) andalso ReplNode#replnode.running =:= false; -precondition(_From, _To, S, {call, ?MODULE, stop_repl, [Node]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, S, {call, ?MODULE, toggle_type, [Node]}) -> - node_exists(Node, S); -precondition(_From, _To, S, {call, ?MODULE, set_candidates, [Node, _C, _W, S]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, S, {call, ?MODULE, add_receiver, [Node]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, _S, _Call) -> - true. - -postcondition(_From, _To, _S, {call, ?MODULE, start_repl, [_]}, Res) -> - case Res of - ok -> true; - _ -> {start_repl, Res} - end; -postcondition(_From, _To, S, {call, ?MODULE, ping, [Node]}, Res) -> - ReplNode = get_replnode(Node, S), - case ReplNode#replnode.running of - true -> - Res == pong; - false -> - Res == pang - end; -postcondition(_From, _To, S, {call, ?MODULE, check_leaders, _}, LeaderByNode) -> - NodesByCandidates0 = nodes_by_candidates(running_replnodes(S)), - NodesByCandidates = lists:filter(fun({{Candidates, _}, Nodes}) -> - lists:all(fun(Cand) -> lists:member(Cand, Nodes) end, Candidates) - end, NodesByCandidates0), - check_same_leaders(NodesByCandidates, LeaderByNode); -postcondition(_From, _To, _S, {call, ?MODULE, add_receiver, [Node]}, {Leader1, Leader2, Res, _Pid}) when Node == Leader1; Node == Leader2 -> - %% The node believes itself to be a leader - case Res of - ok -> - true; - _ -> - {add_receiver_expected_ok, Res} - end; -postcondition(_From, _To, _S, {call, ?MODULE, add_receiver, [_Node]}, - {_Leader1, _Leader2, Res, _Pid}=_R) when _Leader1 /= undefined, - _Leader2 /= undefined -> - %% Node thinks somebody else is the leader or not sure who the leader is. - %% If the candidates/workers is being changed and the node *was* the leader - %% then riak_repl_leader continues to act as the leader until the election - %% completes (but sets the leader undefined). Will only *definitely* - %% return {error, not_leader} once the election completes. - ?DBG("Postcond: n=~p r=~p\n", [_Node, _R]), - case Res of - {error, not_leader} -> - true; - _ -> - {add_receiver_expected_err, Res} - end; -postcondition(_From, _To, _S, _Call, _Res) -> - true. - -%% ==================================================================== -%% Generator functions -%% ==================================================================== - -g_node(S) -> - ?LET(RN, elements(S#state.replnodes), RN#replnode.node). - -g_running_node(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, RN#replnode.running =:= true]). - -g_running_candidate(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, - RN#replnode.running =:= true, - RN#replnode.type =:= candidate ]). - -g_stopped_node(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, RN#replnode.running =/= true]). -g_stopped_replnode(S) -> - elements([RN || RN <- S#state.replnodes, RN#replnode.running =/= true]). - -%% ==================================================================== -%% Actions -%% ==================================================================== - -new_node() -> - ok. - -start_repl(ReplNode) -> - Node = ReplNode#replnode.node, - ?DBG("Starting slave ~p\n", [Node]), - ok = start_slave(Node), - pong = net_adm:ping(Node), - %{ok, _StartedNodes} = cover:start([Node]), - dbg:n(Node), - %?DBG("Cover nodes: ~p\n", [_StartedNodes]), - ?DBG("Started slave ~p\n", [Node]), - R = rpc:call(Node, ?MODULE, setup_slave, [ReplNode#replnode.candidates, - ReplNode#replnode.workers]), - ?DBG("slave start returned ~p~n", [R]), - R. - -stop_repl(Node) -> - ?DBG("Stopping cover on ~p\n", [Node]), - %cover:stop([Node]), - ?DBG("Stopping repl on ~p\n", [Node]), - ok = stop_slave(Node), - ?DBG("Stopped slave~p\n", [Node]), - ok. - -toggle_type(_Node) -> - ?DBG("Changing type for ~p\n", [_Node]), - ok. - -ping(Node) -> - ?DBG("Pinging ~p\n", [Node]), - Res = net_adm:ping(Node), - ?DBG("Pinged ~p: ~p\n", [Node, Res]), - Res. - -set_candidates(Node, Candidates, Workers, S) -> - ?DBG("Setting candidates for ~p to {~p, ~p}\n", - [Node, Candidates, Workers]), - pong = net_adm:ping(Node), - ?DBG("riak_repl_leader pid on ~p is ~p\n", - [Node, rpc:call(Node, erlang, whereis, [riak_repl_leader])]), - ok = rpc:call(Node, riak_repl_leader, set_candidates, [Candidates, Workers]), - - %% Request the helper leader node to make any elections stabalize - %% before calling the rest of the quickcheck code, otherwise results - %% are totally unpredictable. - - %% Have to duplicate some work done in next_state here - helper_leader_node - %% needs an updated [#replnode{}]. - ReplNode = get_replnode(Node, S), - UpdS = upd_replnode(ReplNode#replnode{candidates = lists:sort(Candidates), - workers = lists:sort(Workers)}, - S), - {_HLN, _UpCand} = helper_leader_node(Node, UpdS), - ?DBG("Set candidates for ~p, HLN=~p, UpCand=~p\n", [Node, _HLN, _UpCand]), - ok. - -check_leaders(S) -> % include a dummy anode from list so - ?DBG("CheckLeaders - running nodes ~p\n", [shorten(running_nodes(S))]), - F = fun(RN) -> % generator will blow up if none running - {HelperLN, UpCand} = helper_leader_node(RN#replnode.node, S), - %% Make sure at least one candidate node for the replnode - %% is running, is a candidate and belongs to the same set of - %% candidates/workers - N = RN#replnode.node, - - LN = rpc:call(N, riak_repl_leader, leader_node, []), - ?DBG(" ~p: {~p, ~p} LN=~p HLN=~p UpCand=~p\n", - [N, shorten(RN#replnode.candidates), - shorten(RN#replnode.workers), LN, HelperLN, - UpCand]), - {N, LN, HelperLN, UpCand} - end, - [F(N) || N <- running_replnodes(S)]. - -add_receiver(N) -> - % R = {Leader1, Leader2, Pid, Res}, - R = rpc:call(N, ?MODULE, register_receiver, []), - ?DBG("add_receiver: ~p\n", [R]), - R. - -%% ==================================================================== -%% Internal functions -%% ==================================================================== - -nodes_by_candidates(ReplNodes) -> - %% Build a dict of all nodes with the same config - %% Check they agree on who the leader is - F = fun(ReplNode, D) -> - Key = {lists:sort(ReplNode#replnode.candidates), - lists:sort(ReplNode#replnode.workers)}, - orddict:append_list(Key, [ReplNode#replnode.node], D) - end, - lists:foldl(F, orddict:new(), ReplNodes). - -check_same_leaders([], _LeaderByNode) -> - true; -check_same_leaders([{{C,W},Nodes}|Rest], LeaderByNode) -> - Leaders = lookup_leaders(Nodes, LeaderByNode), - UniqLeaders = lists:usort([Ldr || {_Node,Ldr} <- Leaders]), - case UniqLeaders of - [_SingleLeader] -> - check_same_leaders(Rest, LeaderByNode); - ManyLeaders -> - {different_leaders, ManyLeaders, - {candidates, C}, - {workers, W}, - {nodes, Nodes}, - {leaders, Leaders}, - {all_leader_info, LeaderByNode}} - end. - -%% For each node, lookup the current leader and build a {Node, LeaderName} tuple. -lookup_leaders(Nodes, LeaderByNode) -> - F = fun(N, A) -> - try - {N, LN, _HLN, _Cs} = lists:keyfind(N, 1, LeaderByNode), - [{N, LN} | A] - catch - _:Error -> - throw({cannot_find, N, LeaderByNode, Error}) - end - end, - lists:foldl(F, [], Nodes). - -maybe_start_net_kernel() -> - [] = os:cmd("epmd -daemon"), - case net_kernel:start(['repl_leader_eqc@127.0.0.1', longnames]) of - {ok, _} -> - ?DBG("Net kernel started as ~p\n", [node()]); - {error, {already_started, _}} -> - ok; - {error, Reason} -> - throw({start_net_kernel_failed, Reason}) - end. - -good_path() -> - [filename:absname(D) || - D <- lists:filter(fun filelib:is_dir/1, code:get_path())]. - -make_node(N) -> - list_to_atom("n" ++ integer_to_list(N) ++ "@" ++ get_host(node())). - -get_name(Node) -> - list_to_atom(hd(string:tokens(atom_to_list(Node), "@"))). - -get_host(_Node) -> - "127.0.0.1". - -shorten(Nodes) -> - [get_name(N) || N <- Nodes]. - -candidates(S) -> - [RN#replnode.node || RN <- S#state.replnodes, RN#replnode.type =:= candidate]. - -workers(S) -> - [RN#replnode.node || RN <- S#state.replnodes, RN#replnode.type =:= worker]. - -running_nodes(S) -> - [RN#replnode.node || RN <- running_replnodes(S)]. - -running_replnodes(S) -> - [RN || RN <- S#state.replnodes, RN#replnode.running =:= true]. - -get_replnode(Node, S) -> - {value, ReplNode} = lists:keysearch(Node, #replnode.node, S#state.replnodes), - ReplNode. - -add_replnode(Node, S) -> - ReplNode = #replnode{node = Node, - candidates = lists:sort(candidates(S)), - workers = lists:sort([Node | workers(S)])}, - UpdReplNodes = lists:keystore(Node, #replnode.node, - S#state.replnodes, ReplNode), - S#state{replnodes = UpdReplNodes}. - -upd_replnode(ReplNode, S) -> - ?DBG("Updating ~p\nin ~p\n", [ReplNode, S]), - UpdReplNodes = lists:keyreplace(ReplNode#replnode.node, #replnode.node, - S#state.replnodes, ReplNode), - ?DBG("Updated ~p\n", [UpdReplNodes]), - S#state{replnodes = UpdReplNodes}. - -%% Check if a node exists in the state -node_exists(Node,S) -> - try - get_replnode(Node, S), - true - catch - _:_ -> - false % no match on node name - end. - -%% Check if a node has the replnode element at Pos == Value -check_replnode(Node, Pos, Value, S) -> - try - ReplNode = get_replnode(Node, S), - element(Pos, ReplNode) =:= Value - catch - _:_ -> - false % no match on node name (or bad position) - end. - -wait_for_helper(Node) -> - wait_for_helper(Node, 1000). - -wait_for_helper(_Node, 0) -> - helper_timeout; -wait_for_helper(Node, Retries) -> - case rpc:call(Node, riak_repl_leader, helper_pid, []) of - undefined -> - timer:sleep(10), - wait_for_helper(Node, Retries -1); - Pid when is_pid(Pid) -> - {ok, Pid}; - {killed, _OldPid} -> - timer:sleep(10), - wait_for_helper(Node, Retries - 1) - end. - -%% Ask the helper who it thinks the leader is and all up candidates -%% that will answer. Nodes must be 'up' *and* have the same worker/candidate -%% lists. -%% Call this makes sure any elections have been propagated to the -%% riak_repl_helper process. The helper is only asked when at least on -%% candidate nodes should be up, otherwise it will block waiting for any -%% candidate. -helper_leader_node(N, S) -> - ?DBG("Handler leader node ~p\nState:\n~p\n", [N, S]), - RN = get_replnode(N, S), - C = RN#replnode.candidates, - W = RN#replnode.workers, - CRNs = [get_replnode(X, S) || X <- C], - ?DBG("Candidate replication nodes\n~p\n", [CRNs]), - UpCandidates = [CRN#replnode.node || CRN <- CRNs, - CRN#replnode.running =:= true, - lists:member(CRN#replnode.node, C), - CRN#replnode.candidates =:= C, - CRN#replnode.workers =:= W], - case UpCandidates of - [] -> - HelperLN = no_candidates; - _ -> - {ok, Helper} = wait_for_helper(N), - HelperLN = rpc:call(N, riak_repl_leader_helper, - leader_node, [Helper, 300000], 305000) - end, - ?DBG("UpCandidatess ~p~n", [UpCandidates]), - {HelperLN, UpCandidates}. - -%% ==================================================================== -%% Slave driver - link all slaves under a single process for easy cleanup -%% ==================================================================== - -start_slave_driver() -> - ?DBG("Starting slave driver\n", []), - ReplyTo = self(), - spawn(fun() -> - true = register(slave_driver, self()), - ?DBG("Started slave driver\n", []), - ReplyTo ! ready, - slave_driver_loop() - end), - receive - ready -> - ok - after - 5000 -> - throw(slave_driver_timeout) - end. - -slave_driver_loop() -> - receive - {start, Name, ReplyTo} -> - ?DBG("starting node ~p@~p from node ~p~n", [Name, - get_host(node()), node()]), - {ok, Node} = slave:start_link(get_host(node()), Name), - true = rpc:call(Node, code, set_path, [good_path()]), - ReplyTo ! {ok, Node}; - {stop, Node, ReplyTo} -> - ok = slave:stop(Node), - pang = net_adm:ping(Node), - ReplyTo ! {stopped, Node} - end, - slave_driver_loop(). - -stop_slave_driver() -> - ?DBG("Stopping slave driver\n", []), - case whereis(slave_driver) of - undefined -> - ?DBG("Slave driver not running\n", []), - ok; - Pid -> - Mref = erlang:monitor(process, Pid), - exit(Pid, kill), - receive - {'DOWN', Mref, process, _Obj, _Info} -> - ?DBG("Stopped slave driver\n", []), - ok - end - end. - -start_slave(Node) -> - Name = get_name(Node), - slave_driver ! {start, Name, self()}, - receive - {ok, Node} -> - ok - end. - -stop_slave(Node) -> - slave_driver ! {stop, Node, self()}, - receive - {stopped, Node} -> - ok - end. - -%% ==================================================================== -%% Slave functions - code that runs on the slave nodes for setup -%% ==================================================================== - -setup_slave(Candidates, Workers) -> - %mock_repl_controller(), - start_leader(Candidates, Workers). - -mock_repl_controller() -> - ?DBG("Mocking riak_repl_controller on ~p\n", [node()]), - {module, meck} = code:ensure_loaded(meck), - ok = meck:new(riak_repl_controller, [no_link]), - ok = meck:expect(riak_repl_controller, set_is_leader, - fun(_Bool) -> ok end), - ok = riak_repl_controller:set_is_leader(false), % call it, just to prove it works - ?DBG("Mocked riak_repl_controller on ~p\n", [node()]). - -start_leader(Candidates, Workers) -> - ?DBG("Starting repl on ~p\n", [node()]), - - application:start(ranch), - %% Set up the application config so multiple leaders do not - %% tread on one anothers toes - application:load(riak_repl), - DataRoot = "repl_leader_eqc_data/"++atom_to_list(node()), - application:set_env(riak_repl, data_root, DataRoot), - - %% cannot just call rpc:call(Node, riak_repl_leader, start_link, []) as it - %% would link to the rex process created for the call. This creates the - %% process and unlinks before returning. - {ok, REPid} = riak_core_ring_events:start_link(), - ?DBG("Started ring_events at ~p~n", [REPid]), - unlink(REPid), - {ok, RMPid} = riak_core_ring_manager:start_link(test), - ?DBG("Started ring_manager at ~p~n", [RMPid]), - unlink(RMPid), - {ok, NWEPid} = riak_core_node_watcher_events:start_link(), - ?DBG("Started node_watcher_events at ~p~n", [NWEPid]), - unlink(NWEPid), - application:set_env(riak_core, gossip_interval, 5000), - {ok, NWPid} = riak_core_node_watcher:start_link(), - ?DBG("Started node_watcher at ~p~n", [NWPid]), - unlink(NWPid), - {ok, CSPid} = riak_repl_client_sup:start_link(), - ?DBG("Started repl client_sup at ~p~n", [CSPid]), - unlink(CSPid), - {ok, SSPid} = riak_repl_server_sup:start_link(), - ?DBG("Started repl server_sup at ~p~n", [SSPid]), - unlink(SSPid), - {ok, Pid} = riak_repl_leader:start_link(), - ?DBG("Started repl leader at ~p", [Pid]), - unlink(Pid), - - %% set the candidates so that the repl helper is created - ok = riak_repl_leader:set_candidates(Candidates, Workers), - - ?DBG("Started repl on ~p as ~p with candidates {~p, ~p}\n", - [node(), Pid, Candidates, Workers]), - - %riak_repl_leader:ensure_sites(), - - %% Check leader completes election - %{ok, Helper} = wait_for_helper(node()), - %_HelperLN = riak_repl_leader_helper:leader_node(Helper, 10000), - ok. - - -%% Creates a dummy process that waits for the message 'die' -register_receiver() -> - Pid = spawn(fun() -> - receive - die -> - ok - end - end), - Leader1 = riak_repl_leader:leader_node(), - Res = riak_repl_leader:add_receiver_pid(Pid), - Leader2 = riak_repl_leader:leader_node(), - {Leader1, Leader2, Res, Pid}. - diff --git a/rebar.config b/rebar.config index 8787d31c..f071f418 100644 --- a/rebar.config +++ b/rebar.config @@ -22,9 +22,9 @@ {deps, [ {lager, {git, "https://github.com/erlang-lager/lager.git", {tag, "3.8.0"}}}, - {ranch, {git, "https://github.com/ninenines/ranch.git", {tag, "1.6.0"}}}, + {ranch, {git, "https://github.com/ninenines/ranch.git", {tag, "1.6.2"}}}, {ebloom,{git, "https://github.com/basho/ebloom.git", {tag, "2.1.0"}}}, - {riak_kv, {git, "https://github.com/basho/riak_kv.git", {tag, "riak_kv-3.0.11"}}} + {riak_kv, {git, "https://github.com/basho/riak_kv.git", {branch, "develop-3.0"}}} ]}. {plugins, [{rebar3_gpb_plugin, {git, "https://github.com/basho/rebar3_gpb_plugin", {tag, "2.15.1+riak.3.0.4"}}}, diff --git a/test/rt_source_eqc.erl b/test/rt_source_eqc.erl deleted file mode 100644 index 5c0520fd..00000000 --- a/test/rt_source_eqc.erl +++ /dev/null @@ -1,615 +0,0 @@ --module(rt_source_eqc). - --compile([export_all, nowarn_export_all]). - --ifdef(EQC). --include("rt_source_eqc.hrl"). --include_lib("eqc/include/eqc.hrl"). --include_lib("eqc/include/eqc_statem.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(P(EXPR), PPP = (EXPR), case PPP of true -> ok; _ -> io:format(user, "PPP ~p at line ~p\n", [PPP, ?LINE]) end, PPP). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> - io:format(user, Str, Args) end, P)). - --record(pushed, { - seq, - v1_seq, - push_res, - skips, - remotes_up -}). - - -%% =================================================================== -%% Helper Funcs -%% =================================================================== - --record(ctx, { - fs_pid, - fs_port -}). - -setup() -> - % ?debugMsg("enter setup()"), - application:load(sasl), - application:set_env(sasl, sasl_error_logger, {file, "rt_source_eqc_sasl.log"}), - error_logger:tty(false), - application:start(lager), - riak_repl_test_util:stop_test_ring(), - riak_repl_test_util:start_test_ring(), - riak_repl_test_util:abstract_gen_tcp(), - riak_repl_test_util:abstract_stats(), - riak_repl_test_util:abstract_stateful(), - kill_rtq(), - {ok, _RTPid} = rt_source_helpers:start_rt(), - {ok, _RTQPid} = rt_source_helpers:start_rtq(), - {ok, _TCPMonPid} = rt_source_helpers:start_tcp_mon(), - %% {ok, _Pid1} = riak_core_service_mgr:start_link(ClusterAddr), - %% {ok, _Pid2} = riak_core_connection_mgr:start_link(), - %% {ok, _Pid3} = riak_core_cluster_conn_sup:start_link(), - %% {ok, _Pid4 } = riak_core_cluster_mgr:start_link(), - {ok, FsPid, FsPort} = rt_source_helpers:init_fake_sink(), - ok = rt_source_helpers:abstract_connection_mgr(FsPort), - R = #ctx{fs_pid = FsPid, fs_port = FsPort}, - % ?debugFmt("leave setup() -> ~p", [R]), - R. - -cleanup() -> - % ?debugFmt("enter cleanup(~p)", [_Ctx]), - rt_source_helpers:kill_fake_sink(), - riak_repl_test_util:kill_and_wait(riak_core_tcp_mon), - riak_repl2_rtq:stop(), - riak_repl_test_util:kill_and_wait(riak_repl2_rt), - riak_repl_test_util:stop_test_ring(), - meck:unload(), - % ?debugMsg("leave cleanup(~p)", [_Ctx]), - ok. - -property_test() -> - {spawn, - [%% Run the quickcheck tests - {timeout, 120, - ?_assertEqual(true, eqc:quickcheck(eqc:numtests(5, ?QC_OUT(prop_main()))))} - ] - }. - -prop_main() -> - ?SETUP(fun() -> - setup(), - fun() -> - process_flag(trap_exit, false), - cleanup() - end - end, - ?FORALL(Cmds, noshrink(commands(?MODULE)), - begin - {H, S, Res} = run_commands(?MODULE, Cmds), - aggregate(command_names(Cmds), - pretty_commands(?MODULE, Cmds, {H,S,Res}, Res == ok)) - end)). - -%% ==================================================================== -%% Generators (including commands) -%% ==================================================================== - -command(S) -> - oneof( - [{call, ?MODULE, connect_to_v1, [remote_name(S), S#state.master_queue]} || S#state.remotes_available /= []] ++ - [{call, ?MODULE, connect_to_v2, [remote_name(S), S#state.master_queue]} || S#state.remotes_available /= []] ++ - [{call, ?MODULE, disconnect, [elements(S#state.sources)]} || S#state.sources /= []] ++ - % push an object that may already have been rt'ed - [{call, ?MODULE, push_object, [g_unique_remotes(), g_riak_object(), S]} || S#state.sources /= []] ++ - [?LET({_Remote, SrcState} = Source, elements(S#state.sources), {call, ?MODULE, ack_objects, [g_up_to_length(SrcState#src_state.unacked_objects), Source]}) || S#state.sources /= []] - ). - -g_riak_object() -> - ?LET({Bucket, Key, Content}, {binary(), binary(), binary()}, riak_object:new(Bucket, Key, Content)). - -g_up_to_length([]) -> - 0; -g_up_to_length(List) -> - Max = length(List), - choose(0, Max). - -g_unique_remotes() -> - ?LET(Remotes, list(g_remote_name()), lists:usort(Remotes)). - -g_remote_name() -> - oneof(?all_remotes). - -% name of a remote -remote_name(#state{remotes_available = []}) -> - erlang:error(no_name_available); -remote_name(#state{remotes_available = Remotes}) -> - oneof(Remotes). - -precondition(#state{master_queue = MasterQ} = S, {call, _, Connect, [Remote, MasterQ]}) when Connect =:= connect_to_v1; Connect =:= connect_to_v2 -> - %% ?debugFmt("Remote requested: ~p Remotes available: ~p", [Remote, S#state.remotes_available]), - lists:member(Remote, S#state.remotes_available); -precondition(#state{sources = []}, {call, _, disconnect, _Args}) -> - false; -precondition(S, {call, _, disconnect, _Args}) -> - S#state.sources /= []; -precondition(_S, {call, _, ack_objects, [0, _]}) -> - false; -precondition(#state{sources = []}, {call, _, ack_objects, _Args}) -> - false; -precondition(S, {call, _, ack_objects, _Args}) -> - S#state.sources /= []; -precondition(_S, {call, _, push_object, [[], _, _]}) -> - false; -precondition(S, {call, _, push_object, [_, _, S]}) -> - S#state.sources /= []; -precondition(_S, {call, _, push_object, [_, _, _NotS]}) -> - %% ?debugFmt("Bad states.~n State: ~p~nArg: ~p", [S, NotS]), - false; -precondition(_S, _Call) -> - true. - -dynamic_precondition(#state{sources = Sources}, {call, _, disconnect, [{Remote, _}]}) -> - is_tuple(lists:keyfind(Remote, 1, Sources)); -dynamic_precondition(#state{sources = Sources}, {call, _, ack_objects, [NumAck, {Remote, SrcState}]}) -> - case lists:keyfind(Remote, 1, Sources) of - {_, #src_state{unacked_objects = UnAcked}} when length(UnAcked) >= NumAck -> - UnAcked =:= SrcState#src_state.unacked_objects; - %true; - _ -> - false - end; -dynamic_precondition(_S, _Call) -> - true. -%% dynamic_precondition(_S, {call, _, Connect, [_Remote, _MasterQ]}) when Connect =:= connect_to_v1; Connect =:= connect_to_v2 -> -%% false; - -%% ==================================================================== -%% state generation -%% ==================================================================== - -initial_state() -> - %% catch unlink(whereis(fake_sink)), - %% catch exit(whereis(fake_sink), kill), - process_flag(trap_exit, true), - %% riak_repl_test_util:stop_test_ring(), - %% riak_repl_test_util:start_test_ring(), - %% riak_repl_test_util:abstract_gen_tcp(), - %% riak_repl_test_util:abstract_stats(), - %% riak_repl_test_util:abstract_stateful(), - %% abstract_connection_mgr(), - %% kill_rtq(), - %% {ok, _RTPid} = start_rt(), - %% {ok, _RTQPid} = start_rtq(), - %% {ok, _TCPMonPid} = start_tcp_mon(), - %% {ok, _FakeSinkPid} = start_fake_sink(), - #state{}. - -kill_rtq() -> - case whereis(riak_repl2_rtq) of - undefined -> - ok; - _ -> - riak_repl2_rtq:stop(), - catch exit(riak_repl2_rtq, kill) - end. - -next_state(S, Res, {call, _, connect_to_v1, [Remote, _MQ]}) -> - SrcState = #src_state{pids = Res, version = 1}, - next_state_connect(Remote, SrcState, S); - -next_state(S, Res, {call, _, connect_to_v2, [Remote, _MQ]}) -> - SrcState = #src_state{pids = Res, version = 2}, - next_state_connect(Remote, SrcState, S); - -next_state(S, _Res, {call, _, disconnect, [{Remote, _}]}) -> - %% ?debugFmt("Removing ~p from sources", [Remote]), - Sources = lists:keydelete(Remote, 1, S#state.sources), - Master = if - Sources == [] -> - []; - true -> - MasterKeys = ordsets:from_list([Seq || {Seq, _, _, _} <- S#state.master_queue]), - SourceQueues = [Source#src_state.unacked_objects || {_, Source} <- Sources], - ExtractSeqs = fun(Elem, Acc) -> - SrcSeqs = ordsets:from_list([PushedThang#pushed.seq || PushedThang <- Elem]), - ordsets:union(Acc, SrcSeqs) - end, - ActiveSeqs = lists:foldl(ExtractSeqs, [], SourceQueues), - RemoveSeqs = ordsets:subtract(MasterKeys, ActiveSeqs), - UpdateMaster = fun(RemoveSeq, Acc) -> - lists:keystore(RemoveSeq, 1, Acc, {RemoveSeq, tombstone}) - end, - lists:foldl(UpdateMaster, S#state.master_queue, RemoveSeqs) - end, - %% ?debugFmt("Updated remotes after disconnect: ~p", [[Remote | S#state.remotes_available]]), - S#state{master_queue = Master, sources = Sources, remotes_available = [Remote | S#state.remotes_available]}; - -next_state(S, Res, {call, _, push_object, [Remotes, RiakObj, _S]}) -> - Seq = S#state.seq + 1, - Sources = update_unacked_objects(Remotes, Seq, Res, S), - RoutingSources = [R || {R, _} <- Sources, not lists:member(R, Remotes)], - Master2 = case RoutingSources of - [] -> - [{Seq, tombstone} | S#state.master_queue]; - _ -> - Master = S#state.master_queue, - [{Seq, Remotes, RiakObj, Res} | Master] - end, - S#state{sources = Sources, master_queue = Master2, seq = Seq}; - -next_state(S, _Res, {call, _, ack_objects, [NumAcked, {Remote, _Source}]}) -> - case lists:keytake(Remote, 1, S#state.sources) of - false -> - S; - {value, {Remote, RealSource}, Sources} -> - UnAcked = RealSource#src_state.unacked_objects, - {Updated, Chopped} = model_ack_objects(NumAcked, UnAcked), - SrcState2 = RealSource#src_state{unacked_objects = Updated}, - Sources2 = [{Remote, SrcState2} | Sources], - %% ?debugFmt("Updated ack sources: ~p", [[R || {R, _} <- Sources2]]), - Master2 = remove_fully_acked(S#state.master_queue, Chopped, Sources2), - S#state{sources = Sources2, master_queue = Master2} - end. - -next_state_connect(Remote, SrcState, State) -> - case lists:keyfind(Remote, 1, State#state.sources) of - false -> - Remotes = lists:delete(Remote, State#state.remotes_available), - {NewQueue, Skips, Offset} = generate_unacked_from_master(State, Remote), - SrcState2 = SrcState#src_state{unacked_objects = NewQueue, skips = Skips, offset = Offset}, - %% ?debugFmt("Adding ~p to sources", [Remote]), - Sources = [{Remote, SrcState2} | State#state.sources], - %% ?debugFmt("Updated sources: ~p", [[R || {R, _} <- Sources]]), - State#state{sources = Sources, remotes_available = Remotes}; - _Tuple -> - State - end. - -generate_unacked_from_master(State, Remote) -> - UpRemotes = running_remotes(State), - generate_unacked_from_master(lists:reverse(State#state.master_queue), UpRemotes, Remote, undefined, 0, []). - -generate_unacked_from_master([], _UpRemotes, _Remote, Skips, Offset, Acc) -> - {Acc, Skips, Offset}; -generate_unacked_from_master([{_Seq, tombstone} | Tail], UpRemotes, Remote, undefined, Offset, Acc) -> - generate_unacked_from_master(Tail, UpRemotes, Remote, undefined, Offset, Acc); -generate_unacked_from_master([{_Seq, tombstone} | Tail], UpRemotes, Remote, Skips, Offset, Acc) -> - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips + 1, Offset, Acc); -generate_unacked_from_master([{Seq, Remotes, _Binary, Res} | Tail], UpRemotes, Remote, Skips, Offset, Acc) -> - case {lists:member(Remote, Remotes), Skips} of - {true, undefined} -> - % on start up, we don't worry about skips until we've sent at least - % one; the sink doesn't care what the first seq number is anyway. - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips, Offset, Acc); - {true, _} -> - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips + 1, Offset, Acc); - {false, _} -> - NextSkips = case Skips of - undefined -> 0; - _ -> Skips - end, - Offset2 = Offset + NextSkips, - Obj = #pushed{seq = Seq, v1_seq = Seq - Offset2, push_res = Res, skips = NextSkips, - remotes_up = UpRemotes}, - Acc2 = [Obj | Acc], - generate_unacked_from_master(Tail, UpRemotes, Remote, 0, Offset2, Acc2) - end. - -remove_fully_acked(Master, [], _Sources) -> - Master; - -remove_fully_acked(Master, [Pushed | Chopped], Sources) -> - #pushed{seq = Seq} = Pushed, - case is_in_a_source(Seq, Sources) of - true -> - remove_fully_acked(Master, Chopped, Sources); - false -> - Master2 = lists:keystore(Seq, 1, Master, {Seq, tombstone}), - remove_fully_acked(Master2, Chopped, Sources) - end. - -is_in_a_source(_Seq, []) -> - false; -is_in_a_source(Seq, [{_Remote, #src_state{unacked_objects = UnAcked}} | Tail]) -> - case lists:keymember(Seq, #pushed.seq, UnAcked) of - true -> - true; - false -> - is_in_a_source(Seq, Tail) - end. - -running_remotes(State) -> - Remotes = [Remote || {Remote, _} <- State#state.sources], - ordsets:from_list(Remotes). - -update_unacked_objects(Remotes, Seq, Res, State) when is_record(State, state) -> - UpRemotes = running_remotes(State), - update_unacked_objects(Remotes, Seq, Res, UpRemotes, State#state.sources, []). - -update_unacked_objects(_Remotes, _Seq, _Res, _UpRemotes, [], Acc) -> - lists:reverse(Acc); - -update_unacked_objects(Remotes, Seq, Res, UpRemotes, [{Remote, Source} | Tail], Acc) -> - case lists:member(Remote, Remotes) of - true -> - Skipped = case Source#src_state.skips of - undefined -> undefined; - _ -> Source#src_state.skips + 1 - end, - update_unacked_objects(Remotes, Seq, Res, UpRemotes, Tail, [{Remote, Source#src_state{skips = Skipped}} | Acc]); - false -> - Entry = model_push_object(Seq, Res, UpRemotes, Source), - update_unacked_objects(Remotes, Seq, Res, UpRemotes, Tail, [{Remote, Entry} | Acc]) - end. - -model_push_object(Seq, Res, UpRemotes, SrcState = #src_state{unacked_objects = ObjQueue}) -> - {Seq2, Offset2} = case SrcState#src_state.skips of - undefined -> - {Seq, SrcState#src_state.offset}; - _ -> - Off2 = SrcState#src_state.offset + SrcState#src_state.skips, - {Seq - Off2, Off2} - end, - Obj = #pushed{seq = Seq, v1_seq = Seq2, push_res = Res, skips = SrcState#src_state.skips, remotes_up = UpRemotes}, - SrcState#src_state{unacked_objects = [Obj | ObjQueue], offset = Offset2, skips = 0}. - -insert_skip_meta({Seq, {Count, Bin, Meta}}, #src_state{skips = SkipCount}) -> - Meta2 = orddict:from_list(Meta), - Meta3 = orddict:store(skip_count, SkipCount, Meta2), - {Seq, {Count, Bin, Meta3}}. - -model_ack_objects(_Num, []) -> - {[], []}; -model_ack_objects(NumAcked, Unacked) when length(Unacked) >= NumAcked -> - NewLength = length(Unacked) - NumAcked, - lists:split(NewLength, Unacked). - -%% ==================================================================== -%% postcondition -%% ==================================================================== - -postcondition(_State, {call, _, connect_to_v1, _Args}, {error, _}) -> - ?P(false); -postcondition(_State, {call, _, connect_to_v1, _Args}, {Source, Sink}) -> - ?P(is_pid(Source) andalso is_pid(Sink)); - -postcondition(_State, {call, _, connect_to_v2, _Args}, {error, _}) -> - ?P(false); -postcondition(_State, {call, _, connect_to_v2, _Args}, {Source, Sink}) -> - ?P(is_pid(Source) andalso is_pid(Sink)); - -postcondition(_State, {call, _, disconnect, [_SourceState]}, {Source, _Sink}) -> - %% ?P(lists:all(fun(ok) -> true; (_) -> false end, Waits)); - %% ?debugFmt("Source alive? ~p", [is_process_alive(Source)]), - ?P(is_process_alive(Source) =:= false); - -postcondition(_State, {call, _, push_object, [Remotes, _RiakObj, State]}, Res) -> - ?P(assert_sink_bugs(Res, Remotes, State#state.sources)); - -postcondition(State, {call, _, ack_objects, [NumAck, {Remote, Source}]}, AckedStack) -> - RemoteLives = is_tuple(lists:keyfind(Remote, 1, State#state.sources)), - #src_state{unacked_objects = UnAcked} = Source, - {_, Acked} = lists:split(length(UnAcked) - NumAck, UnAcked), - if - RemoteLives == false -> - ?P(AckedStack == []); - length(Acked) =/= length(AckedStack) -> - ?debugMsg("Acked length is not the same as AckedStack length"), - ?P(false); - true -> - ?P(assert_sink_ackings(Remote, Source#src_state.version, Acked, AckedStack)) - end; - -postcondition(_S, _C, _R) -> - ?debugMsg("fall through postcondition"), - false. - -assert_sink_bugs(Object, Remotes, Sources) -> - Active = [A || {A, _} <- Sources], - assert_sink_bugs(Object, Remotes, Active, Sources, []). - -assert_sink_bugs(_Object, _Remotes, _Active, [], Acc) -> - lists:all(fun(true) -> true; (_) -> false end, Acc); - -assert_sink_bugs(Object, Remotes, Active, [{Remote, SrcState} | Tail], Acc) -> - Truthiness = assert_sink_bug(Object, Remotes, Remote, Active, SrcState), - assert_sink_bugs(Object, Remotes, Active, Tail, [Truthiness | Acc]). - -assert_sink_bug({_Num, RiakObj, BadMeta}, Remotes, Remote, Active, SrcState) -> - {_, Sink} = SrcState#src_state.pids, - Version = SrcState#src_state.version, - History = gen_server:call(Sink, history), - ShouldSkip = lists:member(Remote, Remotes), - BadMeta2 = set_skip_meta(BadMeta, SrcState), - Routed = ordsets:from_list(Remotes ++ [Remote] ++ Active ++ ["undefined"]), - Meta = orddict:store(routed_clusters, Routed, BadMeta2), - ObjBin = case SrcState#src_state.version of - 1 -> - term_to_binary([RiakObj]); - 2 -> - riak_repl_util:to_wire(w1, [RiakObj]) - end, - if - ShouldSkip andalso length(History) == length(SrcState#src_state.unacked_objects) -> - true; - ShouldSkip -> - ?debugFmt("assert sink history length failure!~n" - " Remote: ~p~n" - " Length Sink Hist: ~p~n" - " Length Model Hist: ~p~n" - " Sink:~n" - "~p~n" - " Model:~n" - "~p",[Remote, length(History), length(SrcState#src_state.unacked_objects), History, SrcState#src_state.unacked_objects]), - false; - true -> - Frame = hd(History), - case {Version, Frame} of - {1, {objects, {_Seq, ObjBin}}} -> - true; - {2, {objects_and_meta, {_Seq, ObjBin, _M}}} -> - true; - _ -> - ?debugFmt("assert sink bug failure!~n" - " Remote: ~p~n" - " Remotes: ~p~n" - " Active: ~p~n" - " ObjBin: ~p~n" - " Meta: ~p~n" - " ShouldSkip: ~p~n" - " Version: ~p~n" - " Frame: ~p~n" - " Length Sink Hist: ~p~n" - " History: ~p~n" - " Length Model Hist: ~p", [Remote, Remotes, Active, ObjBin, Meta, ShouldSkip, Version, Frame, length(History), History, length(SrcState#src_state.unacked_objects)]), - false - end - end. - -set_skip_meta(Meta, #src_state{skips = Skips}) -> - set_skip_meta(Meta, Skips); -set_skip_meta(Meta, undefined) -> - set_skip_meta(Meta, 0); -set_skip_meta(Meta, Skips) -> - orddict:store(skip_count, Skips, Meta). - -assert_sink_ackings(Remote, Version, Expecteds, Gots) -> - assert_sink_ackings(Remote, Version, Expecteds, Gots, []). - -assert_sink_ackings(_Remote, _Version, [], [], Acc) -> - lists:all(fun(true) -> true; (_) -> false end, Acc); - -assert_sink_ackings(Remote, Version, [Expected | ETail], [Got | GotTail], Acc) -> - AHead = assert_sink_acking(Remote, Version, Expected, Got), - assert_sink_ackings(Remote, Version, ETail, GotTail, [AHead | Acc]). - -assert_sink_acking(Remote, Version, Pushed, Got) -> - #pushed{seq = Seq, v1_seq = V1Seq, push_res = {1, RiakObj, PushMeta}, skips = Skip, - remotes_up = UpRemotes} = Pushed, - PushMeta1 = set_skip_meta(PushMeta, Skip), - PushMeta2 = fix_routed_meta(PushMeta1, ordsets:add_element(Remote, UpRemotes)), - Meta = PushMeta2, - ObjBin = case Version of - 1 -> riak_repl_util:to_wire(w0, [RiakObj]); - 2 -> riak_repl_util:to_wire(w1, [RiakObj]) - end, - case {Version, Got} of - {1, {objects, {V1Seq, ObjBin}}} -> - true; - {2, {objects_and_meta, {Seq, ObjBin, Meta}}} -> - true; - _ -> - ?debugFmt("Sink ack failure!~n" - " Remote: ~p~n" - " UpRemotes: ~p~n" - " Version: ~p~n" - " ObjBin: ~p~n" - " Seq: ~p~n" - " V1Seq: ~p~n" - " Skip: ~p~n" - " PushMeta: ~p~n" - " Meta: ~p~n" - " Got: ~p", [Remote, UpRemotes, Version, ObjBin, Seq, V1Seq, Skip, PushMeta, Meta, Got]), - false - end. - -fix_routed_meta(Meta, AdditionalRemotes) -> - Routed1 = case orddict:find(routed_clusters, Meta) of - error -> []; - {ok, V} -> V - end, - Routed2 = ordsets:union(AdditionalRemotes, Routed1), - Routed3 = ordsets:add_element("undefined", Routed2), - orddict:store(routed_clusters, Routed3, Meta). - -%% ==================================================================== -%% test callbacks -%% ==================================================================== - -connect_to_v1(RemoteName, MasterQueue) -> - %% ?debugFmt("connect_to_v1: ~p", [RemoteName]), - stateful:set(version, {realtime, {1,0}, {1,0}}), - connect(RemoteName, MasterQueue). - -connect_to_v2(RemoteName, MasterQueue) -> - %% ?debugFmt("connect_to_v2: ~p", [RemoteName]), - stateful:set(version, {realtime, {2,0}, {2,0}}), - connect(RemoteName, MasterQueue). - -connect(RemoteName, MasterQueue) -> - %% ?debugFmt("connect: ~p", [RemoteName]), - stateful:set(remote, RemoteName), - %% ?debugMsg("Starting rtsource link"), - {ok, SourcePid} = riak_repl2_rtsource_conn:start_link(RemoteName), - %% ?debugFmt("rtsource pid: ~p", [SourcePid]), - %% ?debugMsg("Waiting for sink_started"), - _ = wait_for_rtsource_helper(SourcePid), - FakeSink = whereis(fake_sink), - %% ?debugFmt("fake_sink pid: ~p", [FakeSink]), - FakeSink ! {status, self()}, - receive - {sink_started, SinkPid} -> - erlang:monitor(process, SinkPid), - %% ?debugFmt("V1 SinkPid: ~p", [SinkPid]), - rt_source_helpers:wait_for_valid_sink_history(SinkPid, RemoteName, MasterQueue), - ok = rt_source_helpers:ensure_registered(RemoteName), - {SourcePid, SinkPid} - after 5000 -> - {error, timeout} - end. - -wait_for_rtsource_helper(SourcePid) -> - Status = riak_repl2_rtsource_conn:status(SourcePid), - wait_for_rtsource_helper(SourcePid, 20, 1000, Status). - -wait_for_rtsource_helper(_SourcePid, 0, _Wait, _Status) -> - {error, rtsource_helper_failed}; -wait_for_rtsource_helper(SourcePid, RetriesLeft, Wait, Status) -> - case lists:keyfind(helper_pid, 1, Status) of - false -> - timer:sleep(Wait), - NewStatus = riak_repl2_rtsource_conn:status(SourcePid), - wait_for_rtsource_helper(SourcePid, RetriesLeft-1, Wait, NewStatus); - _ -> - ok - end. - -disconnect(ConnectState) -> - {Remote, SrcState} = ConnectState, - %% ?debugFmt("Disconnecting ~p", [Remote]), - #src_state{pids = {Source, Sink}} = SrcState, - %% ?debugFmt("is Source alive: ~p", [is_process_alive(Source)]), - %% ?debugFmt("is Sink ~p alive: ~p", [Sink, is_process_alive(Sink)]), - %% Stop the source, but no need to stop our fake sink - riak_repl2_rtsource_conn:stop(Source), - Sink ! stop, %% Reset the fake sink history - riak_repl2_rtq:unregister(Remote), - {Source, Sink}. - %% [riak_repl_test_util:wait_for_pid(P, 3000) || P <- [Source, Sink]]. - -push_object(Remotes, RiakObj, State) -> - %% ?debugFmt("push_object remotes: ~p", [Remotes]), - Meta = [{routed_clusters, Remotes}], - riak_repl2_rtq:push(1, riak_repl_util:to_wire(w1, [RiakObj]), Meta), - rt_source_helpers:wait_for_pushes(State, Remotes), - {1, RiakObj, Meta}. - -ack_objects(NumToAck, {Remote, SrcState}) -> - {_, Sink} = SrcState#src_state.pids, - ProcessAlive = is_process_alive(Sink), - if - ProcessAlive -> - {ok, Acked} = gen_server:call(Sink, {ack, NumToAck}), - case Acked of - [] -> - ok; - [{objects_and_meta, {Seq, _, _}} | _] -> - riak_repl2_rtq:ack(Remote, Seq); - [{objects, {Seq, _}} | _] -> - riak_repl2_rtq:ack(Remote, Seq) - end, - Acked; - true -> - [] - end. --endif. diff --git a/test/rt_source_helpers.erl b/test/rt_source_helpers.erl index 2bc58ff9..c04ff81a 100644 --- a/test/rt_source_helpers.erl +++ b/test/rt_source_helpers.erl @@ -12,74 +12,6 @@ %% helpful utility functions %% ==================================================================== -ensure_registered(RemoteName) -> - ensure_registered(RemoteName, 10). - -ensure_registered(_RemoteName, N) when N < 1 -> - {error, registration_timeout}; -ensure_registered(RemoteName, N) -> - Status = riak_repl2_rtq:status(), - %% ?debugFmt("RTQ Status: ~p~n", [Status]), - Consumers = proplists:get_value(consumers, Status), - case proplists:get_value(RemoteName, Consumers) of - undefined -> - timer:sleep(1000), - ensure_registered(RemoteName, N - 1); - _ -> - ok - end. - -wait_for_valid_sink_history(Pid, Remote, MasterQueue) -> - NewQueue = [{Seq, Queued} || {Seq, RoutedRemotes, _Binary, Queued} - <- MasterQueue, not lists:member(Remote, RoutedRemotes)], - if - length(NewQueue) > 0 -> - gen_server:call(Pid, {block_until, length(NewQueue)}, 30000); - true -> - ok - end. - -wait_for_pushes(State, Remotes) -> - [wait_for_push(SrcState, Remotes) || SrcState <- State#state.sources]. - -wait_for_push({Remote, SrcState}, Remotes) -> - case lists:member(Remote, Remotes) of - true -> ok; - _ -> - WaitLength = length(SrcState#src_state.unacked_objects) + 1, - {_, Sink} = SrcState#src_state.pids, - gen_server:call(Sink, {block_until, WaitLength}, 30000) - end. - -plant_bugs(_Remotes, []) -> - ok; -plant_bugs(Remotes, [{Remote, SrcState} | Tail]) -> - case lists:member(Remote, Remotes) of - true -> - plant_bugs(Remotes, Tail); - false -> - {_, Sink} = SrcState#src_state.pids, - ok = gen_server:call(Sink, bug), - plant_bugs(Remotes, Tail) - end. - -abstract_connection_mgr(RemotePort) when is_integer(RemotePort) -> - abstract_connection_mgr({"localhost", RemotePort}); -abstract_connection_mgr({RemoteHost, RemotePort} = RemoteName) -> - riak_repl_test_util:reset_meck(riak_core_connection_mgr, [no_link, passthrough]), - meck:expect(riak_core_connection_mgr, connect, fun(_ServiceAndRemote, ClientSpec) -> - proc_lib:spawn_link(fun() -> - %% ?debugFmt("connection_mgr connect for ~p", [ServiceAndRemote]), - Version = stateful:version(), - {_Proto, {TcpOpts, Module, Pid}} = ClientSpec, - %% ?debugFmt("connection_mgr callback module: ~p", [Module]), - {ok, Socket} = gen_tcp:connect(RemoteHost, RemotePort, [binary | TcpOpts]), - %% ?debugFmt("connection_mgr calling connection callback for ~p", [Pid]), - ok = Module:connected(Socket, gen_tcp, RemoteName, Version, Pid, []) - end), - {ok, make_ref()} - end). - start_rt() -> riak_repl_test_util:reset_meck(riak_repl2_rt, [no_link, passthrough]), WhoToTell = self(),