Skip to content

Commit

Permalink
storage: Modify mdcr-related functions and tests for leo-project/issu…
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Apr 26, 2017
1 parent 93cac74 commit b18f5bd
Show file tree
Hide file tree
Showing 9 changed files with 1,153 additions and 185 deletions.
3 changes: 3 additions & 0 deletions apps/leo_storage/include/leo_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
?CMD_HEAD
).

-type(etag_ret() :: {etag, non_neg_integer()}).


%% @doc queue-related.
-define(QUEUE_ID_PER_OBJECT, 'leo_per_object_queue').
-define(QUEUE_ID_SYNC_BY_VNODE_ID, 'leo_sync_by_vnode_id_queue').
Expand Down
395 changes: 272 additions & 123 deletions apps/leo_storage/src/leo_storage_handler_object.erl

Large diffs are not rendered by default.

97 changes: 64 additions & 33 deletions apps/leo_storage/src/leo_storage_mq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% LeoFS Storage
%%
%% Copyright (c) 2012-2016 Rakuten, Inc.
%% Copyright (c) 2012-2017 Rakuten, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -440,16 +440,16 @@ handle_call(_,_,_) ->
%%--------------------------------------------------------------------
%% @doc synchronize by vnode-id.
%% @private
-spec(recover_node(atom()) ->
ok).
%% @doc Trigger object recovery for the given node: build a callback bound
%%      to Node and hand it to the object-storage fetch API.
%%      Starts the fetch at address-id 0 (presumably a full scan of the
%%      local object storage — TODO confirm against leo_object_storage_api).
%%      The fetch result is deliberately ignored; this always returns 'ok'.
-spec(recover_node(Node) ->
ok when Node::node()).
recover_node(Node) ->
%% recover_node_callback/1 decides per stored object whether it must be
%% re-sent to Node (definition not fully visible here).
Callback = recover_node_callback(Node),
_ = leo_object_storage_api:fetch_by_addr_id(0, Callback),
ok.

%% @private
-spec(recover_node_callback(atom()) ->
any()).
-spec(recover_node_callback(Node) ->
any() when Node::node()).
recover_node_callback(Node) ->
fun(K, V, Acc) ->
Metadata_1 = binary_to_term(V),
Expand Down Expand Up @@ -533,8 +533,12 @@ send_object_to_remote_node(Node, AddrId, Key) ->

%% @doc synchronize by vnode-id.
%% @private
-spec(sync_vnodes(atom(), integer(), list()) ->
ok).
-spec(sync_vnodes(Node, RingHash, ListOfFromToAddrId) ->
ok when Node::node(),
RingHash::integer(),
FromAddrId::integer(),
ToAddrId::integer(),
ListOfFromToAddrId::[{FromAddrId, ToAddrId}]).
sync_vnodes(_, _, []) ->
ok;
sync_vnodes(Node, RingHash, [{FromAddrId, ToAddrId}|T]) ->
Expand All @@ -544,8 +548,10 @@ sync_vnodes(Node, RingHash, [{FromAddrId, ToAddrId}|T]) ->
sync_vnodes(Node, RingHash, T).

%% @private
-spec(sync_vnodes_callback(atom(), pos_integer(), pos_integer()) ->
any()).
-spec(sync_vnodes_callback(Node, FromAddrId, ToAddrId) ->
any() when Node::node(),
FromAddrId::integer(),
ToAddrId::integer()).
sync_vnodes_callback(Node, FromAddrId, ToAddrId)->
fun(_K, V, Acc) ->
%% Note: An object of copy is NOT equal current ring-hash.
Expand Down Expand Up @@ -581,8 +587,11 @@ sync_vnodes_callback(Node, FromAddrId, ToAddrId)->

