Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

active preflist on bucket key #705

Merged
merged 1 commit into from
Feb 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 184 additions & 41 deletions src/riak_core_apl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
%% -------------------------------------------------------------------
-module(riak_core_apl).
-export([active_owners/1, active_owners/2,
get_apl/3, get_apl/4, get_apl_ann/3, get_apl_ann/4,
get_apl/3, get_apl/4,
get_apl_ann/2, get_apl_ann/3, get_apl_ann/4,
get_apl_ann_with_pnum/1,
get_primary_apl/3, get_primary_apl/4,
get_primary_apl_chbin/4,
first_up/2, offline_owners/1, offline_owners/2
]).

-export_type([preflist/0, preflist_ann/0]).
-export_type([preflist/0, preflist_ann/0, preflist_with_pnum_ann/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Expand All @@ -40,13 +43,18 @@
-type ring() :: riak_core_ring:riak_core_ring().
-type preflist() :: [{index(), node()}].
-type preflist_ann() :: [{{index(), node()}, primary|fallback}].
%% @type preflist_with_pnum_ann -
%% Annoated preflist where the partition value is an id/number
%% (0 to ring_size-1) instead of a hash.
-type preflist_with_pnum_ann() :: [{{riak_core_ring:partition_id(), node()},
primary|fallback}].
-type iterator() :: term().
-type chashbin() :: term().
-type docidx() :: chash:index().

%% Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size
%% @doc Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size.
-spec active_owners(atom()) -> preflist_ann().
active_owners(Service) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Expand All @@ -59,62 +67,81 @@ active_owners(Ring, UpNodes) ->
{Up, _Pangs} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Get the active preflist taking account of which nodes are up
%% @doc Get the active preflist taking account of which nodes are up.
-spec get_apl(docidx(), n_val(), atom()) -> preflist().
get_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list.
-spec get_apl_chbin(docidx(), n_val(), chashbin:chashbin(), [node()]) -> preflist().
get_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list.
-spec get_apl(docidx(), n_val(), ring(), [node()]) -> preflist().
get_apl(DocIdx, N, Ring, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann(DocIdx, N, Ring, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% and annotate each node with type of primary/fallback
%% @doc Get the active preflist taking account of which nodes are up for a given
%% chash/upnodes list and annotate each node with type of primary/fallback.
get_apl_ann(DocIdx, N, UpNodes) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_apl_ann(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Preflist = riak_core_ring:preflist(DocIdx, Ring),

{Primaries, Fallbacks} = lists:split(N, Preflist),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks(Pangs, Fallbacks, UpNodes1, []).

%% Same as get_apl, but returns only the primaries.
%% @doc Get the active preflist for a given {bucket, key} and list of nodes
%% and annotate each node with type of primary/fallback.
-spec get_apl_ann(riak_core_bucket:bucket(), [node()]) -> preflist_ann().
get_apl_ann({Bucket, Key}, UpNodes) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
DocIdx = riak_core_util:chash_key({Bucket, Key}),
get_apl_ann(DocIdx, NVal, UpNodes).

%% @doc Get the active preflist taking account of which nodes are up
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @doc.

%% for a given {bucket, key} and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_with_pnum(riak_core_bucket:bucket()) -> preflist_with_pnum_ann().
get_apl_ann_with_pnum(BKey) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
UpNodes = riak_core_ring:all_members(Ring),
Apl = get_apl_ann(BKey, UpNodes),
Size = riak_core_ring:num_partitions(Ring),
apl_with_partition_nums(Apl, Size).

%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), atom()) -> preflist_ann().
get_primary_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_primary_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -123,7 +150,7 @@ get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_primary_apl(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -132,8 +159,8 @@ get_primary_apl(DocIdx, N, Ring, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
%% @doc Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
first_up(DocIdx, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
Itr = chashbin:iterator(DocIdx, CHBin),
Expand All @@ -154,7 +181,7 @@ offline_owners(Service, CHBin) ->
end, CHBin),
DownVNodes.

%% Split a preference list into up and down lists
%% @doc Split a preference list into up and down lists.
-spec check_up(preflist(), [node()], preflist_ann(), preflist()) -> {preflist_ann(), preflist()}.
check_up([], _UpNodes, Up, Pangs) ->
{lists:reverse(Up), lists:reverse(Pangs)};
Expand All @@ -166,7 +193,7 @@ check_up([{Partition,Node}|Rest], UpNodes, Up, Pangs) ->
check_up(Rest, UpNodes, Up, [{Partition, Node} | Pangs])
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks(preflist(), preflist(), [node()], preflist_ann()) -> preflist_ann().
find_fallbacks(_Pangs, [], _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -181,7 +208,7 @@ find_fallbacks([{Partition, _Node}|Rest]=Pangs, [{_,FN}|Fallbacks], UpNodes, Sec
find_fallbacks(Pangs, Fallbacks, UpNodes, Secondaries)
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks_chbin(preflist(), iterator(),[node()], preflist_ann()) -> preflist_ann().
find_fallbacks_chbin([], _Fallbacks, _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -198,10 +225,17 @@ find_fallbacks_chbin([{Partition, _Node}|Rest]=Pangs, Itr, UpNodes, Secondaries)
find_fallbacks_chbin(Pangs, Itr2, UpNodes, Secondaries)
end.

%% Return true if a node is up
%% @doc Return true if a node is up.
is_up(Node, UpNodes) ->
lists:member(Node, UpNodes).

%% @doc Return annotated preflist with partition ids/nums instead of hashes.
-spec apl_with_partition_nums(preflist_ann(), riak_core_ring:ring_size()) ->
preflist_with_pnum_ann().
apl_with_partition_nums(Apl, Size) ->
[{{riak_core_ring_util:hash_to_partition_id(Hash, Size), Node}, Ann} ||
{{Hash, Node}, Ann} <- Apl].

-ifdef(TEST).

smallest_test() ->
Expand All @@ -213,12 +247,12 @@ four_node_test() ->
Ring = perfect_ring(8, Nodes),
?assertEqual([{0,nodea},
{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec}],
{365375409332725729550921208179070754913983135744,nodec}],
get_apl(last_in_ring(), 3, Ring, Nodes)),
%% With a node down
?assertEqual([{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec},
{0,noded}],
{0,noded}],
get_apl(last_in_ring(), 3, Ring, [nodeb, nodec, noded])),
%% With two nodes down
?assertEqual([{365375409332725729550921208179070754913983135744,nodec},
Expand All @@ -231,8 +265,7 @@ four_node_test() ->
{365375409332725729550921208179070754913983135744,nodea}],
get_apl(last_in_ring(), 3, Ring, [nodea, nodeb])).


%% Create a perfect ring - RingSize must be a multiple of nodes
%% @doc Create a perfect ring - RingSize must be a multiple of nodes
perfect_ring(RingSize, Nodes) when RingSize rem length(Nodes) =:= 0 ->
Ring = riak_core_ring:fresh(RingSize,node()),
Owners = riak_core_ring:all_owners(Ring),
Expand Down Expand Up @@ -326,6 +359,116 @@ six_node_test() ->

ok.

six_node_bucket_key_ann_test() ->
{ok, [Ring0]} = file:consult("../test/my_ring"),
Nodes = ['dev1@127.0.0.1', 'dev2@127.0.0.1', 'dev3@127.0.0.1',
'dev4@127.0.0.1', 'dev5@127.0.0.1', 'dev6@127.0.0.1'],
Ring = riak_core_ring:upgrade(Ring0),
Bucket = <<"favorite">>,
Key = <<"jethrotull">>,
application:set_env(riak_core, default_bucket_props,
[{n_val, 3},
{chash_keyfun,{riak_core_util,chash_std_keyfun}}]),
riak_core_ring_manager:setup_ets(test),
riak_core_ring_manager:set_ring_global(Ring),
Size = riak_core_ring:num_partitions(Ring),
?assertEqual([{{34,
'dev5@127.0.0.1'},
primary},
{{35,
'dev6@127.0.0.1'},
primary},
{{36,
'dev1@127.0.0.1'},
primary}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes), Size)),
?assertEqual([{{35,
'dev6@127.0.0.1'},
primary},
{{36,
'dev1@127.0.0.1'},
primary},
{{34,
'dev2@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev5@127.0.0.1']), Size)),
?assertEqual([{{36,
'dev1@127.0.0.1'},
primary},
{{34,
'dev2@127.0.0.1'},
fallback},
{{35,
'dev3@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev5@127.0.0.1',
'dev6@127.0.0.1']), Size)),
?assertEqual([{{34,
'dev2@127.0.0.1'},
fallback},
{{35,
'dev3@127.0.0.1'},
fallback},
{{36,
'dev4@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev5@127.0.0.1',
'dev6@127.0.0.1',
'dev1@127.0.0.1']), Size)),
?assertEqual([{{34,
'dev3@127.0.0.1'},
fallback},
{{35,
'dev4@127.0.0.1'},
fallback},
{{36,
'dev3@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev5@127.0.0.1',
'dev6@127.0.0.1',
'dev1@127.0.0.1',
'dev2@127.0.0.1']), Size)),
?assertEqual([{{34,
'dev4@127.0.0.1'},
fallback},
{{35,
'dev4@127.0.0.1'},
fallback},
{{36,
'dev4@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev5@127.0.0.1',
'dev6@127.0.0.1',
'dev1@127.0.0.1',
'dev2@127.0.0.1',
'dev3@127.0.0.1']), Size)),
?assertEqual([{{34,
'dev5@127.0.0.1'},
primary},
{{35,
'dev6@127.0.0.1'},
primary},
{{36,
'dev3@127.0.0.1'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['dev1@127.0.0.1',
'dev2@127.0.0.1']), Size)),
riak_core_ring_manager:cleanup_ets(test),
ok.

chbin_test_() ->
{timeout, 180, fun chbin_test_scenario/0}.

Expand Down
6 changes: 5 additions & 1 deletion src/riak_core_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@
name/1,
n_val/1]).

-export_type([bucket/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-define(METADATA_PREFIX, {core, buckets}).

-type bucket() :: binary() | {riak_core_bucket_type:bucket_type(), binary()}.

%% @doc Add a list of defaults to global list of defaults for new
%% buckets. If any item is in Items is already set in the
%% current defaults list, the new setting is omitted, and the old
Expand All @@ -55,7 +59,7 @@ append_bucket_defaults(Items) when is_list(Items) ->

%% @doc Set the given BucketProps in Bucket or {BucketType, Bucket}. If BucketType does not
%% exist, or is not active, {error, no_type} is returned.
-spec set_bucket(binary() | {riak_core_bucket_type:bucket_type(), binary()}, [{atom(), any()}]) ->
-spec set_bucket(bucket(), [{atom(), any()}]) ->
ok | {error, no_type | [{atom(), atom()}]}.
set_bucket({<<"default">>, Name}, BucketProps) ->
set_bucket(Name, BucketProps);
Expand Down
Loading