Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rabit harden] fix rabit tests #81

Merged
merged 20 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
- TASK=test CXX=g++

# dependent apt packages
dist: xenial
addons:
apt:
packages:
Expand All @@ -21,9 +22,10 @@ addons:
- libcurl4-openssl-dev
- unzip
- python-numpy
- gcc-5

before_install:
- git clone https://github.com/dmlc/dmlc-core
- git clone https://github.com/chenqin/dmlc-core
- export TRAVIS=dmlc-core/scripts/travis/
- source ${TRAVIS}/travis_setup_env.sh

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ OS := $(shell uname)

ifeq ($(OS), Linux)
ifndef CXX
export CXX = g++
export CXX = g++-5
chenqin marked this conversation as resolved.
Show resolved Hide resolved
endif
export MPICXX = mpicxx
export LDFLAGS= -Llib -lrt
Expand All @@ -18,7 +18,7 @@ export LDFLAGS= -Llib -Wl,-rpath=/usr/local/lib/gcc6

endif

export WARNFLAGS= -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -std=c++0x
export WARNFLAGS= -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -std=c++11
export CFLAGS = -O3 $(WARNFLAGS)

#----------------------------
Expand Down
12 changes: 7 additions & 5 deletions scripts/travis_runtest.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#!/bin/bash
#make -f test.mk model_recover_10_10k || exit -1
#make -f test.mk model_recover_10_10k_die_same || exit -1
#make -f test.mk local_recover_10_10k || exit -1
#make -f test.mk lazy_recover_10_10k_die_hard || exit -1
#make -f test.mk lazy_recover_10_10k_die_same || exit -1

make -f test.mk model_recover_10_10k || exit -1
make -f test.mk model_recover_10_10k_die_same || exit -1
make -f test.mk model_recover_10_10k_die_hard || exit -1
make -f test.mk local_recover_10_10k || exit -1
make -f test.mk lazy_recover_10_10k_die_hard || exit -1
make -f test.mk lazy_recover_10_10k_die_same || exit -1
make -f test.mk ringallreduce_10_10k || exit -1
12 changes: 12 additions & 0 deletions src/allreduce_robust.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ void AllreduceRobust::Allreduce(void *sendrecvbuf_,
return;
}
bool recovered = RecoverExec(sendrecvbuf_, type_nbytes * count, 0, seq_counter);

// check if restarted worker recovered from other healthy workers
if (recovered) {
printf("[%d] all reduce with recover with seq %d\n", this->rank, seq_counter);
chenqin marked this conversation as resolved.
Show resolved Hide resolved
}
// now we are free to remove the last result, if any
if (resbuf.LastSeqNo() != -1 &&
(result_buffer_round == -1 ||
Expand Down Expand Up @@ -270,6 +275,9 @@ void AllreduceRobust::CheckPoint_(const Serializable *global_model,
if (RecoverExec(NULL, 0, 0, ActionSummary::kLocalCheckPoint)) break;
// save model to new version place
chenqin marked this conversation as resolved.
Show resolved Hide resolved
int new_version = !local_chkpt_version;
printf("[%d] local checkpoint # %d, new # %d \n",
this->rank, local_chkpt_version, new_version);
chenqin marked this conversation as resolved.
Show resolved Hide resolved

local_chkpt[new_version].clear();
utils::MemoryBufferStream fs(&local_chkpt[new_version]);
if (local_model != NULL) {
Expand All @@ -296,6 +304,7 @@ void AllreduceRobust::CheckPoint_(const Serializable *global_model,
if (lazy_checkpt) {
global_lazycheck = global_model;
} else {
printf("[%d] save global checkpoint #%d \n", this->rank, version_number);
global_checkpoint.resize(0);
utils::MemoryBufferStream fs(&global_checkpoint);
fs.Write(&version_number, sizeof(version_number));
Expand Down Expand Up @@ -737,6 +746,9 @@ AllreduceRobust::ReturnType AllreduceRobust::TryLoadCheckPoint(bool requester) {
succ = TryRecoverLocalState(&local_rptr[local_chkpt_version],
&local_chkpt[local_chkpt_version]);
if (succ != kSuccess) return succ;

printf("[%d] recovered local checkpoint version %d \n", this->rank, local_chkpt_version);
chenqin marked this conversation as resolved.
Show resolved Hide resolved

int nlocal = std::max(static_cast<int>(local_rptr[local_chkpt_version].size()) - 1, 0);
// check if everyone is OK
unsigned state = 0;
Expand Down
3 changes: 2 additions & 1 deletion test/Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
export CC = gcc
export CXX = g++
export MPICXX = mpicxx

export LDFLAGS= -L../lib -pthread -lm -lrt
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../include -std=c++0x
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../include -std=c++11

# specify tensor path
BIN = speed_test model_recover local_recover lazy_recover
Expand Down
13 changes: 6 additions & 7 deletions test/model_recover.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstdio>
#include <cstdlib>
#include <cmath>

using namespace rabit;

// dummy model
Expand Down Expand Up @@ -77,10 +78,9 @@ inline void TestBcast(size_t n, int root, int ntrial, int iter) {
std::string res;
if (root == rank) {
res = s;
rabit::Broadcast(&res, root);
} else {
rabit::Broadcast(&res, root);
}
rabit::Broadcast(&res, root);

utils::Check(res == s, "[%d] TestBcast fail", rank);
}

Expand All @@ -104,10 +104,9 @@ int main(int argc, char *argv[]) {
int iter = rabit::LoadCheckPoint(&model);
if (iter == 0) {
model.InitModel(n);
printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter);
} else {
printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter);
chenqin marked this conversation as resolved.
Show resolved Hide resolved
}
printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter);

for (int r = iter; r < 3; ++r) {
TestMax(&model, ntrial, r);
printf("[%d] !!!TestMax pass, iter=%d\n", rank, r);
Expand All @@ -119,7 +118,7 @@ int main(int argc, char *argv[]) {
TestSum(&model, ntrial, r);
printf("[%d] !!!TestSum pass, iter=%d\n", rank, r);
rabit::CheckPoint(&model);
printf("[%d] !!!CheckPont pass, iter=%d\n", rank, r);
printf("[%d] !!!Checkpoint pass, iter=%d\n", rank, r);
}
rabit::Finalize();
return 0;
Expand Down
15 changes: 7 additions & 8 deletions test/test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@
all: model_recover_10_10k model_recover_10_10k_die_same model_recover_10_10k_die_hard local_recover_10_10k

# this experiment tests recovery with actual process exit; use keepalive to keep the program alive
# TODO: enable those tests once we fix issue in rabit
model_recover_10_10k:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0

model_recover_10_10k_die_same:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0

model_recover_10_10k_die_hard:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0

local_recover_10_10k:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1

pylocal_recover_10_10k:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 ./local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1

lazy_recover_10_10k_die_hard:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0

lazy_recover_10_10k_die_same:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=10 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0

ringallreduce_10_10k:
../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 100 rabit_reduce_ring_mincount=10