%% @doc Remove a node from redundancies
%% @private
-spec(delete_node_from_redundancies(list(#redundant_node{}), atom(), list(#redundant_node{})) ->
{ok, list(#redundant_node{})}).

-spec(delete_node_from_redundancies(Redundancies, Node, AccRedundancies) ->
{ok, AccRedundancies} when Redundancies::[#redundant_node{}],
Node::node(),
AccRedundancies::[#redundant_node{}]).
delete_node_from_redundancies([],_,Acc) ->
{ok, lists:reverse(Acc)};
delete_node_from_redundancies([#redundant_node{node = Node}|Rest], Node, Acc) ->
Expand All @@ -593,8 +602,10 @@ delete_node_from_redundancies([RedundantNode|Rest], Node, Acc) ->

%% @doc Find a node from redundancies
%% @private
-spec(find_node_from_redundancies(list(#redundant_node{}), atom()) ->
boolean()).
-spec(find_node_from_redundancies(Redundancies, Node) ->
Ret when Redundancies::[#redundant_node{}],
Node::node(),
Ret::boolean()).
find_node_from_redundancies([],_) ->
false;
find_node_from_redundancies([#redundant_node{node = Node}|_], Node) ->
Expand All @@ -605,8 +616,11 @@ find_node_from_redundancies([_|Rest], Node) ->

%% @doc Notify a message to manager node(s)
%% @private
-spec(notify_message_to_manager(list(), integer(), atom()) ->
ok | {error, any()}).
-spec(notify_message_to_manager(ManagerNodes, VNodeId, Node) ->
ok | {error, Cause} when ManagerNodes::[node()],
VNodeId::integer(),
Node::node(),
Cause::any()).
notify_message_to_manager([],_VNodeId,_Node) ->
{error, 'fail_notification'};
notify_message_to_manager([Manager|T], VNodeId, Node) ->
Expand All @@ -633,8 +647,9 @@ notify_message_to_manager([Manager|T], VNodeId, Node) ->

%% @doc correct_redundancies/1 - first.
%% @private
-spec(correct_redundancies(binary()) ->
ok | {error, any()}).
-spec(correct_redundancies(Key) ->
ok | {error, Cause} when Key::binary(),
Cause::any()).
correct_redundancies(Key) ->
case leo_redundant_manager_api:get_redundancies_by_key(Key) of
{ok, #redundancies{nodes = Redundancies,
Expand Down Expand Up @@ -681,28 +696,33 @@ correct_redundancies_1(Key, AddrId, [#redundant_node{node = Node}|T], Metadatas,

%% @doc correct_redundancies_2/3
%% @private
-spec(correct_redundancies_2(list(), [any()]) ->
ok | {error, any()}).
correct_redundancies_2(ListOfMetadata, ErrorNodes) ->
H = case (erlang:length(ListOfMetadata) == 1) of
-spec(correct_redundancies_2(ListOfMetadatas, ErrorNodes) ->
ok | {error, any()} when ListOfMetadatas::[#?METADATA{}],
ErrorNodes::[any()]).
correct_redundancies_2(ListOfMetadatas, ErrorNodes) ->
%% Retrieve latest metadata of an object to fix its inconsistency
H = case (erlang:length(ListOfMetadatas) == 1) of
true ->
erlang:hd(ListOfMetadata);
erlang:hd(ListOfMetadatas);
false ->
MaxClock = lists:max([M#?METADATA.clock
|| {_,M} <- ListOfMetadata]),
|| {_,M} <- ListOfMetadatas]),
{_,RetL} = lists:foldl(
fun({_,#?METADATA{clock = Clock}} = M,
{MaxClock_1, Acc}) when Clock == MaxClock_1 ->
{MaxClock_1, [M|Acc]};
(_, {MaxClock_1, Acc}) ->
{MaxClock_1, Acc}
end, {MaxClock, []}, ListOfMetadata),
end, {MaxClock, []}, ListOfMetadatas),
erlang:hd(RetL)
end,
{_,Metadata} = H,
{_, Metadata} = H,

%% If 'metadata' contains 'num_of_replicas > 0',
%% it is adopted instead of the local 'number of replicas'
{_Dest, CorrectNodes, InconsistentNodes} =
lists:foldl(
fun({Node,_Metadata}, {{DestNode, _Metadata} = Dest, C, R}) when Node =:= DestNode ->
fun({Node,_Metadata}, {{DestNode,_Metadata} = Dest, C, R}) when Node =:= DestNode ->
{Dest, [Node|C], R};
({Node, #?METADATA{clock = Clock}},
{{DestNode, #?METADATA{clock = DestClock}} = Dest, C, R}) when Node =/= DestNode,
Expand All @@ -712,7 +732,7 @@ correct_redundancies_2(ListOfMetadata, ErrorNodes) ->
{{DestNode, #?METADATA{clock = DestClock}} = Dest, C, R}) when Node =/= DestNode,
Clock =/= DestClock ->
{Dest, C, [Node|R]}
end, {H, [], []}, ListOfMetadata),
end, {H, [], []}, ListOfMetadatas),
correct_redundancies_3(ErrorNodes ++ InconsistentNodes, CorrectNodes, Metadata).


Expand Down Expand Up @@ -813,14 +833,22 @@ rebalance_2({ok, Redundancies}, #rebalance_message{node = Node,
AddrId, Key, Redundancies),
case find_node_from_redundancies(Redundancies_1, erlang:node()) of
true ->
send_object_to_remote_node(Node, AddrId, Key);
case lists:filter(
fun(#redundant_node{node = RedundantNode}) ->
Node == RedundantNode
end, Redundancies_1) of
[] ->
?MODULE:publish(?QUEUE_ID_PER_OBJECT,
AddrId, Key, ?ERR_TYPE_RECOVER_DATA);
_ ->
send_object_to_remote_node(Node, AddrId, Key)
end;
false ->
?warn("rebalance_2/2",
[{node, Node}, {addr_id, AddrId},
{key, Key}, {cause, 'node_not_found'}]),
ok = publish(?QUEUE_ID_PER_OBJECT,
AddrId, Key, ?ERR_TYPE_REPLICATE_DATA),
ok
publish(?QUEUE_ID_PER_OBJECT,
AddrId, Key, ?ERR_TYPE_REPLICATE_DATA)
end.


Expand Down Expand Up @@ -911,8 +939,11 @@ fix_consistency_between_clusters(#inconsistent_data_with_dc{
%%--------------------------------------------------------------------
%% @doc Lookup rebalance counter
%% @private
-spec(ets_lookup(atom(), integer()) ->
list() | {error, any()}).
-spec(ets_lookup(Table, Key) ->
{ok, Value} | {error, Cause} when Table::atom(),
Key::binary(),
Value::integer(),
Cause::any()).
ets_lookup(Table, Key) ->
case catch ets:lookup(Table, Key) of
[] ->
Expand Down
57 changes: 39 additions & 18 deletions apps/leo_storage/src/leo_sync_remote_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% Leo Storage
%%
%% Copyright (c) 2012-2016 Rakuten, Inc.
%% Copyright (c) 2012-2017 Rakuten, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -340,15 +340,21 @@ stack_fun(ClusterId, #?OBJECT{addr_id = AddrId,
case leo_cluster_tbl_conf:get() of
{ok, #?SYSTEM_CONF{
cluster_id = MDC_ClusterId,
num_of_dc_replicas = MDC_NumOfReplicas}} ->
num_of_dc_replicas = MDCR_N,
mdcr_r = MDCR_R,
mdcr_w = MDCR_W,
mdcr_d = MDCR_D}} ->
CMeta_1 = leo_object_storage_transformer:list_to_cmeta_bin(
[{?PROP_CMETA_CLUSTER_ID, MDC_ClusterId},
{?PROP_CMETA_NUM_OF_REPLICAS, MDC_NumOfReplicas},
{?PROP_CMETA_NUM_OF_REPLICAS, MDCR_N},
{?PROP_CMETA_UDM, leo_object_storage_transformer:get_udm_from_cmeta_bin(CMeta)}
]),
CMetaLen = byte_size(CMeta_1),
Object_1 = Object#?OBJECT{cluster_id = MDC_ClusterId,
num_of_replicas = MDC_NumOfReplicas,
num_of_replicas = MDCR_N,
preferred_r = MDCR_R,
preferred_w = MDCR_W,
preferred_d = MDCR_D,
msize = CMetaLen,
meta = CMeta_1},
ObjBin = term_to_binary(Object_1),
Expand Down Expand Up @@ -419,33 +425,48 @@ slice(StackedObjs) ->
end.


%% @TODO
%% @doc Replicate an object between clusters
%% @private
-spec(replicate(atom(), #?OBJECT{}) ->
-spec(replicate(atom(), #object{} | #object_1{} | #?OBJECT{}) ->
[] | #?METADATA{}).
replicate(ClusterId, Object) ->
replicate(ClusterId, #?OBJECT{num_of_replicas = MDCR_N} = Object) ->
%% Retrieve redundancies of the cluster,
%% then overwrite 'n' and 'w' for the mdc-replication
Ret = case leo_mdcr_tbl_cluster_info:get(ClusterId) of
{ok, #?CLUSTER_INFO{num_of_dc_replicas = NumOfReplicas}} ->
Object_1 = Object#?OBJECT{cluster_id = ClusterId,
num_of_replicas = NumOfReplicas},
case leo_storage_handler_object:replicate(Object_1) of
{ok, _ETag} ->
{ok, leo_object_storage_transformer:object_to_metadata(Object)};
%% then overwrite 'n' and 'w' for the mdc-replication
Ret = case (MDCR_N == 0) of
true ->
case leo_mdcr_tbl_cluster_info:get(ClusterId) of
{ok, #?CLUSTER_INFO{num_of_dc_replicas = MDCR_1}} ->
{ok, MDCR_1};
{error, Cause} ->
?warn("replicate/2", [{cause, Cause}]),
{error, Cause}
end;
false ->
{ok, MDCR_N}
end,
replicate_1(Ret, Object#?OBJECT{cluster_id = ClusterId});
replicate(ClusterId, Object) ->
replicate(ClusterId, leo_object_storage_transformer:transform_object(Object)).

%% @private
replicate_1({ok, MDCR_N}, Object) ->
Object_1 = Object#?OBJECT{num_of_replicas = MDCR_N},
Ret = case leo_storage_handler_object:replicate(Object_1) of
{ok,_ETag} ->
{ok, leo_object_storage_transformer:object_to_metadata(Object_1)};
{error, Cause} ->
?warn("replicate/2", [{cause, Cause}]),
?warn("replicate_1/1", [{cause, Cause}]),
{error, Cause}
end,
replicate_1(Ret).
replicate_2(Ret);
replicate_1(Ret,_Object) ->
replicate_2(Ret).

%% @private
replicate_1({ok, Metadata}) ->
replicate_2({ok, Metadata}) ->
Metadata;
replicate_1(_Error) ->
replicate_2(_) ->
[].


Expand Down
2 changes: 1 addition & 1 deletion mdcr.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# GLOBALS
#-------------------------------------------------------------------------------
NCLUSTERS=2
CLUSTER_NSTORAGES=3
CLUSTER_NSTORAGES=4

#-------------------------------------------------------------------------------
# ROUTINES
Expand Down
26 changes: 21 additions & 5 deletions priv/test/mdcr-test/c1/leo_manager.conf.0
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ system.cluster_id = leofs_1
## * See: https://leo-project.net/leofs/docs/configuration/configuration_1.html
## --------------------------------------------------------------------
## A number of replicas
consistency.num_of_replicas = 2
consistency.num_of_replicas = 3

## A number of replicas needed for a successful WRITE operation
consistency.write = 1
consistency.write = 2

## A number of replicas needed for a successful READ operation
consistency.read = 1

## A number of replicas needed for a successful DELETE operation
consistency.delete = 1
consistency.delete = 2

## A number of rack-aware replicas
consistency.rack_aware_replicas = 0
Expand All @@ -82,8 +82,24 @@ consistency.rack_aware_replicas = 0
## A number of replication targets
mdc_replication.max_targets = 2

## A number of replicas a DC
mdc_replication.num_of_replicas_a_dc = 1
## A number of replicas per datacenter
## [note] A local LeoFS sends a stacked object which contains the items of a replication method:
##   - [L1_N] A number of replicas
##   - [L1_W] A number of replicas needed for a successful WRITE operation
##   - [L1_R] A number of replicas needed for a successful READ operation
##   - [L1_D] A number of replicas needed for a successful DELETE operation
## A remote cluster of a LeoFS system receives the object,
## and then replicates it by its contained replication method.
mdc_replication.num_of_replicas_a_dc = 2

## MDC replication / A number of replicas needed for a successful WRITE operation
mdc_replication.consistency.write = 1

## MDC replication / A number of replicas needed for a successful READ operation
mdc_replication.consistency.read = 1

## MDC replication / A number of replicas needed for a successful DELETE operation
mdc_replication.consistency.delete = 1


## --------------------------------------------------------------------
Expand Down
Loading

0 comments on commit b18f5bd

Please sign in to comment.