Mas i1775 ttaaesafety #1776

Merged: 52 commits, Dec 14, 2020
Commits
4f4e522
Add backoffs on query timeouts
martinsumner Nov 10, 2020
45c9a15
Switch to using fetch_clocks_range
martinsumner Nov 12, 2020
36353b1
Fix issue with format switches onmodifed date ranges
martinsumner Nov 12, 2020
1cc2267
Switch kv_index_tictactree branch
martinsumner Nov 12, 2020
2e425d9
Add support for DayCheck/HourCheck on full fullsync
martinsumner Nov 12, 2020
2bc9157
Add API changes for fetch_clock_nval
martinsumner Nov 16, 2020
8fa58d1
Correction to default nval filter
martinsumner Nov 16, 2020
e5dedf6
Schema and log refinements
martinsumner Nov 16, 2020
33a963b
Temp commit
martinsumner Nov 17, 2020
870b5c9
Add configurable pause
martinsumner Nov 18, 2020
f48568b
Revert "Temp commit"
martinsumner Nov 18, 2020
0a6e7b9
Improve reporting of repairs
martinsumner Nov 19, 2020
0d1f747
Update rebar.config
martinsumner Nov 19, 2020
26f3462
Fix repair logging
martinsumner Nov 19, 2020
4106675
Use environment variable to boost/quiet work items
martinsumner Nov 19, 2020
7af3c2a
Log and quietener improvements
martinsumner Nov 19, 2020
8a936c6
split queues for all_sync and modified_range queries
martinsumner Nov 20, 2020
6c33cc8
Decode clock before compare
martinsumner Nov 20, 2020
a7ff5b4
Decode clocks in mod_range queries
martinsumner Nov 20, 2020
5282d0d
Remove auto-quitener
martinsumner Nov 20, 2020
2b4cb72
Add support for range_check
martinsumner Nov 21, 2020
9da4bab
Add scheduling support for range_sync
martinsumner Nov 21, 2020
d346369
Resolve name clash
martinsumner Nov 22, 2020
8123e88
Correct Mesgaseconds
martinsumner Nov 22, 2020
4ddb72f
Typo
martinsumner Nov 22, 2020
d4dfcc8
Make use of range_check
martinsumner Nov 23, 2020
4c71d57
Correct success reply names
martinsumner Nov 23, 2020
00cf684
Should assume a success on startup
martinsumner Nov 23, 2020
948de1e
Improve handling of crashed nodes
martinsumner Nov 24, 2020
6752535
Add log on repairing/failed exchange
martinsumner Nov 25, 2020
1701a92
Use _check not _sync consistently in schema and code
martinsumner Nov 25, 2020
9459e0e
Use a specific bucket range check if hits target
martinsumner Nov 25, 2020
08769cc
Use more cautious defaults
martinsumner Nov 25, 2020
55fccf7
Update riak_kv.schema
martinsumner Nov 26, 2020
9796c51
Check the bucket with the biggest key count
martinsumner Nov 27, 2020
faf21af
Update rebar.config
martinsumner Nov 29, 2020
5c01d1a
Add compaction run control into leveled backend
martinsumner Nov 29, 2020
f78b750
Make pause in exchange configurable
martinsumner Nov 30, 2020
7aabbbe
ttaaefs_manager - not too clever
martinsumner Dec 1, 2020
19cc3f5
Update NextGenREPL-GettingStarted.md
martinsumner Dec 1, 2020
b0a4062
Tidy-up post review
martinsumner Dec 2, 2020
84d65c4
BE AWARE - potential breaking change
martinsumner Dec 3, 2020
337148e
Typo
martinsumner Dec 3, 2020
3d4ccdd
Switch to improve list_buckets
martinsumner Dec 4, 2020
5de68b1
Changes following review
martinsumner Dec 4, 2020
422594a
Additional review changes
martinsumner Dec 4, 2020
6acbb9e
Export validity checks for aae folds
martinsumner Dec 10, 2020
c25897e
Shortcuts to riak_client functions
martinsumner Dec 11, 2020
b4687d9
Missing export
martinsumner Dec 11, 2020
de8b709
Switch to develop-2.0 branches
martinsumner Dec 14, 2020
c90133b
Update NextGenREPL-GettingStarted.md
martinsumner Dec 14, 2020
1a78da3
Update rebar.config
martinsumner Dec 14, 2020
376 changes: 376 additions & 0 deletions docs/NextGenREPL-GettingStarted.md

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions docs/NextGenREPL.md
@@ -216,11 +216,13 @@ The queue to write the discovery of any updates where *this* cluster has objects
The queue defined here must also be defined in `replrtq_srcqueue`, and consumed by a `riak_kv_replrtq_snk` configured on another cluster. The `replrtq_enablesrc` does NOT need to be enabled for this queue to function. There will be need for a node on the remote cluster to have `replrtq_enablesink` enabled for the differences discovered by this full-sync to be pulled in to the remote cluster.
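As an illustration only (the sink-side `replrtq_*` names and the peer address are assumptions based on the standard NextGenREPL configuration, not part of this change):

```
# Source cluster node
ttaaefs_queuename = q1_ttaaefs
replrtq_srcqueue = q1_ttaaefs:any

# Sink cluster node
replrtq_enablesink = enabled
replrtq_sinkqueue = q1_ttaaefs
replrtq_sinkpeers = 10.0.0.1:8087:pb
```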

`ttaaefs_bucketfilter_name = sample_bucketname`

`ttaaefs_bucketfilter_type = default`

To be configured if `ttaaefs_scope` is restricted to `bucket` - allows the bucket name to be configured, as well as the type if the type is not default.

`ttaaefs_localnval = 3`

`ttaaefs_remotenval = 3`

To be configured if `ttaaefs_scope` is set to all. The peer relationship will full-sync for all buckets/keys, but restricted to those buckets with this n_val. There needs to be a 1:1 relationship between the clusters on n_vals - sets of buckets with an n_val can map to the same sets on a different cluster with a consistently different n_val. If there are buckets with both n_val 3 and n_val 5 on a source cluster, it cannot sync if all buckets are n_val 1 on the sink cluster - but could sync if n_val 3 buckets mapped to n_val 1 buckets and n_val 5 buckets mapped to n_val 3 buckets on the sink.
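For example, a sketch of that n_val pairing (values illustrative only):

```
# Source cluster nodes
ttaaefs_scope = all
ttaaefs_localnval = 3
ttaaefs_remotenval = 1

# Sink cluster nodes (the reverse view of the same relationship)
ttaaefs_scope = all
ttaaefs_localnval = 1
ttaaefs_remotenval = 3
```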
@@ -237,23 +239,23 @@ The IP port and protocol to be used when communicating with the sink cluster. T

If it is necessary to encrypt the communication in transit, then the `pb` peer protocol must be used (and TLS security configured on the sink cluster `pb` api). The port is the standard port of the application-facing Riak API for that cluster.
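A minimal peer definition might look as follows (the address and port are placeholders; the `ttaaefs_peer*` names are assumed from the standard schema):

```
ttaaefs_peerip = 10.0.0.1
ttaaefs_peerport = 8087
ttaaefs_peerprotocol = pb
```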

`ttaaefs_allcheck = 100`
`ttaaefs_allcheck = 24`

`ttaaefs_nocheck = 0`

If `all` is the scope for cluster full-sync, then only the `allcheck` and `nocheck` counts should be configured. This is the number of times per day to run a sync operation (or in the case of `nocheck` to skip a sync operation). It is preferable that the total number of syncs configured per-node within the cluster is the same for each node - this will allow for the cluster to best schedule work to avoid overlapping sync operations. The `ttaaefs_nocheck` exists to allow these numbers to be evened up.

For example if this cluster has a peer relationship with `cluster_b` with which it is expected to sync 100 times per day, and another node has a peer relationship with `cluster_c` and a requirement to sync 30 times per day - the second peer relationship should have 70 `ttaaefs_nocheck` syncs configured to balance the schedule within the cluster.
For example if this cluster has a peer relationship with `cluster_b` with which it is expected to sync 24 times per day, and another node has a peer relationship with `cluster_c` and a requirement to sync 6 times per day - the second peer relationship should have 18 `ttaaefs_nocheck` syncs configured to balance the schedule within the cluster.
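As a sketch of that balancing (counts illustrative only):

```
# Node with the cluster_b peer relationship
ttaaefs_allcheck = 24
ttaaefs_nocheck = 0

# Node with the cluster_c peer relationship
ttaaefs_allcheck = 6
ttaaefs_nocheck = 18
```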

`ttaaefs_allcheck = 100`
`ttaaefs_allcheck = 24`

`ttaaefs_nocheck = 0`

`ttaaefs_hourcheck = 0`

`ttaaefs_daycheck = 0`

If `bucket` or `type` is the scope for this node, then two additional sync counts can be configured. The `hourcheck` and `daycheck` will be configured to sync only for objects modified in the past hour or day (as determined by the objects last_modified metadata). When running a full-sync check restricted to a bucket or type, this is less efficient than running a check for an entire n_val as cached trees may not be used. Running a `ttaaefs_allcheck` may take o(hour) on a large bucket, whereas `ttaaefs_daycheck` may be o(minute) and `ttaaefs_hourcheck` may be o(second). so it would be preferable to frequently check recent modifications have synced correctly, and less frequently check that older modifications have synchronised.
If `bucket` or `type` is the scope for this node, then two additional sync counts can be configured. The `hourcheck` and `daycheck` will be configured to sync only for objects modified in the past hour or day (as determined by the objects last_modified metadata). When running a full-sync check restricted to a bucket or type, this is less efficient than running a check for an entire n_val as cached trees may not be used. Running a `ttaaefs_allcheck` may take o(hour) on a large bucket, whereas `ttaaefs_daycheck` may be o(minute) and `ttaaefs_hourcheck` may be o(second). So it would be preferable to frequently check recent modifications have synced correctly, and less frequently check that older modifications have synchronised.
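A sketch of a per-bucket schedule that leans on the cheaper checks (counts illustrative only) - 24 checks per day in total, with the expensive whole-bucket comparison run only once:

```
ttaaefs_scope = bucket
ttaaefs_allcheck = 1
ttaaefs_daycheck = 5
ttaaefs_hourcheck = 18
ttaaefs_nocheck = 0
```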

A random distribution of checks is used, based on the allocated counts. If a check count of 1 of N is set for a given check type, there is no pre-determination of when in the day that check will run.

@@ -263,6 +265,10 @@ If a previous check is still running when the allocated scheduled time for the n

By default the `riak_kv_ttaaefs_manager` will log counts of keys repaired on each sync. Enabling `ttaaefs_logrepairs` will log the Bucket and Key of every key re-queued for synchronisation via full-sync.

`ttaaefs_maxresults = 256`

A negative aspect of the Tictac AAE full-sync solution is that deltas are relatively slow to be fixed. The Merkle tree used to represent the cluster has 1M segments, but only `ttaaefs_maxresults` segments will be repaired each cycle. This means that if there are 100K discrepancies, and we assume these discrepancies are evenly distributed around the Merkle tree - it will take 400 full-sync cycles to complete the repair of the delta.
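A back-of-the-envelope sketch of that arithmetic (Erlang shell, figures assumed for illustration):

```erlang
MaxResults = 256,          %% segments repaired per full-sync cycle
Discrepancies = 100000,    %% assumed delta, evenly spread over the 1M-segment tree
CyclesNeeded = (Discrepancies + MaxResults - 1) div MaxResults.
%% => 391, i.e. roughly the 400 cycles quoted above
```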

*Replication Source [`riak_kv_replrtq_src`](https://github.com/martinsumner/riak_kv/blob/mas-i1691-ttaaefullsync/src/riak_kv_replrtq_src.erl)*

`replrtq_enablesrc = enabled`
105 changes: 92 additions & 13 deletions priv/riak_kv.schema
@@ -90,14 +90,15 @@

%% @doc Frequency to prompt exchange per vnode
%% The number of milliseconds which the vnode must wait between self-pokes to
%% maybe prompt the next exchange. Default is 4 minutes - check all partitions
%% when n=3 every 30 minutes.
%% maybe prompt the next exchange. Default is 8 minutes - check all partitions
%% when n=3 once every hour (in each direction). A cycle of exchanges will
%% take (n - 1) * n + 1 exchange ticks for each nval.
%% Note if this is to be reduced further the riak_core vnode_inactivity_timeout
%% should also be reduced or handoffs may be blocked. To be safe the
%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick.
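%% As a worked example with the defaults above (illustration only): for n=3 a
%% full cycle is (3 - 1) * 3 + 1 = 7 exchange ticks, so 7 * 8 minutes is
%% roughly an hour.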
{mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [
{datatype, integer},
{default, 240000},
{default, 480000},
hidden
]}.

@@ -153,19 +154,19 @@
%% AAE rebuilds at present
{mapping, "node_worker_pool_size", "riak_kv.node_worker_pool_size", [
{datatype, integer},
{default, 2}
{default, 4}
]}.
{mapping, "af1_worker_pool_size", "riak_kv.af1_worker_pool_size", [
{datatype, integer},
{default, 1}
{default, 2}
]}.
{mapping, "af2_worker_pool_size", "riak_kv.af2_worker_pool_size", [
{datatype, integer},
{default, 1}
]}.
{mapping, "af3_worker_pool_size", "riak_kv.af3_worker_pool_size", [
{datatype, integer},
{default, 2}
{default, 4}
]}.
{mapping, "af4_worker_pool_size", "riak_kv.af4_worker_pool_size", [
{datatype, integer},
@@ -842,7 +843,7 @@
{default, disabled}
]}.

%% @doc for tictac full-sync what registered queue name on this cluster should
%% @doc For tictac full-sync what registered queue name on this cluster should
%% be used for passing references to data which needs to be replicated for AAE
%% full-sync. This queue name must be defined as a
%% `riak_kv.replq<n>_queuename`, but need not be exclusive to full-sync (i.e. a
@@ -852,6 +853,33 @@
{default, q1_ttaaefs}
]}.

%% @doc For tictac full-sync what is the maximum number of AAE segments to be
%% compared per exchange. Reducing this will speed up clock compare queries,
%% but will increase the number of exchanges required to complete a repair.
%% If using range_check to speed-up repairs, this can be reduced as the
%% range_check maxresults will be boosted by the ttaaefs_rangeboost. When using
%% range_check a value of 64 is recommended, which may be reduced to 32 or 16
%% if the cluster has a very large volume of keys and/or limited capacity.
%% Only reduce below 16 in exceptional circumstances.
%% More capacity to process sync queries can be added by increasing the af2
%% and af3 queue sizes - but this will be at the risk of there being a bigger
%% impact on KV performance when repairs are required.
{mapping, "ttaaefs_maxresults", "riak_kv.ttaaefs_maxresults", [
{datatype, integer},
{default, 64}
]}.

%% @doc For tictac full-sync, what multiplier should be applied to
%% ttaaefs_maxresults when running a range_check query - the maximum number of
%% AAE segments compared in a range_check exchange will be
%% ttaaefs_maxresults * ttaaefs_rangeboost.
%% When using range_check, a small maxresults can be used, in effect using
%% other *_check syncs as discovery queries (to find the range for the
%% range_check to do the heavy lifting).
{mapping, "ttaaefs_rangeboost", "riak_kv.ttaaefs_rangeboost", [
{datatype, integer},
{default, 8}
]}.
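%% As an illustration only (not a recommendation): with ttaaefs_maxresults set
%% to 64 and ttaaefs_rangeboost set to 8, a range_check exchange may compare
%% up to 64 * 8 = 512 segments per cycle.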

%% @doc For Tictac bucket full-sync which bucket should be sync'd by this
%% node. Only ascii string bucket definitions supported (which will be
%% converted using list_to_binary).
@@ -954,10 +982,20 @@
]}.

%% @doc How many times per 24hour period should all the data be checked to
%% confirm it is fully sync'd
%% confirm it is fully sync'd. When running a full (i.e. nval) sync this will
%% check all the data under that nval between the clusters, and when the trees
%% are out of alignment, will check across all data where the nval matches the
%% specified nval.
%% On large clusters (in terms of key count), this may take a long time - so
%% allcheck should be scheduled infrequently, as other checks may be delayed by
%% consumption of queue resource by the allcheck.
%% The af3_queue size, and the ttaaefs_maxresults, both need to be tuned to
%% ensure that the allcheck can run within the 30 minute timeout.
%% For per-bucket replication all is a reference to all of the data for that
%% bucket, and warnings about sizing are especially relevant.
{mapping, "ttaaefs_allcheck", "riak_kv.ttaaefs_allcheck", [
{datatype, integer},
{default, 100}
{default, 24}
]}.

%% @doc How many times per 24hour period should no data be checked to
@@ -970,21 +1008,52 @@
]}.

%% @doc How many times per 24hour period should the last hours data be checked
%% to confirm it is fully sync'd. This is relevant for per-bucket sync scope
%% only, and will be ignored if all sync is enabled
%% to confirm it is fully sync'd.
%% For per-bucket replication, the tree comparison prompted by this will be
%% constrained by the time period, as well as the keys and clocks checked for
%% repair. For full, nval, replication - the tree comparison is across all
%% time, but the keys and clocks checked for repair are constrained by the time
%% period.
%% Once deltas are outside of the last hour, an hourcheck can do
%% nothing to resolve the data, but will still consume resource.
{mapping, "ttaaefs_hourcheck", "riak_kv.ttaaefs_hourcheck", [
{datatype, integer},
{default, 0}
]}.

%% @doc How many times per 24hour period should the last 24-hours of data be
%% checked to confirm it is fully sync'd. This si relevant for per-bucket
%% sync scope only, and will be ignored if all sync is enabled
%% checked to confirm it is fully sync'd.
%% For per-bucket replication, the tree comparison prompted by this will be
%% constrained by the time period, as well as the keys and clocks checked for
%% repair. For full, nval, replication - the tree comparison is across all
%% time, but the keys and clocks checked for repair are constrained by the time
%% period.
%% Once deltas are outside of the last 24-hours, a daycheck can do
%% nothing to resolve the data, but will still consume resource.
{mapping, "ttaaefs_daycheck", "riak_kv.ttaaefs_daycheck", [
{datatype, integer},
{default, 0}
]}.

%% @doc How many times per 24hour period should a range_check be run. The
%% range_check is intended to be a smart check, in that it will:
%% - use a last_modified range starting from the last successful check as its
%% range if the last check was successful (i.e. showed the clusters to be
%% in sync);
%% - use a range identified by the last check (a last modified range, and
%% perhaps also a specific Bucket) if a range to limit the issues has been
%% identified by a previous failure;
%% - not run at all if the clusters are out of sync and no range has been
%% discovered (this may be the case when running on a sink which is behind a
%% source cluster).
%% For full, nval, sync operations the range is only relevant to the search
%% for objects to repair - the tree comparison is always between all data for
%% that nval.
{mapping, "ttaaefs_rangecheck", "riak_kv.ttaaefs_rangecheck", [
{datatype, integer},
{default, 0}
]}.

%% @doc If Tictac AAE full-sync discovers keys to be repaired, should each key
%% that is repaired be logged
{mapping, "ttaaefs_logrepairs", "riak_kv.ttaaefs_logrepairs", [
@@ -993,6 +1062,16 @@
{commented, enabled}
]}.

%% @doc If Tictac AAE sees differences in trees (for nval-based full
%% comparisons only), should it attempt to repair those trees as well as
%% repairing any deltas. Enabling this setting will change the concurrency
%% of fetch_clock_nval queries run to find repairs.
{mapping, "aae_fetchclocks_repair", "riak_kv.aae_fetchclocks_repair", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to act as a real-time replication source
{mapping, "replrtq_enablesrc", "riak_kv.replrtq_enablesrc", [
{datatype, {flag, enabled, disabled}},
12 changes: 6 additions & 6 deletions rebar.config
@@ -47,17 +47,17 @@
]}.

{deps, [
{riak_core, {git, "git://github.com/basho/riak_core.git", {tag, "riak_kv-3.0.0"}}},
{riak_core, {git, "git://github.com/basho/riak_core.git", {branch, "develop-3.0"}}},
{sidejob, {git, "git://github.com/basho/sidejob.git", {tag, "2.1.0"}}},
{bitcask, {git, "git://github.com/basho/bitcask.git", {tag, "2.1.0"}}},
{redbug, {git, "https://github.com/massemanet/redbug", {tag, "1.2.1"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.4.0"}}},
{sext, {git, "git://github.com/uwiger/sext.git", {tag, "1.4.1"}}},
{riak_pipe, {git, "git://github.com/basho/riak_pipe.git", {tag, "riak_kv-3.0.0"}}},
{riak_pipe, {git, "git://github.com/basho/riak_pipe.git", {branch, "develop-3.0"}}},
{riak_dt, {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
{riak_api, {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.1"}}},
{riak_api, {git, "git://github.com/basho/riak_api.git", {branch, "develop-3.0"}}},
{hyper, {git, "git://github.com/basho/hyper", {tag, "1.1.0"}}},
{leveled, {git, "https://github.com/martinsumner/leveled.git", {tag, "1.0.0"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "0.9.16"}}},
{riakhttpc, {git, "git://github.com/basho/riak-erlang-http-client", {tag, "riak_kv-3.0.1"}}}
{leveled, {git, "https://github.com/martinsumner/leveled.git", {branch, "develop-3.0"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}},
{riakhttpc, {git, "git://github.com/basho/riak-erlang-http-client", {branch, "develop-3.0"}}}
]}.
49 changes: 43 additions & 6 deletions src/riak_client.erl
@@ -37,8 +37,8 @@
-export([stream_list_buckets/1,stream_list_buckets/2,
stream_list_buckets/3,stream_list_buckets/4, stream_list_buckets/5]).
-export([get_index/4,get_index/3]).
-export([aae_fold/2]).
-export([ttaaefs_fullsync/2, ttaaefs_fullsync/3]).
-export([aae_fold/1, aae_fold/2]).
-export([ttaaefs_fullsync/1, ttaaefs_fullsync/2, ttaaefs_fullsync/3]).
-export([hotbackup/4]).
-export([stream_get_index/4,stream_get_index/3]).
-export([set_bucket/3,get_bucket/2,reset_bucket/2]).
@@ -49,6 +49,7 @@
-export([for_dialyzer_only_ignore/3]).
-export([ensemble/1]).
-export([fetch/2, push/4]).
-export([remove_node_from_coverage/0, reset_node_for_coverage/0]).

-compile({no_auto_import,[put/2]}).
%% @type default_timeout() = 60000
@@ -834,6 +835,11 @@ stream_list_buckets(Filter, Timeout, Client, Type,
{ok, ReqId}.


-spec aae_fold(riak_kv_clusteraae_fsm:query_definition())
        -> {ok, any()}|{error, timeout}|{error, Err :: term()}.
aae_fold(Query) ->
    aae_fold(Query, riak_client:new(node(), adhoc_aaefold)).
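%% Example (illustrative): run an ad-hoc fold from a remote console, assuming
%% {merge_root_nval, 3} is a query definition accepted by
%% riak_kv_clusteraae_fsm:
%%   {ok, Root} = riak_client:aae_fold({merge_root_nval, 3}).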

%% @doc
%%
%% Run a cluster-wide AAE query - which can either access cached AAE
@@ -845,11 +851,21 @@ aae_fold(Query, {?MODULE, [Node, _ClientId]}) ->
Me = self(),
ReqId = mk_reqid(),
TimeOut = ?DEFAULT_FOLD_TIMEOUT,
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(Node,
[{raw, ReqId, Me},
[Query, TimeOut]]),
wait_for_fold_results(ReqId, TimeOut).
Q0 = riak_kv_clusteraae_fsm:convert_fold(Query),
case riak_kv_clusteraae_fsm:is_valid_fold(Q0) of
true ->
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(Node,
[{raw, ReqId, Me},
[Query, TimeOut]]),
wait_for_fold_results(ReqId, TimeOut);
false ->
{error, "Invalid AAE fold definition"}
end.


-spec ttaaefs_fullsync(riak_kv_ttaaefs_manager:work_item()) -> ok.
ttaaefs_fullsync(WorkItem) ->
    ttaaefs_fullsync(WorkItem, 900).
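%% Illustrative use from a remote console, assuming all_check is one of the
%% riak_kv_ttaaefs_manager work items configured for this node's peer
%% relationship:
%%   riak_client:ttaaefs_fullsync(all_check).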

%% @doc
%% Prompt a full-sync based on the current configuration, and using either
@@ -878,7 +894,28 @@ ttaaefs_fullsync(WorkItem, SecsTimeout, Now) ->
wait_for_reqid(ReqId, SecsTimeout * 1000).


-spec participate_in_coverage(boolean()) -> ok.
participate_in_coverage(Participate) ->
    F =
        fun(Ring, _) ->
            {new_ring,
                riak_core_ring:update_member_meta(node(),
                                                    Ring,
                                                    node(),
                                                    participate_in_coverage,
                                                    Participate)}
        end,
    {ok, _FinalRing} = riak_core_ring_manager:ring_trans(F, undefined),
    ok.

-spec remove_node_from_coverage() -> ok.
remove_node_from_coverage() ->
    participate_in_coverage(false).

-spec reset_node_for_coverage() -> ok.
reset_node_for_coverage() ->
    participate_in_coverage(
        app_helper:get_env(riak_core, participate_in_coverage)).
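%% Illustrative maintenance flow (run from a remote console): take this node
%% out of coverage plans before intrusive work, then restore the configured
%% behaviour afterwards:
%%   ok = riak_client:remove_node_from_coverage(),
%%   %% ... perform maintenance ...
%%   ok = riak_client:reset_node_for_coverage().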

%% @doc
%% Run a hot backup - returns {ok, true} if successful