diff --git a/.hgignore b/.hgignore deleted file mode 100644 index 9fa67369..00000000 --- a/.hgignore +++ /dev/null @@ -1,5 +0,0 @@ -syntax regex -^ebin/.*.beam -.*~ -^.eunit/* -deps diff --git a/.hgtags b/.hgtags deleted file mode 100644 index 06f3b21b..00000000 --- a/.hgtags +++ /dev/null @@ -1,4 +0,0 @@ -ecd300bd87e35e52b1ee462830e66c05b1c315b4 riak_repl-0.13.0rc3 -dfcff7d31e9e83ea5f80b54543c5321b89d5814f riak_repl-0.13.0rc4 -ec9466b011933c4f78b18ae60e24e80df1259a54 riak_repl-0.13.0rc7 -fd9c0ba603c96fae87e9a563b0252efef398ff2c riak_repl-0.13.0 diff --git a/ebin/.empty_for_hg b/ebin/.empty_for_hg deleted file mode 100644 index e69de29b..00000000 diff --git a/eqc/repl_leader_eqc.erl b/eqc/repl_leader_eqc.erl deleted file mode 100644 index 63dd0570..00000000 --- a/eqc/repl_leader_eqc.erl +++ /dev/null @@ -1,624 +0,0 @@ -%% Riak EnterpriseDS -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. - -%% -%% Replication leader EQC -%% -%% This quickcheck test exercises leadership elections. It attempts -%% to simulate nodes going up and down with different candidates -%% for leaders in a way similar to ring gossips. The test state -%% keeps a list of nodes and for each whether node should be a -%% candidate (a listener) or a worker (a non-listener). Nodes -%% have their type toggled at random and the list is updated -%% at random. -%% -%% The test properties make sure that nodes with the same list -%% of candidates/workers elect the same leader. Nodes are -%% created using slave() which causes some interactions between -%% cover and generates some warning messages -%% - --module(repl_leader_eqc). - --include_lib("eqc/include/eqc.hrl"). --include_lib("eqc/include/eqc_fsm.hrl"). --include_lib("eunit/include/eunit.hrl"). - --compile([export_all, nowarn_export_all]). - --record(replnode, {node, - running=false, - type=worker, - candidates=[], - workers=[]}). --record(state, { replnodes=[] }). % {name, stopped | running} - --define(TEST_TIMEOUT, 30 * 60). --define(MAX_NODES, 5). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). - --define(DBG(Fmt,Args),ok). -%%-define(DBG(Fmt,Args),io:format(user, Fmt, Args)). - -qc_test_() -> - %% try and clean the repl controller before cover ever runs - %code:purge(riak_repl_controller), - %code:delete(riak_repl_controller), - ?DBG("Cover modules:\n~p\n", [cover:modules()]), - Prop = ?QC_OUT(eqc:numtests(40, prop_main())), - case testcase() of - [] -> - {timeout, ?TEST_TIMEOUT, fun() -> ?assert(eqc:quickcheck(Prop)) end}; - Testcase -> - {timeout, ?TEST_TIMEOUT, fun() -> ?assert(eqc:check(Prop, Testcase)) end} - end. - -testcase() -> - []. - -prop_main() -> - ?FORALL(Cmds, commands(?MODULE), - begin - %% Setup - ?DBG("\n=== Starting ===\n", []), - os:cmd("rm -rf repl_leader_eqc_data"), - maybe_start_net_kernel(), - code:ensure_loaded(riak_repl_leader), - code:ensure_loaded(riak_repl_leader_helper), - %% Run tests - start_slave_driver(), - try - {H, {_State, _StateData}, Res} = run_commands(?MODULE,Cmds), - ?WHENFAIL(begin - io:format(user, "Test Failed\n~p\n", - [zip(state_names(H),command_names(Cmds))]), - io:format(user, "State: ~p\nStateData: ~p\nRes: ~p\n", - [_State, _StateData, Res]) - end, - %% Generate statistics - aggregate(zip(state_names(H),command_names(Cmds)), Res == ok)) - after - stop_slave_driver(), - net_kernel:stop() - end - end). - -% -%% ==================================================================== -%% eqc_fsm callbacks -%% ==================================================================== - -initial_state() -> - running. - -initial_state_data() -> - #state{}. - -running(S) -> - [{running, {call, ?MODULE, new_node, []}}, - {running, {call, ?MODULE, start_repl, [g_stopped_replnode(S)]}}, - {running, {call, ?MODULE, stop_repl, [g_running_node(S)]}}, - {running, {call, ?MODULE, toggle_type, [g_node(S)]}}, - %% {running, {call, ?MODULE, ping, [g_node(S)]}}, - {running, {call, ?MODULE, set_candidates, [g_running_node(S), - candidates(S), - workers(S), - S]}}, - {running, {call, ?MODULE, check_leaders, [S]}}, - {running, {call, ?MODULE, add_receiver, [g_running_node(S)]}}]. - -weight(_,_,{call,_,toggle_type,_}) -> 10; -weight(_,_,{call,_,stop_repl,_}) -> 10; -weight(_,_,_) -> 100. - -next_state_data(_From, _To, S, _Res, {call, ?MODULE, new_node, []}) -> - Node = make_node(length(S#state.replnodes)+1), - add_replnode(Node, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, start_repl, [ReplNode]}) -> - upd_replnode(ReplNode#replnode{running = true}, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, stop_repl, [Node]}) -> - ReplNode = get_replnode(Node, S), - upd_replnode(ReplNode#replnode{running = false}, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, toggle_type, [Node]}) -> - ReplNode = get_replnode(Node, S), - case ReplNode#replnode.type of - worker-> - UpdReplNode=ReplNode#replnode{type = candidate}; - candidate -> - UpdReplNode=ReplNode#replnode{type = worker} - end, - upd_replnode(UpdReplNode, S); -next_state_data(_From, _To, S, _Res, {call, ?MODULE, set_candidates, - [Node, Candidates, Workers, S]}) -> - ReplNode = get_replnode(Node, S), - upd_replnode(ReplNode#replnode{candidates = lists:sort(Candidates), - workers = lists:sort(Workers)}, - S); -next_state_data(_From, _To, S, _Res, _Call) -> - S. - -precondition(_From, _To, S, {call, ?MODULE, new_node, []}) -> - length(S#state.replnodes) < ?MAX_NODES; -precondition(_From, _To, S, {call, ?MODULE, start_repl, [ReplNode]}) -> - lists:member(ReplNode, S#state.replnodes) andalso ReplNode#replnode.running =:= false; -precondition(_From, _To, S, {call, ?MODULE, stop_repl, [Node]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, S, {call, ?MODULE, toggle_type, [Node]}) -> - node_exists(Node, S); -precondition(_From, _To, S, {call, ?MODULE, set_candidates, [Node, _C, _W, S]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, S, {call, ?MODULE, add_receiver, [Node]}) -> - check_replnode(Node, #replnode.running, true, S); -precondition(_From, _To, _S, _Call) -> - true. - -postcondition(_From, _To, _S, {call, ?MODULE, start_repl, [_]}, Res) -> - case Res of - ok -> true; - _ -> {start_repl, Res} - end; -postcondition(_From, _To, S, {call, ?MODULE, ping, [Node]}, Res) -> - ReplNode = get_replnode(Node, S), - case ReplNode#replnode.running of - true -> - Res == pong; - false -> - Res == pang - end; -postcondition(_From, _To, S, {call, ?MODULE, check_leaders, _}, LeaderByNode) -> - NodesByCandidates0 = nodes_by_candidates(running_replnodes(S)), - NodesByCandidates = lists:filter(fun({{Candidates, _}, Nodes}) -> - lists:all(fun(Cand) -> lists:member(Cand, Nodes) end, Candidates) - end, NodesByCandidates0), - check_same_leaders(NodesByCandidates, LeaderByNode); -postcondition(_From, _To, _S, {call, ?MODULE, add_receiver, [Node]}, {Leader1, Leader2, Res, _Pid}) when Node == Leader1; Node == Leader2 -> - %% The node believes itself to be a leader - case Res of - ok -> - true; - _ -> - {add_receiver_expected_ok, Res} - end; -postcondition(_From, _To, _S, {call, ?MODULE, add_receiver, [_Node]}, - {_Leader1, _Leader2, Res, _Pid}=_R) when _Leader1 /= undefined, - _Leader2 /= undefined -> - %% Node thinks somebody else is the leader or not sure who the leader is. - %% If the candidates/workers is being changed and the node *was* the leader - %% then riak_repl_leader continues to act as the leader until the election - %% completes (but sets the leader undefined). Will only *definitely* - %% return {error, not_leader} once the election completes. - ?DBG("Postcond: n=~p r=~p\n", [_Node, _R]), - case Res of - {error, not_leader} -> - true; - _ -> - {add_receiver_expected_err, Res} - end; -postcondition(_From, _To, _S, _Call, _Res) -> - true. - -%% ==================================================================== -%% Generator functions -%% ==================================================================== - -g_node(S) -> - ?LET(RN, elements(S#state.replnodes), RN#replnode.node). - -g_running_node(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, RN#replnode.running =:= true]). - -g_running_candidate(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, - RN#replnode.running =:= true, - RN#replnode.type =:= candidate ]). - -g_stopped_node(S) -> - elements([RN#replnode.node || - RN <- S#state.replnodes, RN#replnode.running =/= true]). -g_stopped_replnode(S) -> - elements([RN || RN <- S#state.replnodes, RN#replnode.running =/= true]). - -%% ==================================================================== -%% Actions -%% ==================================================================== - -new_node() -> - ok. - -start_repl(ReplNode) -> - Node = ReplNode#replnode.node, - ?DBG("Starting slave ~p\n", [Node]), - ok = start_slave(Node), - pong = net_adm:ping(Node), - %{ok, _StartedNodes} = cover:start([Node]), - dbg:n(Node), - %?DBG("Cover nodes: ~p\n", [_StartedNodes]), - ?DBG("Started slave ~p\n", [Node]), - R = rpc:call(Node, ?MODULE, setup_slave, [ReplNode#replnode.candidates, - ReplNode#replnode.workers]), - ?DBG("slave start returned ~p~n", [R]), - R. - -stop_repl(Node) -> - ?DBG("Stopping cover on ~p\n", [Node]), - %cover:stop([Node]), - ?DBG("Stopping repl on ~p\n", [Node]), - ok = stop_slave(Node), - ?DBG("Stopped slave~p\n", [Node]), - ok. - -toggle_type(_Node) -> - ?DBG("Changing type for ~p\n", [_Node]), - ok. - -ping(Node) -> - ?DBG("Pinging ~p\n", [Node]), - Res = net_adm:ping(Node), - ?DBG("Pinged ~p: ~p\n", [Node, Res]), - Res. - -set_candidates(Node, Candidates, Workers, S) -> - ?DBG("Setting candidates for ~p to {~p, ~p}\n", - [Node, Candidates, Workers]), - pong = net_adm:ping(Node), - ?DBG("riak_repl_leader pid on ~p is ~p\n", - [Node, rpc:call(Node, erlang, whereis, [riak_repl_leader])]), - ok = rpc:call(Node, riak_repl_leader, set_candidates, [Candidates, Workers]), - - %% Request the helper leader node to make any elections stabalize - %% before calling the rest of the quickcheck code, otherwise results - %% are totally unpredictable. - - %% Have to duplicate some work done in next_state here - helper_leader_node - %% needs an updated [#replnode{}]. - ReplNode = get_replnode(Node, S), - UpdS = upd_replnode(ReplNode#replnode{candidates = lists:sort(Candidates), - workers = lists:sort(Workers)}, - S), - {_HLN, _UpCand} = helper_leader_node(Node, UpdS), - ?DBG("Set candidates for ~p, HLN=~p, UpCand=~p\n", [Node, _HLN, _UpCand]), - ok. - -check_leaders(S) -> % include a dummy anode from list so - ?DBG("CheckLeaders - running nodes ~p\n", [shorten(running_nodes(S))]), - F = fun(RN) -> % generator will blow up if none running - {HelperLN, UpCand} = helper_leader_node(RN#replnode.node, S), - %% Make sure at least one candidate node for the replnode - %% is running, is a candidate and belongs to the same set of - %% candidates/workers - N = RN#replnode.node, - - LN = rpc:call(N, riak_repl_leader, leader_node, []), - ?DBG(" ~p: {~p, ~p} LN=~p HLN=~p UpCand=~p\n", - [N, shorten(RN#replnode.candidates), - shorten(RN#replnode.workers), LN, HelperLN, - UpCand]), - {N, LN, HelperLN, UpCand} - end, - [F(N) || N <- running_replnodes(S)]. - -add_receiver(N) -> - % R = {Leader1, Leader2, Pid, Res}, - R = rpc:call(N, ?MODULE, register_receiver, []), - ?DBG("add_receiver: ~p\n", [R]), - R. - -%% ==================================================================== -%% Internal functions -%% ==================================================================== - -nodes_by_candidates(ReplNodes) -> - %% Build a dict of all nodes with the same config - %% Check they agree on who the leader is - F = fun(ReplNode, D) -> - Key = {lists:sort(ReplNode#replnode.candidates), - lists:sort(ReplNode#replnode.workers)}, - orddict:append_list(Key, [ReplNode#replnode.node], D) - end, - lists:foldl(F, orddict:new(), ReplNodes). - -check_same_leaders([], _LeaderByNode) -> - true; -check_same_leaders([{{C,W},Nodes}|Rest], LeaderByNode) -> - Leaders = lookup_leaders(Nodes, LeaderByNode), - UniqLeaders = lists:usort([Ldr || {_Node,Ldr} <- Leaders]), - case UniqLeaders of - [_SingleLeader] -> - check_same_leaders(Rest, LeaderByNode); - ManyLeaders -> - {different_leaders, ManyLeaders, - {candidates, C}, - {workers, W}, - {nodes, Nodes}, - {leaders, Leaders}, - {all_leader_info, LeaderByNode}} - end. - -%% For each node, lookup the current leader and build a {Node, LeaderName} tuple. -lookup_leaders(Nodes, LeaderByNode) -> - F = fun(N, A) -> - try - {N, LN, _HLN, _Cs} = lists:keyfind(N, 1, LeaderByNode), - [{N, LN} | A] - catch - _:Error -> - throw({cannot_find, N, LeaderByNode, Error}) - end - end, - lists:foldl(F, [], Nodes). - -maybe_start_net_kernel() -> - [] = os:cmd("epmd -daemon"), - case net_kernel:start(['repl_leader_eqc@127.0.0.1', longnames]) of - {ok, _} -> - ?DBG("Net kernel started as ~p\n", [node()]); - {error, {already_started, _}} -> - ok; - {error, Reason} -> - throw({start_net_kernel_failed, Reason}) - end. - -good_path() -> - [filename:absname(D) || - D <- lists:filter(fun filelib:is_dir/1, code:get_path())]. - -make_node(N) -> - list_to_atom("n" ++ integer_to_list(N) ++ "@" ++ get_host(node())). - -get_name(Node) -> - list_to_atom(hd(string:tokens(atom_to_list(Node), "@"))). - -get_host(_Node) -> - "127.0.0.1". - -shorten(Nodes) -> - [get_name(N) || N <- Nodes]. - -candidates(S) -> - [RN#replnode.node || RN <- S#state.replnodes, RN#replnode.type =:= candidate]. - -workers(S) -> - [RN#replnode.node || RN <- S#state.replnodes, RN#replnode.type =:= worker]. - -running_nodes(S) -> - [RN#replnode.node || RN <- running_replnodes(S)]. - -running_replnodes(S) -> - [RN || RN <- S#state.replnodes, RN#replnode.running =:= true]. - -get_replnode(Node, S) -> - {value, ReplNode} = lists:keysearch(Node, #replnode.node, S#state.replnodes), - ReplNode. - -add_replnode(Node, S) -> - ReplNode = #replnode{node = Node, - candidates = lists:sort(candidates(S)), - workers = lists:sort([Node | workers(S)])}, - UpdReplNodes = lists:keystore(Node, #replnode.node, - S#state.replnodes, ReplNode), - S#state{replnodes = UpdReplNodes}. - -upd_replnode(ReplNode, S) -> - ?DBG("Updating ~p\nin ~p\n", [ReplNode, S]), - UpdReplNodes = lists:keyreplace(ReplNode#replnode.node, #replnode.node, - S#state.replnodes, ReplNode), - ?DBG("Updated ~p\n", [UpdReplNodes]), - S#state{replnodes = UpdReplNodes}. - -%% Check if a node exists in the state -node_exists(Node,S) -> - try - get_replnode(Node, S), - true - catch - _:_ -> - false % no match on node name - end. - -%% Check if a node has the replnode element at Pos == Value -check_replnode(Node, Pos, Value, S) -> - try - ReplNode = get_replnode(Node, S), - element(Pos, ReplNode) =:= Value - catch - _:_ -> - false % no match on node name (or bad position) - end. - -wait_for_helper(Node) -> - wait_for_helper(Node, 1000). - -wait_for_helper(_Node, 0) -> - helper_timeout; -wait_for_helper(Node, Retries) -> - case rpc:call(Node, riak_repl_leader, helper_pid, []) of - undefined -> - timer:sleep(10), - wait_for_helper(Node, Retries -1); - Pid when is_pid(Pid) -> - {ok, Pid}; - {killed, _OldPid} -> - timer:sleep(10), - wait_for_helper(Node, Retries - 1) - end. - -%% Ask the helper who it thinks the leader is and all up candidates -%% that will answer. Nodes must be 'up' *and* have the same worker/candidate -%% lists. -%% Call this makes sure any elections have been propagated to the -%% riak_repl_helper process. The helper is only asked when at least on -%% candidate nodes should be up, otherwise it will block waiting for any -%% candidate. -helper_leader_node(N, S) -> - ?DBG("Handler leader node ~p\nState:\n~p\n", [N, S]), - RN = get_replnode(N, S), - C = RN#replnode.candidates, - W = RN#replnode.workers, - CRNs = [get_replnode(X, S) || X <- C], - ?DBG("Candidate replication nodes\n~p\n", [CRNs]), - UpCandidates = [CRN#replnode.node || CRN <- CRNs, - CRN#replnode.running =:= true, - lists:member(CRN#replnode.node, C), - CRN#replnode.candidates =:= C, - CRN#replnode.workers =:= W], - case UpCandidates of - [] -> - HelperLN = no_candidates; - _ -> - {ok, Helper} = wait_for_helper(N), - HelperLN = rpc:call(N, riak_repl_leader_helper, - leader_node, [Helper, 300000], 305000) - end, - ?DBG("UpCandidatess ~p~n", [UpCandidates]), - {HelperLN, UpCandidates}. - -%% ==================================================================== -%% Slave driver - link all slaves under a single process for easy cleanup -%% ==================================================================== - -start_slave_driver() -> - ?DBG("Starting slave driver\n", []), - ReplyTo = self(), - spawn(fun() -> - true = register(slave_driver, self()), - ?DBG("Started slave driver\n", []), - ReplyTo ! ready, - slave_driver_loop() - end), - receive - ready -> - ok - after - 5000 -> - throw(slave_driver_timeout) - end. - -slave_driver_loop() -> - receive - {start, Name, ReplyTo} -> - ?DBG("starting node ~p@~p from node ~p~n", [Name, - get_host(node()), node()]), - {ok, Node} = slave:start_link(get_host(node()), Name), - true = rpc:call(Node, code, set_path, [good_path()]), - ReplyTo ! {ok, Node}; - {stop, Node, ReplyTo} -> - ok = slave:stop(Node), - pang = net_adm:ping(Node), - ReplyTo ! {stopped, Node} - end, - slave_driver_loop(). - -stop_slave_driver() -> - ?DBG("Stopping slave driver\n", []), - case whereis(slave_driver) of - undefined -> - ?DBG("Slave driver not running\n", []), - ok; - Pid -> - Mref = erlang:monitor(process, Pid), - exit(Pid, kill), - receive - {'DOWN', Mref, process, _Obj, _Info} -> - ?DBG("Stopped slave driver\n", []), - ok - end - end. - -start_slave(Node) -> - Name = get_name(Node), - slave_driver ! {start, Name, self()}, - receive - {ok, Node} -> - ok - end. - -stop_slave(Node) -> - slave_driver ! {stop, Node, self()}, - receive - {stopped, Node} -> - ok - end. - -%% ==================================================================== -%% Slave functions - code that runs on the slave nodes for setup -%% ==================================================================== - -setup_slave(Candidates, Workers) -> - %mock_repl_controller(), - start_leader(Candidates, Workers). - -mock_repl_controller() -> - ?DBG("Mocking riak_repl_controller on ~p\n", [node()]), - {module, meck} = code:ensure_loaded(meck), - ok = meck:new(riak_repl_controller, [no_link]), - ok = meck:expect(riak_repl_controller, set_is_leader, - fun(_Bool) -> ok end), - ok = riak_repl_controller:set_is_leader(false), % call it, just to prove it works - ?DBG("Mocked riak_repl_controller on ~p\n", [node()]). - -start_leader(Candidates, Workers) -> - ?DBG("Starting repl on ~p\n", [node()]), - - application:start(ranch), - %% Set up the application config so multiple leaders do not - %% tread on one anothers toes - application:load(riak_repl), - DataRoot = "repl_leader_eqc_data/"++atom_to_list(node()), - application:set_env(riak_repl, data_root, DataRoot), - - %% cannot just call rpc:call(Node, riak_repl_leader, start_link, []) as it - %% would link to the rex process created for the call. This creates the - %% process and unlinks before returning. - {ok, REPid} = riak_core_ring_events:start_link(), - ?DBG("Started ring_events at ~p~n", [REPid]), - unlink(REPid), - {ok, RMPid} = riak_core_ring_manager:start_link(test), - ?DBG("Started ring_manager at ~p~n", [RMPid]), - unlink(RMPid), - {ok, NWEPid} = riak_core_node_watcher_events:start_link(), - ?DBG("Started node_watcher_events at ~p~n", [NWEPid]), - unlink(NWEPid), - application:set_env(riak_core, gossip_interval, 5000), - {ok, NWPid} = riak_core_node_watcher:start_link(), - ?DBG("Started node_watcher at ~p~n", [NWPid]), - unlink(NWPid), - {ok, CSPid} = riak_repl_client_sup:start_link(), - ?DBG("Started repl client_sup at ~p~n", [CSPid]), - unlink(CSPid), - {ok, SSPid} = riak_repl_server_sup:start_link(), - ?DBG("Started repl server_sup at ~p~n", [SSPid]), - unlink(SSPid), - {ok, Pid} = riak_repl_leader:start_link(), - ?DBG("Started repl leader at ~p", [Pid]), - unlink(Pid), - - %% set the candidates so that the repl helper is created - ok = riak_repl_leader:set_candidates(Candidates, Workers), - - ?DBG("Started repl on ~p as ~p with candidates {~p, ~p}\n", - [node(), Pid, Candidates, Workers]), - - %riak_repl_leader:ensure_sites(), - - %% Check leader completes election - %{ok, Helper} = wait_for_helper(node()), - %_HelperLN = riak_repl_leader_helper:leader_node(Helper, 10000), - ok. - - -%% Creates a dummy process that waits for the message 'die' -register_receiver() -> - Pid = spawn(fun() -> - receive - die -> - ok - end - end), - Leader1 = riak_repl_leader:leader_node(), - Res = riak_repl_leader:add_receiver_pid(Pid), - Leader2 = riak_repl_leader:leader_node(), - {Leader1, Leader2, Res, Pid}. - diff --git a/rebar.config b/rebar.config index 8787d31c..f071f418 100644 --- a/rebar.config +++ b/rebar.config @@ -22,9 +22,9 @@ {deps, [ {lager, {git, "https://github.com/erlang-lager/lager.git", {tag, "3.8.0"}}}, - {ranch, {git, "https://github.com/ninenines/ranch.git", {tag, "1.6.0"}}}, + {ranch, {git, "https://github.com/ninenines/ranch.git", {tag, "1.6.2"}}}, {ebloom,{git, "https://github.com/basho/ebloom.git", {tag, "2.1.0"}}}, - {riak_kv, {git, "https://github.com/basho/riak_kv.git", {tag, "riak_kv-3.0.11"}}} + {riak_kv, {git, "https://github.com/basho/riak_kv.git", {branch, "develop-3.0"}}} ]}. {plugins, [{rebar3_gpb_plugin, {git, "https://github.com/basho/rebar3_gpb_plugin", {tag, "2.15.1+riak.3.0.4"}}}, diff --git a/test/rt_source_eqc.erl b/test/rt_source_eqc.erl deleted file mode 100644 index 5c0520fd..00000000 --- a/test/rt_source_eqc.erl +++ /dev/null @@ -1,615 +0,0 @@ --module(rt_source_eqc). - --compile([export_all, nowarn_export_all]). - --ifdef(EQC). --include("rt_source_eqc.hrl"). --include_lib("eqc/include/eqc.hrl"). --include_lib("eqc/include/eqc_statem.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(P(EXPR), PPP = (EXPR), case PPP of true -> ok; _ -> io:format(user, "PPP ~p at line ~p\n", [PPP, ?LINE]) end, PPP). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> - io:format(user, Str, Args) end, P)). - --record(pushed, { - seq, - v1_seq, - push_res, - skips, - remotes_up -}). - - -%% =================================================================== -%% Helper Funcs -%% =================================================================== - --record(ctx, { - fs_pid, - fs_port -}). - -setup() -> - % ?debugMsg("enter setup()"), - application:load(sasl), - application:set_env(sasl, sasl_error_logger, {file, "rt_source_eqc_sasl.log"}), - error_logger:tty(false), - application:start(lager), - riak_repl_test_util:stop_test_ring(), - riak_repl_test_util:start_test_ring(), - riak_repl_test_util:abstract_gen_tcp(), - riak_repl_test_util:abstract_stats(), - riak_repl_test_util:abstract_stateful(), - kill_rtq(), - {ok, _RTPid} = rt_source_helpers:start_rt(), - {ok, _RTQPid} = rt_source_helpers:start_rtq(), - {ok, _TCPMonPid} = rt_source_helpers:start_tcp_mon(), - %% {ok, _Pid1} = riak_core_service_mgr:start_link(ClusterAddr), - %% {ok, _Pid2} = riak_core_connection_mgr:start_link(), - %% {ok, _Pid3} = riak_core_cluster_conn_sup:start_link(), - %% {ok, _Pid4 } = riak_core_cluster_mgr:start_link(), - {ok, FsPid, FsPort} = rt_source_helpers:init_fake_sink(), - ok = rt_source_helpers:abstract_connection_mgr(FsPort), - R = #ctx{fs_pid = FsPid, fs_port = FsPort}, - % ?debugFmt("leave setup() -> ~p", [R]), - R. - -cleanup() -> - % ?debugFmt("enter cleanup(~p)", [_Ctx]), - rt_source_helpers:kill_fake_sink(), - riak_repl_test_util:kill_and_wait(riak_core_tcp_mon), - riak_repl2_rtq:stop(), - riak_repl_test_util:kill_and_wait(riak_repl2_rt), - riak_repl_test_util:stop_test_ring(), - meck:unload(), - % ?debugMsg("leave cleanup(~p)", [_Ctx]), - ok. - -property_test() -> - {spawn, - [%% Run the quickcheck tests - {timeout, 120, - ?_assertEqual(true, eqc:quickcheck(eqc:numtests(5, ?QC_OUT(prop_main()))))} - ] - }. - -prop_main() -> - ?SETUP(fun() -> - setup(), - fun() -> - process_flag(trap_exit, false), - cleanup() - end - end, - ?FORALL(Cmds, noshrink(commands(?MODULE)), - begin - {H, S, Res} = run_commands(?MODULE, Cmds), - aggregate(command_names(Cmds), - pretty_commands(?MODULE, Cmds, {H,S,Res}, Res == ok)) - end)). - -%% ==================================================================== -%% Generators (including commands) -%% ==================================================================== - -command(S) -> - oneof( - [{call, ?MODULE, connect_to_v1, [remote_name(S), S#state.master_queue]} || S#state.remotes_available /= []] ++ - [{call, ?MODULE, connect_to_v2, [remote_name(S), S#state.master_queue]} || S#state.remotes_available /= []] ++ - [{call, ?MODULE, disconnect, [elements(S#state.sources)]} || S#state.sources /= []] ++ - % push an object that may already have been rt'ed - [{call, ?MODULE, push_object, [g_unique_remotes(), g_riak_object(), S]} || S#state.sources /= []] ++ - [?LET({_Remote, SrcState} = Source, elements(S#state.sources), {call, ?MODULE, ack_objects, [g_up_to_length(SrcState#src_state.unacked_objects), Source]}) || S#state.sources /= []] - ). - -g_riak_object() -> - ?LET({Bucket, Key, Content}, {binary(), binary(), binary()}, riak_object:new(Bucket, Key, Content)). - -g_up_to_length([]) -> - 0; -g_up_to_length(List) -> - Max = length(List), - choose(0, Max). - -g_unique_remotes() -> - ?LET(Remotes, list(g_remote_name()), lists:usort(Remotes)). - -g_remote_name() -> - oneof(?all_remotes). - -% name of a remote -remote_name(#state{remotes_available = []}) -> - erlang:error(no_name_available); -remote_name(#state{remotes_available = Remotes}) -> - oneof(Remotes). - -precondition(#state{master_queue = MasterQ} = S, {call, _, Connect, [Remote, MasterQ]}) when Connect =:= connect_to_v1; Connect =:= connect_to_v2 -> - %% ?debugFmt("Remote requested: ~p Remotes available: ~p", [Remote, S#state.remotes_available]), - lists:member(Remote, S#state.remotes_available); -precondition(#state{sources = []}, {call, _, disconnect, _Args}) -> - false; -precondition(S, {call, _, disconnect, _Args}) -> - S#state.sources /= []; -precondition(_S, {call, _, ack_objects, [0, _]}) -> - false; -precondition(#state{sources = []}, {call, _, ack_objects, _Args}) -> - false; -precondition(S, {call, _, ack_objects, _Args}) -> - S#state.sources /= []; -precondition(_S, {call, _, push_object, [[], _, _]}) -> - false; -precondition(S, {call, _, push_object, [_, _, S]}) -> - S#state.sources /= []; -precondition(_S, {call, _, push_object, [_, _, _NotS]}) -> - %% ?debugFmt("Bad states.~n State: ~p~nArg: ~p", [S, NotS]), - false; -precondition(_S, _Call) -> - true. - -dynamic_precondition(#state{sources = Sources}, {call, _, disconnect, [{Remote, _}]}) -> - is_tuple(lists:keyfind(Remote, 1, Sources)); -dynamic_precondition(#state{sources = Sources}, {call, _, ack_objects, [NumAck, {Remote, SrcState}]}) -> - case lists:keyfind(Remote, 1, Sources) of - {_, #src_state{unacked_objects = UnAcked}} when length(UnAcked) >= NumAck -> - UnAcked =:= SrcState#src_state.unacked_objects; - %true; - _ -> - false - end; -dynamic_precondition(_S, _Call) -> - true. -%% dynamic_precondition(_S, {call, _, Connect, [_Remote, _MasterQ]}) when Connect =:= connect_to_v1; Connect =:= connect_to_v2 -> -%% false; - -%% ==================================================================== -%% state generation -%% ==================================================================== - -initial_state() -> - %% catch unlink(whereis(fake_sink)), - %% catch exit(whereis(fake_sink), kill), - process_flag(trap_exit, true), - %% riak_repl_test_util:stop_test_ring(), - %% riak_repl_test_util:start_test_ring(), - %% riak_repl_test_util:abstract_gen_tcp(), - %% riak_repl_test_util:abstract_stats(), - %% riak_repl_test_util:abstract_stateful(), - %% abstract_connection_mgr(), - %% kill_rtq(), - %% {ok, _RTPid} = start_rt(), - %% {ok, _RTQPid} = start_rtq(), - %% {ok, _TCPMonPid} = start_tcp_mon(), - %% {ok, _FakeSinkPid} = start_fake_sink(), - #state{}. - -kill_rtq() -> - case whereis(riak_repl2_rtq) of - undefined -> - ok; - _ -> - riak_repl2_rtq:stop(), - catch exit(riak_repl2_rtq, kill) - end. - -next_state(S, Res, {call, _, connect_to_v1, [Remote, _MQ]}) -> - SrcState = #src_state{pids = Res, version = 1}, - next_state_connect(Remote, SrcState, S); - -next_state(S, Res, {call, _, connect_to_v2, [Remote, _MQ]}) -> - SrcState = #src_state{pids = Res, version = 2}, - next_state_connect(Remote, SrcState, S); - -next_state(S, _Res, {call, _, disconnect, [{Remote, _}]}) -> - %% ?debugFmt("Removing ~p from sources", [Remote]), - Sources = lists:keydelete(Remote, 1, S#state.sources), - Master = if - Sources == [] -> - []; - true -> - MasterKeys = ordsets:from_list([Seq || {Seq, _, _, _} <- S#state.master_queue]), - SourceQueues = [Source#src_state.unacked_objects || {_, Source} <- Sources], - ExtractSeqs = fun(Elem, Acc) -> - SrcSeqs = ordsets:from_list([PushedThang#pushed.seq || PushedThang <- Elem]), - ordsets:union(Acc, SrcSeqs) - end, - ActiveSeqs = lists:foldl(ExtractSeqs, [], SourceQueues), - RemoveSeqs = ordsets:subtract(MasterKeys, ActiveSeqs), - UpdateMaster = fun(RemoveSeq, Acc) -> - lists:keystore(RemoveSeq, 1, Acc, {RemoveSeq, tombstone}) - end, - lists:foldl(UpdateMaster, S#state.master_queue, RemoveSeqs) - end, - %% ?debugFmt("Updated remotes after disconnect: ~p", [[Remote | S#state.remotes_available]]), - S#state{master_queue = Master, sources = Sources, remotes_available = [Remote | S#state.remotes_available]}; - -next_state(S, Res, {call, _, push_object, [Remotes, RiakObj, _S]}) -> - Seq = S#state.seq + 1, - Sources = update_unacked_objects(Remotes, Seq, Res, S), - RoutingSources = [R || {R, _} <- Sources, not lists:member(R, Remotes)], - Master2 = case RoutingSources of - [] -> - [{Seq, tombstone} | S#state.master_queue]; - _ -> - Master = S#state.master_queue, - [{Seq, Remotes, RiakObj, Res} | Master] - end, - S#state{sources = Sources, master_queue = Master2, seq = Seq}; - -next_state(S, _Res, {call, _, ack_objects, [NumAcked, {Remote, _Source}]}) -> - case lists:keytake(Remote, 1, S#state.sources) of - false -> - S; - {value, {Remote, RealSource}, Sources} -> - UnAcked = RealSource#src_state.unacked_objects, - {Updated, Chopped} = model_ack_objects(NumAcked, UnAcked), - SrcState2 = RealSource#src_state{unacked_objects = Updated}, - Sources2 = [{Remote, SrcState2} | Sources], - %% ?debugFmt("Updated ack sources: ~p", [[R || {R, _} <- Sources2]]), - Master2 = remove_fully_acked(S#state.master_queue, Chopped, Sources2), - S#state{sources = Sources2, master_queue = Master2} - end. - -next_state_connect(Remote, SrcState, State) -> - case lists:keyfind(Remote, 1, State#state.sources) of - false -> - Remotes = lists:delete(Remote, State#state.remotes_available), - {NewQueue, Skips, Offset} = generate_unacked_from_master(State, Remote), - SrcState2 = SrcState#src_state{unacked_objects = NewQueue, skips = Skips, offset = Offset}, - %% ?debugFmt("Adding ~p to sources", [Remote]), - Sources = [{Remote, SrcState2} | State#state.sources], - %% ?debugFmt("Updated sources: ~p", [[R || {R, _} <- Sources]]), - State#state{sources = Sources, remotes_available = Remotes}; - _Tuple -> - State - end. - -generate_unacked_from_master(State, Remote) -> - UpRemotes = running_remotes(State), - generate_unacked_from_master(lists:reverse(State#state.master_queue), UpRemotes, Remote, undefined, 0, []). - -generate_unacked_from_master([], _UpRemotes, _Remote, Skips, Offset, Acc) -> - {Acc, Skips, Offset}; -generate_unacked_from_master([{_Seq, tombstone} | Tail], UpRemotes, Remote, undefined, Offset, Acc) -> - generate_unacked_from_master(Tail, UpRemotes, Remote, undefined, Offset, Acc); -generate_unacked_from_master([{_Seq, tombstone} | Tail], UpRemotes, Remote, Skips, Offset, Acc) -> - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips + 1, Offset, Acc); -generate_unacked_from_master([{Seq, Remotes, _Binary, Res} | Tail], UpRemotes, Remote, Skips, Offset, Acc) -> - case {lists:member(Remote, Remotes), Skips} of - {true, undefined} -> - % on start up, we don't worry about skips until we've sent at least - % one; the sink doesn't care what the first seq number is anyway. - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips, Offset, Acc); - {true, _} -> - generate_unacked_from_master(Tail, UpRemotes, Remote, Skips + 1, Offset, Acc); - {false, _} -> - NextSkips = case Skips of - undefined -> 0; - _ -> Skips - end, - Offset2 = Offset + NextSkips, - Obj = #pushed{seq = Seq, v1_seq = Seq - Offset2, push_res = Res, skips = NextSkips, - remotes_up = UpRemotes}, - Acc2 = [Obj | Acc], - generate_unacked_from_master(Tail, UpRemotes, Remote, 0, Offset2, Acc2) - end. - -remove_fully_acked(Master, [], _Sources) -> - Master; - -remove_fully_acked(Master, [Pushed | Chopped], Sources) -> - #pushed{seq = Seq} = Pushed, - case is_in_a_source(Seq, Sources) of - true -> - remove_fully_acked(Master, Chopped, Sources); - false -> - Master2 = lists:keystore(Seq, 1, Master, {Seq, tombstone}), - remove_fully_acked(Master2, Chopped, Sources) - end. - -is_in_a_source(_Seq, []) -> - false; -is_in_a_source(Seq, [{_Remote, #src_state{unacked_objects = UnAcked}} | Tail]) -> - case lists:keymember(Seq, #pushed.seq, UnAcked) of - true -> - true; - false -> - is_in_a_source(Seq, Tail) - end. - -running_remotes(State) -> - Remotes = [Remote || {Remote, _} <- State#state.sources], - ordsets:from_list(Remotes). - -update_unacked_objects(Remotes, Seq, Res, State) when is_record(State, state) -> - UpRemotes = running_remotes(State), - update_unacked_objects(Remotes, Seq, Res, UpRemotes, State#state.sources, []). - -update_unacked_objects(_Remotes, _Seq, _Res, _UpRemotes, [], Acc) -> - lists:reverse(Acc); - -update_unacked_objects(Remotes, Seq, Res, UpRemotes, [{Remote, Source} | Tail], Acc) -> - case lists:member(Remote, Remotes) of - true -> - Skipped = case Source#src_state.skips of - undefined -> undefined; - _ -> Source#src_state.skips + 1 - end, - update_unacked_objects(Remotes, Seq, Res, UpRemotes, Tail, [{Remote, Source#src_state{skips = Skipped}} | Acc]); - false -> - Entry = model_push_object(Seq, Res, UpRemotes, Source), - update_unacked_objects(Remotes, Seq, Res, UpRemotes, Tail, [{Remote, Entry} | Acc]) - end. - -model_push_object(Seq, Res, UpRemotes, SrcState = #src_state{unacked_objects = ObjQueue}) -> - {Seq2, Offset2} = case SrcState#src_state.skips of - undefined -> - {Seq, SrcState#src_state.offset}; - _ -> - Off2 = SrcState#src_state.offset + SrcState#src_state.skips, - {Seq - Off2, Off2} - end, - Obj = #pushed{seq = Seq, v1_seq = Seq2, push_res = Res, skips = SrcState#src_state.skips, remotes_up = UpRemotes}, - SrcState#src_state{unacked_objects = [Obj | ObjQueue], offset = Offset2, skips = 0}. - -insert_skip_meta({Seq, {Count, Bin, Meta}}, #src_state{skips = SkipCount}) -> - Meta2 = orddict:from_list(Meta), - Meta3 = orddict:store(skip_count, SkipCount, Meta2), - {Seq, {Count, Bin, Meta3}}. - -model_ack_objects(_Num, []) -> - {[], []}; -model_ack_objects(NumAcked, Unacked) when length(Unacked) >= NumAcked -> - NewLength = length(Unacked) - NumAcked, - lists:split(NewLength, Unacked). - -%% ==================================================================== -%% postcondition -%% ==================================================================== - -postcondition(_State, {call, _, connect_to_v1, _Args}, {error, _}) -> - ?P(false); -postcondition(_State, {call, _, connect_to_v1, _Args}, {Source, Sink}) -> - ?P(is_pid(Source) andalso is_pid(Sink)); - -postcondition(_State, {call, _, connect_to_v2, _Args}, {error, _}) -> - ?P(false); -postcondition(_State, {call, _, connect_to_v2, _Args}, {Source, Sink}) -> - ?P(is_pid(Source) andalso is_pid(Sink)); - -postcondition(_State, {call, _, disconnect, [_SourceState]}, {Source, _Sink}) -> - %% ?P(lists:all(fun(ok) -> true; (_) -> false end, Waits)); - %% ?debugFmt("Source alive? ~p", [is_process_alive(Source)]), - ?P(is_process_alive(Source) =:= false); - -postcondition(_State, {call, _, push_object, [Remotes, _RiakObj, State]}, Res) -> - ?P(assert_sink_bugs(Res, Remotes, State#state.sources)); - -postcondition(State, {call, _, ack_objects, [NumAck, {Remote, Source}]}, AckedStack) -> - RemoteLives = is_tuple(lists:keyfind(Remote, 1, State#state.sources)), - #src_state{unacked_objects = UnAcked} = Source, - {_, Acked} = lists:split(length(UnAcked) - NumAck, UnAcked), - if - RemoteLives == false -> - ?P(AckedStack == []); - length(Acked) =/= length(AckedStack) -> - ?debugMsg("Acked length is not the same as AckedStack length"), - ?P(false); - true -> - ?P(assert_sink_ackings(Remote, Source#src_state.version, Acked, AckedStack)) - end; - -postcondition(_S, _C, _R) -> - ?debugMsg("fall through postcondition"), - false. - -assert_sink_bugs(Object, Remotes, Sources) -> - Active = [A || {A, _} <- Sources], - assert_sink_bugs(Object, Remotes, Active, Sources, []). - -assert_sink_bugs(_Object, _Remotes, _Active, [], Acc) -> - lists:all(fun(true) -> true; (_) -> false end, Acc); - -assert_sink_bugs(Object, Remotes, Active, [{Remote, SrcState} | Tail], Acc) -> - Truthiness = assert_sink_bug(Object, Remotes, Remote, Active, SrcState), - assert_sink_bugs(Object, Remotes, Active, Tail, [Truthiness | Acc]). - -assert_sink_bug({_Num, RiakObj, BadMeta}, Remotes, Remote, Active, SrcState) -> - {_, Sink} = SrcState#src_state.pids, - Version = SrcState#src_state.version, - History = gen_server:call(Sink, history), - ShouldSkip = lists:member(Remote, Remotes), - BadMeta2 = set_skip_meta(BadMeta, SrcState), - Routed = ordsets:from_list(Remotes ++ [Remote] ++ Active ++ ["undefined"]), - Meta = orddict:store(routed_clusters, Routed, BadMeta2), - ObjBin = case SrcState#src_state.version of - 1 -> - term_to_binary([RiakObj]); - 2 -> - riak_repl_util:to_wire(w1, [RiakObj]) - end, - if - ShouldSkip andalso length(History) == length(SrcState#src_state.unacked_objects) -> - true; - ShouldSkip -> - ?debugFmt("assert sink history length failure!~n" - " Remote: ~p~n" - " Length Sink Hist: ~p~n" - " Length Model Hist: ~p~n" - " Sink:~n" - "~p~n" - " Model:~n" - "~p",[Remote, length(History), length(SrcState#src_state.unacked_objects), History, SrcState#src_state.unacked_objects]), - false; - true -> - Frame = hd(History), - case {Version, Frame} of - {1, {objects, {_Seq, ObjBin}}} -> - true; - {2, {objects_and_meta, {_Seq, ObjBin, _M}}} -> - true; - _ -> - ?debugFmt("assert sink bug failure!~n" - " Remote: ~p~n" - " Remotes: ~p~n" - " Active: ~p~n" - " ObjBin: ~p~n" - " Meta: ~p~n" - " ShouldSkip: ~p~n" - " Version: ~p~n" - " Frame: ~p~n" - " Length Sink Hist: ~p~n" - " History: ~p~n" - " Length Model Hist: ~p", [Remote, Remotes, Active, ObjBin, Meta, ShouldSkip, Version, Frame, length(History), History, length(SrcState#src_state.unacked_objects)]), - false - end - end. - -set_skip_meta(Meta, #src_state{skips = Skips}) -> - set_skip_meta(Meta, Skips); -set_skip_meta(Meta, undefined) -> - set_skip_meta(Meta, 0); -set_skip_meta(Meta, Skips) -> - orddict:store(skip_count, Skips, Meta). - -assert_sink_ackings(Remote, Version, Expecteds, Gots) -> - assert_sink_ackings(Remote, Version, Expecteds, Gots, []). - -assert_sink_ackings(_Remote, _Version, [], [], Acc) -> - lists:all(fun(true) -> true; (_) -> false end, Acc); - -assert_sink_ackings(Remote, Version, [Expected | ETail], [Got | GotTail], Acc) -> - AHead = assert_sink_acking(Remote, Version, Expected, Got), - assert_sink_ackings(Remote, Version, ETail, GotTail, [AHead | Acc]). - -assert_sink_acking(Remote, Version, Pushed, Got) -> - #pushed{seq = Seq, v1_seq = V1Seq, push_res = {1, RiakObj, PushMeta}, skips = Skip, - remotes_up = UpRemotes} = Pushed, - PushMeta1 = set_skip_meta(PushMeta, Skip), - PushMeta2 = fix_routed_meta(PushMeta1, ordsets:add_element(Remote, UpRemotes)), - Meta = PushMeta2, - ObjBin = case Version of - 1 -> riak_repl_util:to_wire(w0, [RiakObj]); - 2 -> riak_repl_util:to_wire(w1, [RiakObj]) - end, - case {Version, Got} of - {1, {objects, {V1Seq, ObjBin}}} -> - true; - {2, {objects_and_meta, {Seq, ObjBin, Meta}}} -> - true; - _ -> - ?debugFmt("Sink ack failure!~n" - " Remote: ~p~n" - " UpRemotes: ~p~n" - " Version: ~p~n" - " ObjBin: ~p~n" - " Seq: ~p~n" - " V1Seq: ~p~n" - " Skip: ~p~n" - " PushMeta: ~p~n" - " Meta: ~p~n" - " Got: ~p", [Remote, UpRemotes, Version, ObjBin, Seq, V1Seq, Skip, PushMeta, Meta, Got]), - false - end. - -fix_routed_meta(Meta, AdditionalRemotes) -> - Routed1 = case orddict:find(routed_clusters, Meta) of - error -> []; - {ok, V} -> V - end, - Routed2 = ordsets:union(AdditionalRemotes, Routed1), - Routed3 = ordsets:add_element("undefined", Routed2), - orddict:store(routed_clusters, Routed3, Meta). - -%% ==================================================================== -%% test callbacks -%% ==================================================================== - -connect_to_v1(RemoteName, MasterQueue) -> - %% ?debugFmt("connect_to_v1: ~p", [RemoteName]), - stateful:set(version, {realtime, {1,0}, {1,0}}), - connect(RemoteName, MasterQueue). - -connect_to_v2(RemoteName, MasterQueue) -> - %% ?debugFmt("connect_to_v2: ~p", [RemoteName]), - stateful:set(version, {realtime, {2,0}, {2,0}}), - connect(RemoteName, MasterQueue). - -connect(RemoteName, MasterQueue) -> - %% ?debugFmt("connect: ~p", [RemoteName]), - stateful:set(remote, RemoteName), - %% ?debugMsg("Starting rtsource link"), - {ok, SourcePid} = riak_repl2_rtsource_conn:start_link(RemoteName), - %% ?debugFmt("rtsource pid: ~p", [SourcePid]), - %% ?debugMsg("Waiting for sink_started"), - _ = wait_for_rtsource_helper(SourcePid), - FakeSink = whereis(fake_sink), - %% ?debugFmt("fake_sink pid: ~p", [FakeSink]), - FakeSink ! {status, self()}, - receive - {sink_started, SinkPid} -> - erlang:monitor(process, SinkPid), - %% ?debugFmt("V1 SinkPid: ~p", [SinkPid]), - rt_source_helpers:wait_for_valid_sink_history(SinkPid, RemoteName, MasterQueue), - ok = rt_source_helpers:ensure_registered(RemoteName), - {SourcePid, SinkPid} - after 5000 -> - {error, timeout} - end. - -wait_for_rtsource_helper(SourcePid) -> - Status = riak_repl2_rtsource_conn:status(SourcePid), - wait_for_rtsource_helper(SourcePid, 20, 1000, Status). - -wait_for_rtsource_helper(_SourcePid, 0, _Wait, _Status) -> - {error, rtsource_helper_failed}; -wait_for_rtsource_helper(SourcePid, RetriesLeft, Wait, Status) -> - case lists:keyfind(helper_pid, 1, Status) of - false -> - timer:sleep(Wait), - NewStatus = riak_repl2_rtsource_conn:status(SourcePid), - wait_for_rtsource_helper(SourcePid, RetriesLeft-1, Wait, NewStatus); - _ -> - ok - end. - -disconnect(ConnectState) -> - {Remote, SrcState} = ConnectState, - %% ?debugFmt("Disconnecting ~p", [Remote]), - #src_state{pids = {Source, Sink}} = SrcState, - %% ?debugFmt("is Source alive: ~p", [is_process_alive(Source)]), - %% ?debugFmt("is Sink ~p alive: ~p", [Sink, is_process_alive(Sink)]), - %% Stop the source, but no need to stop our fake sink - riak_repl2_rtsource_conn:stop(Source), - Sink ! stop, %% Reset the fake sink history - riak_repl2_rtq:unregister(Remote), - {Source, Sink}. - %% [riak_repl_test_util:wait_for_pid(P, 3000) || P <- [Source, Sink]]. - -push_object(Remotes, RiakObj, State) -> - %% ?debugFmt("push_object remotes: ~p", [Remotes]), - Meta = [{routed_clusters, Remotes}], - riak_repl2_rtq:push(1, riak_repl_util:to_wire(w1, [RiakObj]), Meta), - rt_source_helpers:wait_for_pushes(State, Remotes), - {1, RiakObj, Meta}. - -ack_objects(NumToAck, {Remote, SrcState}) -> - {_, Sink} = SrcState#src_state.pids, - ProcessAlive = is_process_alive(Sink), - if - ProcessAlive -> - {ok, Acked} = gen_server:call(Sink, {ack, NumToAck}), - case Acked of - [] -> - ok; - [{objects_and_meta, {Seq, _, _}} | _] -> - riak_repl2_rtq:ack(Remote, Seq); - [{objects, {Seq, _}} | _] -> - riak_repl2_rtq:ack(Remote, Seq) - end, - Acked; - true -> - [] - end. --endif. diff --git a/test/rt_source_helpers.erl b/test/rt_source_helpers.erl index 2bc58ff9..c04ff81a 100644 --- a/test/rt_source_helpers.erl +++ b/test/rt_source_helpers.erl @@ -12,74 +12,6 @@ %% helpful utility functions %% ==================================================================== -ensure_registered(RemoteName) -> - ensure_registered(RemoteName, 10). - -ensure_registered(_RemoteName, N) when N < 1 -> - {error, registration_timeout}; -ensure_registered(RemoteName, N) -> - Status = riak_repl2_rtq:status(), - %% ?debugFmt("RTQ Status: ~p~n", [Status]), - Consumers = proplists:get_value(consumers, Status), - case proplists:get_value(RemoteName, Consumers) of - undefined -> - timer:sleep(1000), - ensure_registered(RemoteName, N - 1); - _ -> - ok - end. - -wait_for_valid_sink_history(Pid, Remote, MasterQueue) -> - NewQueue = [{Seq, Queued} || {Seq, RoutedRemotes, _Binary, Queued} - <- MasterQueue, not lists:member(Remote, RoutedRemotes)], - if - length(NewQueue) > 0 -> - gen_server:call(Pid, {block_until, length(NewQueue)}, 30000); - true -> - ok - end. - -wait_for_pushes(State, Remotes) -> - [wait_for_push(SrcState, Remotes) || SrcState <- State#state.sources]. - -wait_for_push({Remote, SrcState}, Remotes) -> - case lists:member(Remote, Remotes) of - true -> ok; - _ -> - WaitLength = length(SrcState#src_state.unacked_objects) + 1, - {_, Sink} = SrcState#src_state.pids, - gen_server:call(Sink, {block_until, WaitLength}, 30000) - end. - -plant_bugs(_Remotes, []) -> - ok; -plant_bugs(Remotes, [{Remote, SrcState} | Tail]) -> - case lists:member(Remote, Remotes) of - true -> - plant_bugs(Remotes, Tail); - false -> - {_, Sink} = SrcState#src_state.pids, - ok = gen_server:call(Sink, bug), - plant_bugs(Remotes, Tail) - end. - -abstract_connection_mgr(RemotePort) when is_integer(RemotePort) -> - abstract_connection_mgr({"localhost", RemotePort}); -abstract_connection_mgr({RemoteHost, RemotePort} = RemoteName) -> - riak_repl_test_util:reset_meck(riak_core_connection_mgr, [no_link, passthrough]), - meck:expect(riak_core_connection_mgr, connect, fun(_ServiceAndRemote, ClientSpec) -> - proc_lib:spawn_link(fun() -> - %% ?debugFmt("connection_mgr connect for ~p", [ServiceAndRemote]), - Version = stateful:version(), - {_Proto, {TcpOpts, Module, Pid}} = ClientSpec, - %% ?debugFmt("connection_mgr callback module: ~p", [Module]), - {ok, Socket} = gen_tcp:connect(RemoteHost, RemotePort, [binary | TcpOpts]), - %% ?debugFmt("connection_mgr calling connection callback for ~p", [Pid]), - ok = Module:connected(Socket, gen_tcp, RemoteName, Version, Pid, []) - end), - {ok, make_ref()} - end). - start_rt() -> riak_repl_test_util:reset_meck(riak_repl2_rt, [no_link, passthrough]), WhoToTell = self(),