storage: Modified mdcr-related functions for leo-project#645 (2)
yosukehara committed Apr 26, 2017
1 parent 5982921 commit d84a516
Showing 8 changed files with 834 additions and 79 deletions.
2 changes: 0 additions & 2 deletions apps/leo_storage/src/leo_storage_handler_object.erl
@@ -1293,8 +1293,6 @@ read_and_repair_3({ok, Metadata, #?OBJECT{data = Bin,
     case (NumOfReplicas > 0 andalso
           Preferred_R > 0) of
         true ->
-            ?debugVal({NumOfReplicas, Preferred_R}),
-            ?debugVal(Redundancies),
             {Preferred_R, lists:sublist(Redundancies, NumOfReplicas - 1)};
         false ->
             {Quorum, Redundancies}
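For reference, the branch that survives this debug cleanup can be read as a small pure function. A minimal sketch under assumed types; choose_read_quorum/4 is a hypothetical name, not part of leo_storage_handler_object:

    %% When an object carries its own replica count (NumOfReplicas > 0) and a
    %% preferred read quorum (Preferred_R > 0), read with quorum Preferred_R
    %% against the first (NumOfReplicas - 1) redundant nodes; otherwise fall
    %% back to the ring-wide Quorum and the full redundancy list.
    -spec(choose_read_quorum(integer(), integer(), integer(), [term()]) ->
                 {integer(), [term()]}).
    choose_read_quorum(NumOfReplicas, Preferred_R, Quorum, Redundancies) ->
        case (NumOfReplicas > 0 andalso Preferred_R > 0) of
            true ->
                {Preferred_R, lists:sublist(Redundancies, NumOfReplicas - 1)};
            false ->
                {Quorum, Redundancies}
        end.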
116 changes: 59 additions & 57 deletions apps/leo_storage/src/leo_storage_mq.erl
@@ -440,16 +440,16 @@ handle_call(_,_,_) ->
 %%--------------------------------------------------------------------
 %% @doc synchronize by vnode-id.
 %% @private
--spec(recover_node(atom()) ->
-             ok).
+-spec(recover_node(Node) ->
+             ok when Node::node()).
 recover_node(Node) ->
     Callback = recover_node_callback(Node),
     _ = leo_object_storage_api:fetch_by_addr_id(0, Callback),
     ok.

 %% @private
--spec(recover_node_callback(atom()) ->
-             any()).
+-spec(recover_node_callback(Node) ->
+             any() when Node::node()).
 recover_node_callback(Node) ->
     fun(K, V, Acc) ->
             Metadata_1 = binary_to_term(V),
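The spec rewrites throughout this file follow one pattern: positional types such as -spec(recover_node(atom()) -> ok). become named arguments constrained with `when`. Both forms are equivalent to Dialyzer (node() is an alias for atom()); the named form documents intent and renders better in generated docs. A self-contained sketch of the style, on a hypothetical ping_node/1 that is not part of this module:

    -spec(ping_node(Node) ->
                 ok | {error, Cause} when Node::node(),
                                          Cause::any()).
    ping_node(Node) ->
        %% net_adm:ping/1 returns pong | pang
        case net_adm:ping(Node) of
            pong -> ok;
            pang -> {error, not_reachable}
        end.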
@@ -512,41 +512,17 @@ recover_node_callback_2([SrcNode|Rest], AddrId, Key, FixedNode) ->
 send_object_to_remote_node(Node, AddrId, Key) ->
     Ref = make_ref(),
     case leo_storage_handler_object:get({Ref, Key}) of
-        {ok, Ref, #?METADATA{num_of_replicas = Preferred_N} = Metadata, Bin} ->
-            %% Check redundant nodes by key's addr_id
-            %% whether this node needs to store the object OR not
-            case leo_redundant_manager_api:get_redundancies_by_addr_id(AddrId) of
-                {ok, #redundancies{nodes = Redundancies,
-                                   n = N}} ->
-                    Redundancies_1 =
-                        case (N > Preferred_N andalso Preferred_N > 0) of
-                            true ->
-                                lists:sublist(Redundancies, Preferred_N);
-                            false ->
-                                Redundancies
-                        end,
-
-                    case lists:filter(
-                           fun(#redundant_node{node = RedundantNode}) ->
-                                   Node == RedundantNode
-                           end, Redundancies_1) of
-                        [] ->
-                            ok;
-                        _ ->
-                            case rpc:call(Node, leo_sync_local_cluster, store,
-                                          [Metadata, Bin], ?DEF_REQ_TIMEOUT) of
-                                ok ->
-                                    ok;
-                                {error, inconsistent_obj} ->
-                                    ?MODULE:publish(?QUEUE_ID_PER_OBJECT,
-                                                    AddrId, Key, ?ERR_TYPE_RECOVER_DATA);
-                                _ ->
-                                    ?MODULE:publish(?QUEUE_ID_PER_OBJECT, AddrId, Key,
-                                                    Node, true, ?ERR_TYPE_RECOVER_DATA)
-                            end
-                    end;
-                {error, Reason} ->
-                    {error, Reason}
+        {ok, Ref, Metadata, Bin} ->
+            case rpc:call(Node, leo_sync_local_cluster, store,
+                          [Metadata, Bin], ?DEF_REQ_TIMEOUT) of
+                ok ->
+                    ok;
+                {error, inconsistent_obj} ->
+                    ?MODULE:publish(?QUEUE_ID_PER_OBJECT,
+                                    AddrId, Key, ?ERR_TYPE_RECOVER_DATA);
+                _ ->
+                    ?MODULE:publish(?QUEUE_ID_PER_OBJECT, AddrId, Key,
+                                    Node, true, ?ERR_TYPE_RECOVER_DATA)
+            end;
         {error, Ref, Cause} ->
             {error, Cause};
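After this change the function no longer re-derives the redundancies here; it pushes the object unconditionally and lets the queue drive retries (the membership check moves into rebalance_2, further down). A condensed sketch of the push-and-requeue pattern, where Mod:Fun stands in for leo_sync_local_cluster:store/2, Requeue/1 for the ?MODULE:publish calls, and the 30-second timeout is an assumption:

    push_or_requeue(Node, Mod, Fun, Metadata, Bin, Requeue) ->
        case rpc:call(Node, Mod, Fun, [Metadata, Bin], 30000) of
            ok ->
                ok;
            {error, inconsistent_obj} ->
                %% the replica itself is suspect: queue a data-recovery task
                Requeue(recover_data);
            _Error ->
                %% RPC failure or timeout: queue a retry against the same node
                Requeue({retry, Node})
        end.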
@@ -557,8 +533,12 @@ send_object_to_remote_node(Node, AddrId, Key) ->

 %% @doc synchronize by vnode-id.
 %% @private
--spec(sync_vnodes(atom(), integer(), list()) ->
-             ok).
+-spec(sync_vnodes(Node, RingHash, ListOfFromToAddrId) ->
+             ok when Node::node(),
+                     RingHash::integer(),
+                     FromAddrId::integer(),
+                     ToAddrId::integer(),
+                     ListOfFromToAddrId::[{FromAddrId, ToAddrId}]).
 sync_vnodes(_, _, []) ->
     ok;
 sync_vnodes(Node, RingHash, [{FromAddrId, ToAddrId}|T]) ->
@@ -568,8 +548,10 @@ sync_vnodes(Node, RingHash, [{FromAddrId, ToAddrId}|T]) ->
     sync_vnodes(Node, RingHash, T).

 %% @private
--spec(sync_vnodes_callback(atom(), pos_integer(), pos_integer()) ->
-             any()).
+-spec(sync_vnodes_callback(Node, FromAddrId, ToAddrId) ->
+             any() when Node::node(),
+                        FromAddrId::integer(),
+                        ToAddrId::integer()).
 sync_vnodes_callback(Node, FromAddrId, ToAddrId)->
     fun(_K, V, Acc) ->
             %% Note: An object of copy is NOT equal current ring-hash.
@@ -605,8 +587,11 @@ sync_vnodes_callback(Node, FromAddrId, ToAddrId)->

 %% @doc Remove a node from redundancies
 %% @private
--spec(delete_node_from_redundancies(list(#redundant_node{}), atom(), list(#redundant_node{})) ->
-             {ok, list(#redundant_node{})}).
+
+-spec(delete_node_from_redundancies(Redundancies, Node, AccRedundancies) ->
+             {ok, AccRedundancies} when Redundancies::[#redundant_node{}],
+                                        Node::node(),
+                                        AccRedundancies::[#redundant_node{}]).
 delete_node_from_redundancies([],_,Acc) ->
     {ok, lists:reverse(Acc)};
 delete_node_from_redundancies([#redundant_node{node = Node}|Rest], Node, Acc) ->
@@ -617,8 +602,10 @@ delete_node_from_redundancies([RedundantNode|Rest], Node, Acc) ->

 %% @doc Find a node from redundancies
 %% @private
--spec(find_node_from_redundancies(list(#redundant_node{}), atom()) ->
-             boolean()).
+-spec(find_node_from_redundancies(Redundancies, Node) ->
+             Ret when Redundancies::[#redundant_node{}],
+                      Node::node(),
+                      Ret::boolean()).
 find_node_from_redundancies([],_) ->
     false;
 find_node_from_redundancies([#redundant_node{node = Node}|_], Node) ->
@@ -629,8 +616,11 @@ find_node_from_redundancies([_|Rest], Node) ->

 %% @doc Notify a message to manager node(s)
 %% @private
--spec(notify_message_to_manager(list(), integer(), atom()) ->
-             ok | {error, any()}).
+-spec(notify_message_to_manager(ManagerNodes, VNodeId, Node) ->
+             ok | {error, Cause} when ManagerNodes::[node()],
+                                      VNodeId::integer(),
+                                      Node::node(),
+                                      Cause::any()).
 notify_message_to_manager([],_VNodeId,_Node) ->
     {error, 'fail_notification'};
 notify_message_to_manager([Manager|T], VNodeId, Node) ->
@@ -657,8 +647,9 @@ notify_message_to_manager([Manager|T], VNodeId, Node) ->

 %% @doc correct_redundancies/1 - first.
 %% @private
--spec(correct_redundancies(binary()) ->
-             ok | {error, any()}).
+-spec(correct_redundancies(Key) ->
+             ok | {error, Cause} when Key::binary(),
+                                      Cause::any()).
 correct_redundancies(Key) ->
     case leo_redundant_manager_api:get_redundancies_by_key(Key) of
         {ok, #redundancies{nodes = Redundancies,
@@ -842,14 +833,22 @@ rebalance_2({ok, Redundancies}, #rebalance_message{node = Node,
                                          AddrId, Key, Redundancies),
     case find_node_from_redundancies(Redundancies_1, erlang:node()) of
         true ->
-            send_object_to_remote_node(Node, AddrId, Key);
+            case lists:filter(
+                   fun(#redundant_node{node = RedundantNode}) ->
+                           Node == RedundantNode
+                   end, Redundancies_1) of
+                [] ->
+                    ?MODULE:publish(?QUEUE_ID_PER_OBJECT,
+                                    AddrId, Key, ?ERR_TYPE_RECOVER_DATA);
+                _ ->
+                    send_object_to_remote_node(Node, AddrId, Key)
+            end;
         false ->
             ?warn("rebalance_2/2",
                   [{node, Node}, {addr_id, AddrId},
                    {key, Key}, {cause, 'node_not_found'}]),
-            ok = publish(?QUEUE_ID_PER_OBJECT,
-                         AddrId, Key, ?ERR_TYPE_REPLICATE_DATA),
-            ok
+            publish(?QUEUE_ID_PER_OBJECT,
+                    AddrId, Key, ?ERR_TYPE_REPLICATE_DATA)
     end.
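The new guard in the true branch is a membership test: push only when the target node is among the (possibly preferred-N-trimmed) redundancies for the key, and requeue a recover-data task otherwise. lists:any/2 expresses the same check more directly; a sketch assuming the #redundant_node{} record, with is_redundant_target/2 a hypothetical name:

    is_redundant_target(Node, Redundancies) ->
        lists:any(fun(#redundant_node{node = RedundantNode}) ->
                          Node == RedundantNode
                  end, Redundancies).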


@@ -940,8 +939,11 @@ fix_consistency_between_clusters(#inconsistent_data_with_dc{
 %%--------------------------------------------------------------------
 %% @doc Lookup rebalance counter
 %% @private
--spec(ets_lookup(atom(), integer()) ->
-             list() | {error, any()}).
+-spec(ets_lookup(Table, Key) ->
+             {ok, Value} | {error, Cause} when Table::atom(),
+                                               Key::binary(),
+                                               Value::integer(),
+                                               Cause::any()).
 ets_lookup(Table, Key) ->
     case catch ets:lookup(Table, Key) of
         [] ->
9 changes: 0 additions & 9 deletions apps/leo_storage/src/leo_sync_remote_cluster.erl
@@ -452,15 +452,6 @@ replicate(ClusterId, Object) ->
 %% @private
 replicate_1({ok, MDCR_N}, Object) ->
     Object_1 = Object#?OBJECT{num_of_replicas = MDCR_N},
-    %% @DEBUG >>
-    lists:foreach(fun(X) ->
-                          ?debugVal(X)
-                  end, lists:zip(
-                         record_info(
-                           fields, ?OBJECT),
-                         tl(tuple_to_list(Object_1)))),
-    %% <<
-
     Ret = case leo_storage_handler_object:replicate(Object_1) of
               {ok,_ETag} ->
                   {ok, leo_object_storage_transformer:object_to_metadata(Object_1)};
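The deleted @DEBUG block used a trick worth noting: zip a record's field names (record_info/2) with its values (tuple_to_list/1, dropping the record tag with tl/1) to dump every field readably. A standalone sketch with a hypothetical record:

    -record(object, {key, addr_id, num_of_replicas}).

    %% dump_record(#object{key = <<"a/b">>, addr_id = 1, num_of_replicas = 3})
    %% => [{key,<<"a/b">>}, {addr_id,1}, {num_of_replicas,3}]
    dump_record(#object{} = Obj) ->
        lists:zip(record_info(fields, object),
                  tl(tuple_to_list(Obj))).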
2 changes: 1 addition & 1 deletion mdcr.sh
@@ -4,7 +4,7 @@
 # GLOBALS
 #-------------------------------------------------------------------------------
 NCLUSTERS=2
-CLUSTER_NSTORAGES=3
+CLUSTER_NSTORAGES=4

 #-------------------------------------------------------------------------------
 # ROUTINES
26 changes: 21 additions & 5 deletions priv/test/mdcr-test/c1/leo_manager.conf.0
@@ -61,16 +61,16 @@ system.cluster_id = leofs_1
 ## * See: https://leo-project.net/leofs/docs/configuration/configuration_1.html
 ## --------------------------------------------------------------------
 ## A number of replicas
-consistency.num_of_replicas = 2
+consistency.num_of_replicas = 3

 ## A number of replicas needed for a successful WRITE operation
-consistency.write = 1
+consistency.write = 2

 ## A number of replicas needed for a successful READ operation
 consistency.read = 1

 ## A number of replicas needed for a successful DELETE operation
-consistency.delete = 1
+consistency.delete = 2

 ## A number of rack-aware replicas
 consistency.rack_aware_replicas = 0
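Worth noting about the new test values (N = 3, W = 2, R = 1, D = 2): the usual strict-quorum condition is R + W > N, and here R + W = 3 = N, so a read may hit the one replica that missed a write. A quick check, as a sketch:

    %% quorum_overlap(3, 1, 2) => false: a read set is NOT guaranteed to
    %% overlap every write set, so the read-repair path above still matters.
    quorum_overlap(N, R, W) ->
        R + W > N.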
@@ -82,8 +82,24 @@ consistency.rack_aware_replicas = 0
 ## A number of replication targets
 mdc_replication.max_targets = 2

-## A number of replicas a DC
-mdc_replication.num_of_replicas_a_dc = 1
+## A number of replicas per datacenter
+## [note] A local LeoFS cluster sends a stacked object which contains the items of its replication method:
+##          - [L1_N] A number of replicas
+##          - [L1_W] A number of replicas needed for a successful WRITE operation
+##          - [L1_R] A number of replicas needed for a successful READ operation
+##          - [L1_D] A number of replicas needed for a successful DELETE operation
+##        The remote LeoFS cluster receives the object,
+##        and then replicates it by the contained replication method.
+mdc_replication.num_of_replicas_a_dc = 2
+
+## MDC replication / A number of replicas needed for a successful WRITE operation
+mdc_replication.consistency.write = 1
+
+## MDC replication / A number of replicas needed for a successful READ operation
+mdc_replication.consistency.read = 1
+
+## MDC replication / A number of replicas needed for a successful DELETE operation
+mdc_replication.consistency.delete = 1


 ## --------------------------------------------------------------------
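A sketch of the handshake the [note] above describes, under assumed names (#repl_method{} and apply_mdc_method/3 are illustrations, not LeoFS's actual internals): the local cluster stacks the object together with its L1_* replication method, and the receiving cluster replicates by those contained values instead of its own defaults:

    -record(repl_method, {l1_n = 2, l1_w = 1, l1_r = 1, l1_d = 1}).

    %% Replicate/2 is an assumed callback returning the number of successful
    %% in-cluster writes; succeed once the contained write quorum is met.
    apply_mdc_method(#repl_method{l1_n = N, l1_w = W}, Object, Replicate) ->
        Acks = Replicate(Object, N),
        case Acks >= W of
            true  -> ok;
            false -> {error, write_quorum_not_satisfied}
        end.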
