This is rabit's tutorial, a Reliable Allreduce and Broadcast Interface.
All the example codes are in the guide folder of the project.
To run the examples locally, you will need to build them with make
.
List of Topics
- What is Allreduce
- Common Use Case
- Use Rabit API
- Compile Programs with Rabit
- Running Rabit Jobs
- Fault Tolerance
The main methods provided by rabit are Allreduce and Broadcast. Allreduce performs reduction across different computation nodes, and returns the result to every node. To understand the behavior of the function, consider the following example in basic.cc (there is a python example right after this if you are more familiar with python).
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
int a[N];
rabit::Init(argc, argv);
for (int i = 0; i < N; ++i) {
a[i] = rabit::GetRank() + i;
}
printf("@node[%d] before-allreduce: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
// allreduce take max of each elements in all processes
Allreduce<op::Max>(&a[0], N);
printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
// second allreduce that sums everything up
Allreduce<op::Sum>(&a[0], N);
printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
rabit::Finalize();
return 0;
}
You can run the example using the rabit_demo.py script. The following command starts the rabit program with two worker processes.
../tracker/rabit_demo.py -n 2 basic.rabit
This will start two processes, one process with rank 0 and the other with rank 1, both processes run the same code.
The rabit::GetRank()
function returns the rank of current process.
Before the call to Allreduce, process 0 contains the array a = {0, 1, 2}
, while process 1 has the array
a = {1, 2, 3}
. After the call to Allreduce, the array contents in all processes are replaced by the
reduction result (in this case, the maximum value in each position across all the processes). So, after the
Allreduce call, the result will become a = {1, 2, 3}
.
Rabit provides different reduction operators, for example, if you change op::Max
to op::Sum
,
the reduction operation will be a summation, and the result will become a = {1, 3, 5}
.
You can also run the example with different processes by setting -n to different values.
If you are more familiar with python, you can also use rabit in python. The same example as before can be found in basic.py:
import numpy as np
import rabit
rabit.init()
n = 3
rank = rabit.get_rank()
a = np.zeros(n)
for i in xrange(n):
a[i] = rank + i
print '@node[%d] before-allreduce: a=%s' % (rank, str(a))
a = rabit.allreduce(a, rabit.MAX)
print '@node[%d] after-allreduce-max: a=%s' % (rank, str(a))
a = rabit.allreduce(a, rabit.SUM)
print '@node[%d] after-allreduce-sum: a=%s' % (rank, str(a))
rabit.finalize()
You can run the program using the following command
../tracker/rabit_demo.py -n 2 basic.py
Broadcast is another method provided by rabit besides Allreduce. This function allows one node to broadcast its local data to all other nodes. The following code in broadcast.cc broadcasts a string from node 0 to all other nodes.
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
rabit::Init(argc, argv);
std::string s;
if (rabit::GetRank() == 0) s = "hello world";
printf("@node[%d] before-broadcast: s=\"%s\"\n",
rabit::GetRank(), s.c_str());
// broadcast s from node 0 to all other nodes
rabit::Broadcast(&s, 0);
printf("@node[%d] after-broadcast: s=\"%s\"\n",
rabit::GetRank(), s.c_str());
rabit::Finalize();
return 0;
}
The following command starts the program with three worker processes.
../tracker/rabit_demo.py -n 3 broadcast.rabit
Besides strings, rabit also allows to broadcast constant size array and vectors.
The counterpart in python can be found in broadcast.py. Here is a snippet so that you can get a better sense of how simple is to use the python library:
import rabit
rabit.init()
n = 3
rank = rabit.get_rank()
s = None
if rank == 0:
s = {'hello world':100, 2:3}
print '@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s))
s = rabit.broadcast(s, 0)
print '@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s))
rabit.finalize()
Many distributed machine learning algorithms involve splitting the data into different nodes, computing statistics locally, and finally aggregating them. Such workflow is usually done repetitively through many iterations before the algorithm converges. Allreduce naturally meets the structure of such programs, common use cases include:
- Aggregation of gradient values, which can be used in optimization methods such as L-BFGS.
- Aggregation of other statistics, which can be used in KMeans and Gaussian Mixture Models.
- Find the best split candidate and aggregation of split statistics, used for tree based models.
Rabit is a reliable and portable library for distributed machine learning programs, that allow programs to run reliably on different platforms.
This section introduces topics about how to use rabit API. You can always refer to API Documentation for definition of each functions. This section trys to gives examples of different aspectes of rabit API.
The following code illustrates the common structure of a rabit program. This is an abstract example, you can also refer to wormhole for an example implementation of kmeans algorithm.
#include <rabit.h>
int main(int argc, char *argv[]) {
...
rabit::Init(argc, argv);
// sync on expected model size before load checkpoint, if we pass rabit_bootstrap_cache=true
rabit::Allreduce<rabit::op::Max>(&model.size(), 1);
// load the latest checked model
int version = rabit::LoadCheckPoint(&model);
// initialize the model if it is the first version
if (version == 0) model.InitModel();
// the version number marks the iteration to resume
for (int iter = version; iter < max_iter; ++iter) {
// at this point, the model object should allow us to recover the program state
...
// each iteration can contain multiple calls of allreduce/broadcast
rabit::Allreduce<rabit::op::Max>(&data[0], n);
...
// checkpoint model after one iteration finishes
rabit::CheckPoint(&model);
}
rabit::Finalize();
return 0;
}
Besides the common Allreduce and Broadcast functions, there are two additional functions: LoadCheckPoint
and CheckPoint
. These two functions are used for fault-tolerance purposes.
As mentioned before, traditional machine learning programs involve several iterations. In each iteration, we start with a model, make some calls
to Allreduce or Broadcast and update the model. The calling sequence in each iteration does not need to be the same.
- When the nodes start from the beginning (i.e. iteration 0),
LoadCheckPoint
returns 0, so we can initialize the model. CheckPoint
saves the model after each iteration.- Efficiency Note: the model is only kept in local memory and no save to disk is performed when calling Checkpoint
- When a node goes down and restarts,
LoadCheckPoint
will recover the latest saved model, and - When a node goes down, the rest of the nodes will block in the call of Allreduce/Broadcast and wait for the recovery of the failed node until it catches up.
Please see the Fault Tolerance section to understand the recovery procedure executed by rabit.
Allreduce is one of the most important function provided by rabit. You can call allreduce by specifying the reduction operator, pointer to the data and size of the buffer, as follows
Allreduce<operator>(pointer_of_data, size_of_data);
This is the basic use case of Allreduce function. It is common that user writes the code to prepare the data needed into the data buffer, pass the data to Allreduce function, and get the reduced result. However, when a node restarts from failure, we can directly recover the result from other nodes(see also Fault Tolerance) and the data preparation procedure no longer necessary. Rabit Allreduce add an optional parameter preparation function to support such scenario. User can pass in a function that corresponds to the data preparation procedure to Allreduce calls, and the data preparation function will only be called when necessary. We use lazy_allreduce.cc as an example to demonstrate this feature. It is modified from basic.cc, and you can compare the two codes.
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
int a[N] = {0};
rabit::Init(argc, argv);
// lazy preparation function
auto prepare = [&]() {
printf("@node[%d] run prepare function\n", rabit::GetRank());
for (int i = 0; i < N; ++i) {
a[i] = rabit::GetRank() + i;
}
};
printf("@node[%d] before-allreduce: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
// allreduce take max of each elements in all processes
Allreduce<op::Max>(&a[0], N, prepare);
printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
// rum second allreduce
Allreduce<op::Sum>(&a[0], N);
printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n",
rabit::GetRank(), a[0], a[1], a[2]);
rabit::Finalize();
return 0;
}
Here we use features of C++11 because the lambda function makes things much shorter.
There is also C++ compatible callback interface provided in the API.
You can compile the program by typing make lazy_allreduce.mock
. We link against the mock library so that we can see
the effect when a process goes down. You can run the program using the following command
../tracker/rabit_demo.py -n 2 lazy_allreduce.mock mock=0,0,1,0
The additional arguments mock=0,0,1,0
will cause node 0 to kill itself before second call of Allreduce (see also mock test).
You will find that the prepare function's print is only executed once and node 0 will no longer execute the preparation function when it restarts from failure.
You can also find python version of the example in lazy_allreduce.py, and run it using the followin command
../tracker/rabit_demo.py -n 2 lazy_allreduce.py mock=0,0,1,0
Since lazy preparation function may not be called during execution. User should be careful when using this feature. For example, a possible mistake could be putting some memory allocation code in the lazy preparation function, and the computing memory was not allocated when lazy preparation function is not called. The example in lazy_allreduce.cc provides a simple way to migrate normal prepration code(basic.cc) to lazy version: wrap the preparation code with a lambda function, and pass it to allreduce.
Common machine learning algorithms usually involves iterative computation. As mentioned in the section (Structure of a Rabit Program),
user can and should use Checkpoint to save
the progress so far, so that when a node fails, the latest checkpointed model can be loaded.
There are two model arguments you can pass to Checkpoint and LoadCheckpoint: global_model
and local_model
:
global_model
refers to the model that is commonly shared across all the nodes- For example, the centriods of clusters in kmeans is shared across all nodes
local_model
refers to the model that is specifically tied to the current node- For example, in topic modeling, the topic assignments of subset of documents in current node is local model
Because the different nature of the two types of models, different strategy will be used for them.
global_model
is simply saved in local memory of each node, while local_model
will replicated to some other
nodes (selected using a ring replication strategy). The checkpoint is only saved in the memory without touching the disk which makes rabit programs more efficient.
User is encouraged to use global_model
only when is sufficient for better efficiency.
To enable a model class to be checked pointed, user can implement a serialization interface. The serialization interface already provide serialization functions of STL vector and string. For python API, user can checkpoint any python object that can be pickled.
There is a special Checkpoint function called LazyCheckpoint,
which can be used for global_model
only cases under certain condition.
When LazyCheckpoint is called, no action is taken and the rabit engine only remembers the pointer to the model.
The serialization will only happen when another node fails and the recovery starts. So user basically pays no extra cost calling LazyCheckpoint.
To use this function, the user need to ensure the model remain unchanged until the last call of Allreduce/Broadcast in the current version finishes.
So that when recovery procedure happens in these function calls, the serialized model will be the same.
For example, consider the following calling sequence
LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint
The user must only change the model in code3. Such condition can usually be satiesfied in many scenarios, and user can use LazyCheckpoint to further improve the efficiency of the program.
Rabit is a portable library, to use it, you only need to include the rabit header file.
- You will need to add the path to ../include to the header search path of the compiler
- Solution 1: add
-I/path/to/rabit/include
to the compiler flag in gcc or clang - Solution 2: add the path to the environment variable CPLUS_INCLUDE_PATH
- Solution 1: add
- You will need to add the path to ../lib to the library search path of the compiler
- Solution 1: add
-L/path/to/rabit/lib
to the linker flag - Solution 2: add the path to environment variable LIBRARY_PATH AND LD_LIBRARY_PATH
- Solution 1: add
- Link against lib/rabit.a
- Add
-lrabit
to the linker flag
- Add
The procedure above allows you to compile a program with rabit. The following two sections contain additional options you can use to link against different backends other than the normal one.
You can link against rabit_mpi.a
instead of using MPI Allreduce, however, the resulting program is backed by MPI and
is not fault tolerant anymore.
- Simply change the linker flag from
-lrabit
to-lrabit_mpi
- The final linking needs to be done by mpi wrapper compiler
mpicxx
If you want to use a mock to test the program in order to see the behavior of the code when some nodes go down, you can link against rabit_mock.a
.
- Simply change the linker flag from
-lrabit
to-lrabit_mock
The resulting rabit mock program can take in additional arguments in the following format
mock=rank,version,seq,ndeath
The four integers specify an event that will cause the program to commit suicide
(exit with -2)
- rank specifies the rank of the node to kill
- version specifies the version (iteration) of the model where you want the process to die
- seq specifies the sequence number of the Allreduce/Broadcast call since last checkpoint, where the process will be killed
- ndeath specifies how many times this node died already
For example, consider the following script in the test case
../tracker/rabit_demo.py -n 10 test_model_recover 10000\
mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1
- The first mock will cause node 0 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 0
- The second mock will cause node 1 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 1
- The third mock will cause node 1 to exit again when calling second Allreduce/Broadcast (seq = 1) in iteration 1
- Note that ndeath = 1 means this will happen only if node 1 died once, which is our case
Rabit is a portable library that can run on multiple platforms. All the rabit jobs can be submitted using dmlc-tracker
This section introduces how fault tolerance works in rabit. The following figure shows how rabit deals with failures.
The scenario is as follows:
- Node 1 fails between the first and second call of Allreduce after the second checkpoint
- The other nodes wait in the call of the second Allreduce in order to help node 1 to recover.
- When node 1 restarts, it will call
LoadCheckPoint
, and get the latest checkpoint from one of the existing nodes. - Then node 1 can start from the latest checkpoint and continue running.
- When node 1 calls the first Allreduce again, as the other nodes already know the result, node 1 can get it from one of them.
- When node 1 reaches the second Allreduce, the other nodes find out that node 1 has catched up and they can continue the program normally.
This fault tolerance model is based on a key property of Allreduce and Broadcast: All the nodes get the same result after calling Allreduce/Broadcast. Because of this property, any node can record the results of history Allreduce/Broadcast calls. When a node is recovered, it can fetch the lost results from some alive nodes and rebuild its model.
The checkpoint is introduced so that we can discard the history results of Allreduce/Broadcast calls before the latest checkpoint. This saves memory consumption used for backup. The checkpoint of each node is a model defined by users and can be split into 2 parts: a global model and a local model. The global model is shared by all nodes and can be backed up by any nodes. The local model of a node is replicated to some other nodes (selected using a ring replication strategy). The checkpoint is only saved in the memory without touching the disk which makes rabit programs more efficient. The strategy of rabit is different from the fail-restart strategy where all the nodes restart from the same checkpoint when any of them fail. In rabit, all the alive nodes will block in the Allreduce call and help the recovery. To catch up, the recovered node fetches its latest checkpoint and the results of Allreduce/Broadcast calls after the checkpoint from some alive nodes.
This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated, and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase.
In certain cases, rabit cluster may suffer lack of resources to retry failed workers. Thanks to fault tolerant assumption with infinite retry, it might cause entire cluster hang infinitely. We introduce sidecar thread which runs when rabit fault tolerant runtime observed allreduce/broadcast errors. By default, it will wait for 30 mins before all workers program exit. User can opt-in this feature and change treshold by passing rabit_timeout=true and rabit_timeout_sec=x (in seconds).