This is work prompted initially by the need to support a coverage fold for active anti-entropy and multi-data-centre replication. To support this feature there is a need to run a coverage fold across objects (or heads in the case of leveled): a fold that can potentially be throttled by using the core node_worker_pool rather than the vnode_worker_pool, that can work over indexes as well as keys/objects, and where the accumulator for the fold may not be a simple list but may be a more complex object. Such a fold capability may also be more generically useful beyond anti-entropy - for example, to distribute map-like functions across objects or indexes.
There are three potential starting points in Riak for evolving such a feature:
- The `riak_core_coverage_fsm` behaviour, currently used by secondary index queries and list_keys queries;
- The `riak_pipe` framework written to support Map/Reduce operations in Riak KV (which is already optimised to re-use the coverage FSM when the Map/Reduce operation is for a defined range on the special $bucket or $key indexes);
- The `riak_kv_sweeper` framework that is as yet unreleased, but is available on the Riak develop branch - which is designed to allow multiple functions to be combined on the same fold, with concurrency management and throttling of folds built in.
This attempt to support a coverage fold will be based on the `riak_core_coverage_fsm` behaviour as used by secondary index queries. This code is considered at the start of the exercise to be easier to change, and is more hardened through use in real-world environments (whereas M/R is generally not recommended for use in production systems, and sweeper is not yet released).
To better understand what is there to be modified, here is a run-through of how coverage queries currently work, focused on the secondary index query use case. The general path a coverage query takes is:
- After filtering and processing by the API to produce the query, a `riak_client` is started and `get_index` is called.
- The riak_client will need to start a `riak_kv_index_fsm`, which follows the `riak_core_coverage_fsm` behaviour. Each riak_kv node is started with a `riak_kv_index_fsm_sup` from which individual query FSMs can be spawned. There is a 1-1 relationship between queries and individual query FSMs - one FSM is initialised in the cluster at the start of the query (on the node which received the query request), and that FSM is closed by completion of the query.
- The `riak_core_coverage_fsm` behaviour requires the callback module (in this case the `riak_kv_index_fsm`) to implement an `init` function to generate the query, the options required for the coverage plan, and the initial ModState.
- The `riak_core_coverage_fsm` behaviour requires the callback module (in this case the `riak_kv_index_fsm`) to also provide a plan function (which allows the ModState to be changed based on the vnodes involved in the plan), a process_results function to take the results returned from the vnode, and a finish function to tidy up and send a response back to the `riak_client` at the end. A sketch of such a callback module is given after this list.
- The `riak_core_coverage_fsm` calls Mod:init during its initialisation. There are four inputs to be passed into `riak_core_coverage_plan:create_plan` in order to produce a coverage plan, which are taken from the outputs of Mod:init:
    - `VnodeSelector`, which can be either `all` or `allup`, and is used should there be insufficient active vnodes available to make a coverage plan - if the selector is set to `all`, this will prompt an error, whereas `allup` will do a best-endeavours query using as many vnodes as can be accessed.
    - `NVal`, which is the n-val of the bucket over which the query will be run (normally 3).
    - `PrimaryVNodeCoverage`, which is in effect the 'r' value of the query (normally 1). If this is more than 1, the callback module will need to handle de-duplication of the results; this is not managed in the `riak_core_coverage_fsm` behaviour.
    - `NodeCheckService`, which is the service registered with riak_core for which vnode health will be determined (always set to `riak_kv` for Riak KV folds).
- The coverage plan will be based on an offset K where 1 <= K <= NVal. The offset is not passed into the coverage plan, but calculated within the coverage plan by taking the mod n-val result of the pseudo-random request ID (which is generated by the `riak_client`). Note that this is not necessarily desirable behaviour as:
    - The random distribution of queries may lead to inefficient caching in the database, as repeated queries to nearby index ranges will not hit the same vnodes and therefore the same backend or page-based caches. There is a trade-off whereby the advantages of caching are outweighed by the imbalances of poorly distributed load across vnodes - and in the general case this is probably a sensible trade to make.
    - When the coverage query is for intra-cluster anti-entropy comparison, it may be desirable to control the offset (so that the requestor knows that the comparison is between different offsets, and all offsets are being compared eventually).
- The ring is discovered by the coverage_plan module by calling `riak_core_ring_manager:get_chash_bin/0`, and this is combined with knowledge of the offline nodes found by calling `riak_core_apl:offline_owners/2`. This information, with the offset and the plan inputs, is sufficient to produce a coverage plan.
- The result of the coverage plan is a tuple of two lists: `{[{VnodeIdx, Node}], [{VNodeIdx, KeySpaceIndexes}]}`. The first part is the vnodes and nodes over which the query should be run; the second part (known as filter vnodes) highlights that for some of those vnodes there is a need to only look at certain partitions to avoid duplication of results. In a healthy cluster, where the ring-size is divisible by the n-val, there should be no filter vnodes.
- The result of the plan is then sent with the request to `riak_core_vnode_master:coverage/5`, the function which distributes the request to all the coverage vnodes. Before doing this distribution, a small name-translation dance is performed between functions to go from the passed-in module name `riak_kv_vnode_master` to the registered vnode identity (of the form `proxy_riak_kv_vnode_0`, where in this case 0 is the partition ID).
- With the registered ID of the riak_core_vnode, an event will be sent to be handled by `riak_core_vnode:vnode_coverage/4`, which will in turn call `riak_kv_vnode:handle_coverage`; that will call `handle_coverage_fold` and then `BackendMod:fold_keys/4` or `BackendMod:fold_objects/4`.
- The response from the backend call will then be passed to a member of the riak_core `vnode_worker_pool` if the response was `{async, Folder}`, or to a member of the riak_core `node_worker_pool` if the answer was `{queue, Folder}` (although not in the mainstream Basho version of Riak, where only the `vnode_worker_pool` is supported). A sketch of a backend returning such an async fold is given after this list.
- The results of the worker calling the Folder function are sent back to the FSM marshalling the query, to be handled by `CallbackMod:process_results`.
- Once all vnodes have responded, `CallbackMod:finish` is called, and the finish process should send the completed and merged results back to the riak_client, which is waiting in a receive loop until it either receives a `{ReqID, done}` or reaches a timeout.
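To make the callback contract described above more concrete, here is a minimal sketch of a `riak_core_coverage_fsm` callback module, assuming the classic eight-element return from `init/2`. The module name, the request shape and the accumulator are invented for illustration, and the exact tuple returned from `init/2` (as well as the arities of `plan`, `process_results` and `finish`) varies between riak_core versions - so read this as illustrative of the four plan inputs listed above, not as a definitive signature.

```erlang
-module(riak_kv_example_coverage_fsm).
-behaviour(riak_core_coverage_fsm).

-export([init/2, plan/2, process_results/2, finish/2]).

-record(state, {from, acc = []}).

%% Called by riak_core_coverage_fsm at start-up. Elements two to five of
%% the returned tuple are the four create_plan inputs described above:
%% VnodeSelector, NVal, PrimaryVNodeCoverage and NodeCheckService. The
%% exact tuple shape (and the trailing vnode master/timeout elements)
%% differs between riak_core versions, so treat this as indicative only.
init(From, [Query, Timeout]) ->
    Request = {example_fold, Query},
    {Request,
     allup,                   % VnodeSelector - best-endeavours query
     3,                       % NVal of the bucket being queried
     1,                       % PrimaryVNodeCoverage - the 'r' value of the query
     riak_kv,                 % NodeCheckService
     riak_kv_vnode_master,    % vnode master to route coverage requests through
     Timeout,
     #state{from = From}}.

%% Gives the callback a chance to amend the ModState once the coverage
%% plan (the chosen vnodes and filter vnodes) is known.
plan(_CoverageVNodes, State) ->
    {ok, State}.

%% Receives results streamed back from each vnode in the plan.
process_results({results, Results}, State = #state{acc = Acc}) ->
    {ok, State#state{acc = Results ++ Acc}};
process_results(done, State) ->
    {done, State}.

%% Called when every vnode in the plan has responded (or on error);
%% responsible for replying to the riak_client waiting in its receive
%% loop. From is assumed here to be {raw, ReqId, ClientPid}, the shape
%% passed in by riak_client.
finish(clean, State = #state{from = {raw, ReqId, ClientPid}, acc = Acc}) ->
    ClientPid ! {ReqId, {results, Acc}},
    ClientPid ! {ReqId, done},
    {stop, normal, State};
finish({error, Reason}, State = #state{from = {raw, ReqId, ClientPid}}) ->
    ClientPid ! {ReqId, {error, Reason}},
    {stop, normal, State}.
```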
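The `{async, Folder}` hand-off can likewise be illustrated with a toy backend fold. Only the `fold_objects/4` name and the `{async, Folder}`/`{ok, Acc}` return shapes come from the description above; the ets-based storage and the `async_fold` option check are assumptions made for the example, and real backends (bitcask, eleveldb, leveled) implement this against their own storage engines.

```erlang
%% Toy backend illustrating the {async, Folder} return described above.
-module(toy_backend).
-export([fold_objects/4]).

-record(state, {table :: ets:tab()}).

fold_objects(FoldObjectsFun, InitAcc, Opts, #state{table = Table}) ->
    %% Package the whole fold as a zero-arity fun, so the vnode can hand
    %% it to a vnode_worker_pool (or node_worker_pool) worker rather than
    %% blocking while the fold runs.
    Folder =
        fun() ->
            ets:foldl(
                fun({{Bucket, Key}, Object}, Acc) ->
                    FoldObjectsFun(Bucket, Key, Object, Acc)
                end,
                InitAcc,
                Table)
        end,
    case lists:member(async_fold, Opts) of
        true  -> {async, Folder};
        false -> {ok, Folder()}
    end.
```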
The intention is to add a new `riak_kv_mapfold_fsm` that uses the `riak_core_coverage_fsm` behaviour. The expected differences between this and other coverage FSM implementations are:
- The query request will include a fold module name, and that module will need to contain functions to `generate_filter` (e.g. to filter for only specific segments), `generate_acc` (e.g. to produce a new TicTac tree to be used as the accumulator for the query, rather than a list), `generate_mergefun` (e.g. to generate a function to merge two TicTac trees together when in `process_results/3`), `generate_objectfoldfun` (e.g. to generate a function to add the Key and Value to the accumulator) and `state_needs` (to demand the backend capabilities necessary to complete the fold) when given the query options. A sketch of such a fold module is given after this list.
- The query request can contain a `sample_only` query, which will cause the query to be run on a subset of vnodes/partitions that is below full coverage - when a sample is sufficient to get a desirable result (for example, on a fold to assess average object size in the database).
- The query request can enforce the offset to be used within the coverage plan.
- The fold module should return `fold_heads` from `state_needs/1` when the query can be answered efficiently from a proxy_object rather than a full object. The fold module should return `standard_hash_size_only` if it has no requirements beyond a 'standard' view of the object hash and the size of the object. This will allow the backend to enhance performance through pre-computation where possible.
- The query request can either be an object query or an index query - and the fold function should reflect the change in inputs for these queries (e.g. `{B, K, Obj}` or `{B, Idx, T, K}`).
- All accumulators will need to be of fixed size (no fold buffers will be supported) and have associative merge functions.
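To illustrate the proposed fold module contract, here is a rough sketch of what such a module might look like for a simple object-count fold. Only the five function names and the `standard_hash_size_only` atom are taken from the list above; the arities, option handling and return values are assumptions made for the example rather than a settled interface.

```erlang
%% Hypothetical fold module for the proposed riak_kv_mapfold_fsm - a
%% simple object-count fold. Only the five exported function names come
%% from the design above; arities and return values are assumptions.
-module(riak_kv_countfold).
-export([generate_filter/1,
         generate_acc/1,
         generate_mergefun/1,
         generate_objectfoldfun/1,
         state_needs/1]).

%% No segment filtering is needed for a straight count.
generate_filter(_QueryOpts) ->
    none.

%% The accumulator is a fixed-size term (an integer), not a list, so
%% partial results can be merged associatively as they arrive.
generate_acc(_QueryOpts) ->
    0.

%% Merge two partial accumulators when handling process_results/3.
generate_mergefun(_QueryOpts) ->
    fun(CountA, CountB) -> CountA + CountB end.

%% Fold fun applied to each object - {B, K, Obj} inputs for an object query.
generate_objectfoldfun(_QueryOpts) ->
    fun(_Bucket, _Key, _Obj, Count) -> Count + 1 end.

%% A count needs nothing beyond the standard hash/size view of the
%% object, so allow the backend to use pre-computed information.
state_needs(_QueryOpts) ->
    standard_hash_size_only.
```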