Home
Riak Core is the distributed systems framework upon which Riak, Basho's distributed key/value datastore, is built. This wiki is meant to help you understand the Riak Core concepts and how they are connected to the source code modules. More detailed information about the riak_core modules and functions is available at http://basho.github.com/riak_core, the edoc pages generated straight from the source code. For more information, check out:
- Mailing list: http://lists.basho.com/mailman/listinfo/riak-core_lists.basho.com
- http://basho.com/where-to-start-with-riak-core/
Core is written in Erlang, using its OTP framework. You will need to understand Erlang and OTP to understand how Riak Core works. The Erlang/OTP in a Nutshell page is a good starting point.
Riak Core handles the distribution of ownership among a group of nodes in a cluster. An abstract space is split into a given number of partitions, each of which is owned by an entity called a virtual node. We normally refer to this space as the Ring. The virtual nodes are spread among all the nodes in the cluster. Because the number of partitions is fixed, adding or removing nodes causes the virtual nodes to be redistributed. For example, your application may start with 64 partitions on a single node, and hence 64 virtual nodes running on that node. When a second node is added, ownership is split and each of the two nodes ends up running 32 virtual nodes, and so on.
A typical riak_core application handles an incoming request by determining the partition it corresponds to and routing it to the corresponding virtual node, as sketched below. Requests can also span many or all partitions; in core, that is handled by coverage queries.
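As a minimal sketch, routing a single-partition request usually follows the pattern below, in the style of the common riak_core tutorials. The module name myapp, the service name myapp, the bucket/key used for hashing, and the myapp_vnode_master registration are illustrative assumptions, not part of riak_core itself.

```erlang
-module(myapp).
-export([ping/0]).

%% Hash a bucket/key pair to a position on the ring, look up the primary
%% vnode for that position, and send it a command through the vnode master.
ping() ->
    DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(os:timestamp())}),
    [{IndexNode, _Type}] = riak_core_apl:get_primary_apl(DocIdx, 1, myapp),
    riak_core_vnode_master:sync_spawn_command(IndexNode, ping, myapp_vnode_master).
```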
Information about the existing partitions and the virtual nodes responsible for handling them is stored in a data structure called the Ring. Each node in the cluster has a copy of this ring. The number of partitions in the ring is decided at installation time and stays fixed, although there is ongoing work to allow it to change size. Data about buckets is also stored in the ring; this might change in the future, as it limits scalability. Information about ongoing ownership transitions is stored in the ring as well, along with the designated Claimant node. The ring structure is defined in riak_core_ring (it's the chstate_v2 record). Also see riak_core_ring_manager.
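To get a feel for what the ring holds, it can be inspected from an attached Erlang shell; this small sketch uses accessors from riak_core_ring_manager and riak_core_ring:

```erlang
%% Fetch this node's copy of the ring and look at its contents.
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_ring:num_partitions(Ring),   %% fixed number of partitions, e.g. 64
riak_core_ring:all_members(Ring),      %% physical nodes known to the cluster
riak_core_ring:all_owners(Ring).       %% [{PartitionIndex, OwnerNode}] pairs
```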
Each virtual node (or vnode) is a process responsible for a given partition in the ring. A riak_core node will host any number of vnodes, depending on the size of the ring and the number of nodes in the cluster: roughly (size of ring) / (number of nodes). What vnodes do varies per application; in general, they handle commands that need to be executed for the partition they own. Besides the main vnode process, riak_core provides a pool of workers that can execute asynchronous work on its behalf.
See more about their API in the vnode page; a minimal skeleton is sketched below.
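As a rough sketch, a vnode module implements the riak_core_vnode behaviour along these lines; the module name myapp_vnode and the ping command are illustrative, and a real vnode would do useful work in the handoff and coverage callbacks.

```erlang
-module(myapp_vnode).
-behaviour(riak_core_vnode).

-export([start_vnode/1, init/1, handle_command/3, handle_coverage/4,
         handle_exit/3, handoff_starting/2, handoff_cancelled/1,
         handoff_finished/2, handle_handoff_command/3, handle_handoff_data/2,
         encode_handoff_item/2, is_empty/1, delete/1, terminate/2]).

-record(state, {partition}).

%% Conventionally exported so the vnode master can locate (or start)
%% the vnode process for a given partition index.
start_vnode(I) ->
    riak_core_vnode_master:get_vnode_pid(I, ?MODULE).

init([Partition]) ->
    {ok, #state{partition=Partition}}.

%% Commands routed to this partition end up here.
handle_command(ping, _Sender, State) ->
    {reply, {pong, State#state.partition}, State};
handle_command(_Request, _Sender, State) ->
    {noreply, State}.

%% Coverage queries and handoff callbacks; a real vnode would fold over
%% its data and transfer it to the new owner here.
handle_coverage(_Request, _KeySpaces, _Sender, State) ->
    {stop, not_implemented, State}.

handle_exit(_Pid, _Reason, State) ->
    {noreply, State}.

handoff_starting(_TargetNode, State) -> {true, State}.
handoff_cancelled(State) -> {ok, State}.
handoff_finished(_TargetNode, State) -> {ok, State}.
handle_handoff_command(_Request, _Sender, State) -> {noreply, State}.
handle_handoff_data(_Binary, State) -> {reply, ok, State}.
encode_handoff_item(_Key, _Value) -> <<>>.

is_empty(State) -> {true, State}.
delete(State) -> {ok, State}.
terminate(_Reason, _State) -> ok.
```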
Buckets are nothing but namespaces with configuration properties. Your application may use only one, or whatever number works best. See riak_core_bucket.
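As a sketch, bucket properties can be read and written through riak_core_bucket; the bucket name and the n_val property below are illustrative, and properties are merged with the configured default bucket properties.

```erlang
%% Store custom properties for a bucket and read them back.
riak_core_bucket:set_bucket(<<"events">>, [{n_val, 5}]),
riak_core_bucket:get_bucket(<<"events">>).   %% -> property list for the bucket
```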
Changes to the ring need to be propagated throughout the cluster. This is what the gossip protocol is for. The main module involved is riak_core_gossip.
Ownership of a partition may be transferred from one virtual node to another virtual node on a different physical node when nodes join or leave the cluster, and under certain failure scenarios, to guarantee high availability. If a node goes down unexpectedly, the partitions owned by the virtual nodes it contained will be temporarily handled by virtual nodes on other physical nodes. If the original node comes back up, ownership will eventually be transferred back to the original owners, also called primary virtual nodes. The virtual nodes that took over ownership in that scenario are called secondary virtual nodes. The process by which this ownership is negotiated, and any relevant data is transferred to accomplish it, is what we call a handoff. Transfer of ownership may also occur when physical nodes are added to or removed from the cluster. See all the gory details in the handoff page.
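A small aside: the number of concurrent handoffs a node will run is bounded by the riak_core handoff_concurrency setting. Assuming the standard application environment, a sketch of checking and adjusting it at runtime looks like:

```erlang
%% Read the configured per-node limit and raise it through the handoff manager.
application:get_env(riak_core, handoff_concurrency),
riak_core_handoff_manager:set_concurrency(4).
```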
At any given point, one node in the cluster is assigned the task of coordinating the addition and removal of nodes to and from the cluster. This process also tries to keep the partition assignment balanced when these cluster changes occur: as added nodes claim their share of the partitions and leaving nodes relinquish theirs, the cluster should ideally spread them evenly among the new set of nodes. If the claimant node goes down, cluster changes will not be possible until a new one is chosen, or until the claimant node is marked as down so another one can take over the role.
The claimant process is an Erlang gen_server implemented in riak_core_claimant.erl. The algorithm used to reassign partitions when the cluster changes is in riak_core_claim.erl.
Sometimes not all nodes in a cluster have the same features. The most common scenario is a rolling upgrade: nodes may be taken down, upgraded to a new version of the software, and put back into the cluster. The new software may contain features that the old version cannot understand, so they should not be activated yet, but they should be used as soon as all nodes are upgraded. This is resolved with the capabilities negotiation mechanism.
See riak_core_capability.
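As a sketch, an application registers the modes it supports for a capability and then queries the negotiated value; the capability name {myapp, new_codec} and its modes are illustrative.

```erlang
%% Register supported modes in order of preference, with a default used
%% until negotiation completes; riak_core picks the most preferred mode
%% that every node in the cluster supports.
riak_core_capability:register({myapp, new_codec}, [v2, v1], v1),
case riak_core_capability:get({myapp, new_codec}, v1) of
    v2 -> new_codec_enabled;
    v1 -> fall_back_to_old_codec
end.
```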
- For usage of Cluster Metadata, see Cluster Metadata (a short usage sketch follows this list)
- For a better understanding of how it works, see Cluster Metadata Internals
- Cluster Metadata uses Riak Core Broadcast to distribute data.
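A minimal usage sketch: values are stored under a two-level {Prefix, SubPrefix} namespace and replicated to all nodes via broadcast. The namespace and key below are illustrative.

```erlang
%% Write a value under a {Prefix, SubPrefix} namespace and read it back.
ok = riak_core_metadata:put({<<"myapp">>, <<"settings">>}, max_batch, 100),
riak_core_metadata:get({<<"myapp">>, <<"settings">>}, max_batch).   %% -> 100
```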
When the strong consistency subsystem is enabled (see riak_ensemble), the claimant is responsible for detecting node additions and removals and updating the ensemble cluster node list. This happens on the claimant's periodic tick.
Additionally, care is taken not to remove exiting nodes until they are no longer part of the ensemble cluster. This happens in riak_core_claimant:maybe_remove_exiting/2, which checks whether members of the root ensemble still exist on the exiting nodes.
At the top, we have the riak_core_sup module. Underneath that supervisor, we have the following process hierarchy:
- riak_core_sysmon_minder
- riak_core_vnode_sup
  - riak_core_vnode
    - [riak_core_vnode_worker_pool](https://github.com/basho/riak_core/blob/master/src/riak_core_vnode_worker_pool.erl)
      - [riak_core_vnode_worker](https://github.com/basho/riak_core/blob/master/src/riak_core_vnode_worker.erl)
      - ... (more workers)
  - ... (more vnodes and their workers)
- riak_core_eventhandler_sup
- riak_core_ring_events
- riak_core_ring_manager
- riak_core_vnode_proxy_sup
  - riak_core_vnode_proxy
  - ... (more vnode proxies)
- riak_core_node_watcher_events
- riak_core_node_watcher
- riak_core_vnode_manager
- riak_core_capability
- riak_core_handoff_sup
  - riak_core_handoff_receiver_sup
  - riak_core_handoff_sender_sup
  - riak_core_handoff_listener_sup
  - riak_core_handoff_manager
- riak_core_gossip
- riak_core_claimant
- riak_core_stat_sup
  - folsom_sup
  - riak_core_stats_sup
  - riak_core_stat_calc_sup
Life begins in the start function of riak_core_app, where the top of the supervisor hierarchy is started. At that point, the OTP applications listed as dependencies of riak_core in the rebar configuration file are already running.
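A riak_core-based application typically does its own registration right after its supervisor starts. The sketch below assumes illustrative module names myapp_sup and myapp_vnode and would live in that application's own start callback, not in riak_core.

```erlang
%% Inside the application start callback of a riak_core-based app:
case myapp_sup:start_link() of
    {ok, Pid} ->
        %% Tell riak_core which module implements this service's vnodes.
        ok = riak_core:register(myapp, [{vnode_module, myapp_vnode}]),
        %% Announce the service so the node watcher marks it as up on this node.
        ok = riak_core_node_watcher:service_up(myapp, self()),
        {ok, Pid};
    {error, Reason} ->
        {error, Reason}
end.
```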
During the lifetime of a cluster, nodes are added or removed in different ways. These operations are done in two stages: first, the additions/removals are staged and core generates a plan of what will happen when the changes take effect; the changes are only applied once you commit the plan. You can find the entry points for these cluster operations in riak_core_console.erl, which are typically called directly by an admin shell script in your application.
The staging step for a join is done by the staged_join functions in riak_core, which simply update the cluster change plan.
TBD: What happens to the previous ring and partitions of the joining node?
The staging step for a leave is done by the stage_leave functions in riak_core_console.erl, which likewise simply update the cluster change plan. Besides the 'normal' case, there is also a force-remove option, which marks the node as 'invalid'. This prevents any handoffs from originating from that node.
The changes are committed by calling commit_staged in riak_core_console.erl. This forwards the request to the claimant node, which coordinates the additions and removals by updating the member information in its ring and propagating the ring to the other nodes via gossip. Handoff of ownership only starts once all the nodes have seen the new ring: when the last node to receive the new ring verifies that every node has seen it, it starts the process. That is done in the riak_core_vnode_manager of that last node (see the handle_cast clause matching the ring_changed event) as it receives a ring update event from the riak_core_gossip process. However, if the configured handoff concurrency is not high enough, only some of these handoffs will start; a periodic tick triggers a re-check later, and more handoffs will start up to the allowed limit. It is also possible to trigger this manually with the force_handoffs function in riak_core_vnode_manager.
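Put together, the staged-change flow can be driven from an Erlang shell roughly as follows; the node name is illustrative, and in practice an admin script calls the riak_core_console wrappers rather than these functions directly.

```erlang
%% On the joining node, stage a join towards an existing cluster member,
%% then compute and commit the cluster change plan.
riak_core:staged_join('myapp1@127.0.0.1'),
{ok, Changes, _NextRings} = riak_core_claimant:plan(),
riak_core_claimant:commit().
```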