diff --git a/CMakeLists.txt b/CMakeLists.txt index 300cbdad..4202a37d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,10 +1,10 @@ -cmake_minimum_required(VERSION 3.0) +cmake_minimum_required(VERSION 3.3) -project(rabit VERSION 0.2.0) +project(rabit VERSION 0.2.1) option(RABIT_BUILD_TESTS "Build rabit tests" OFF) option(RABIT_BUILD_MPI "Build MPI" OFF) -option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" ON) +option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" OFF) add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc) add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc) diff --git a/Makefile b/Makefile index 7ca884b9..02f1f7d1 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ else endif export WARNFLAGS= -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -std=c++11 -export CFLAGS = -O3 $(WARNFLAGS) -I $(DMLC)/include -I include/ +export CFLAGS = -O3 -g $(WARNFLAGS) -I $(DMLC)/include -I include/ export LDFLAGS =-Llib #download mpi diff --git a/include/rabit/c_api.h b/include/rabit/c_api.h index 22e73f7b..99e63718 100644 --- a/include/rabit/c_api.h +++ b/include/rabit/c_api.h @@ -84,7 +84,8 @@ RABIT_DLL void RabitGetProcessorName(char *out_name, * \param root the root of process */ RABIT_DLL void RabitBroadcast(void *sendrecv_data, - rbt_ulong size, int root); + rbt_ulong size, int root, + const char* caller = __builtin_FUNCTION()); /*! * \brief perform in-place allreduce, on sendrecvbuf * this function is NOT thread-safe @@ -108,7 +109,8 @@ RABIT_DLL void RabitAllreduce(void *sendrecvbuf, int enum_dtype, int enum_op, void (*prepare_fun)(void *arg), - void *prepare_arg); + void *prepare_arg, + const char* caller = __builtin_FUNCTION()); /*! * \brief load latest check point diff --git a/include/rabit/internal/engine.h b/include/rabit/internal/engine.h index 6a7dfe4a..79565218 100644 --- a/include/rabit/internal/engine.h +++ b/include/rabit/internal/engine.h @@ -158,6 +158,8 @@ class IEngine { * \param msg message to be printed in the tracker */ virtual void TrackerPrint(const std::string &msg) = 0; + virtual void TrackerSetConfig(const std::string &key, const std::string &value) = 0; + virtual void TrackerGetConfig(const std::string& key, std::string* value) = 0; }; /*! \brief initializes the engine module */ diff --git a/include/rabit/internal/rabit-inl.h b/include/rabit/internal/rabit-inl.h index f556d62e..d01b3387 100644 --- a/include/rabit/internal/rabit-inl.h +++ b/include/rabit/internal/rabit-inl.h @@ -177,6 +177,15 @@ inline void Allreduce(DType *sendrecvbuf, size_t count, std::function pr inline void TrackerPrint(const std::string &msg) { engine::GetEngine()->TrackerPrint(msg); } + +inline void TrackerSetConfig(const std::string &key, const std::string &value) { + engine::GetEngine()->TrackerSetConfig(key, value); +} + +inline void TrackerGetConfig(const std::string &key, std::string* value) { + engine::GetEngine()->TrackerGetConfig(key, value); +} + #ifndef RABIT_STRICT_CXX98_ inline void TrackerPrintf(const char *fmt, ...) { const int kPrintBuffer = 1 << 10; @@ -188,6 +197,38 @@ inline void TrackerPrintf(const char *fmt, ...) { msg.resize(strlen(msg.c_str())); TrackerPrint(msg); } + +inline void TrackerSetConfig(const char *key, const char *value, ...) { + const int kPrintBuffer = 1 << 10; + std::string k(kPrintBuffer, '\0'), v(kPrintBuffer, '\0'); + + va_list args1, args2; + va_start(args1, key); + va_start(args2, value); + vsnprintf(&k[0], kPrintBuffer, key, args1); + vsnprintf(&v[0], kPrintBuffer, value, args2); + va_end(args1); + va_end(args2); + k.resize(strlen(k.c_str())); + v.resize(strlen(v.c_str())); + engine::GetEngine()->TrackerSetConfig(k, v); +} + +inline void TrackerGetConfig(const char *key, char* value, ...) { + const int kPrintBuffer = 1 << 10; + std::string k(kPrintBuffer, '\0'), v(kPrintBuffer, '\0'); + + va_list args1, args2; + va_start(args1, key); + va_start(args2, value); + vsnprintf(&k[0], kPrintBuffer, key, args1); + vsnprintf(&v[0], kPrintBuffer, value, args2); + va_end(args1); + va_end(args2); + k.resize(strlen(k.c_str())); + v.resize(strlen(v.c_str())); + engine::GetEngine()->TrackerGetConfig(k, &v); +} #endif // RABIT_STRICT_CXX98_ // load latest check point inline int LoadCheckPoint(Serializable *global_model, diff --git a/include/rabit/rabit.h b/include/rabit/rabit.h index 83e8c58f..63aaaabd 100644 --- a/include/rabit/rabit.h +++ b/include/rabit/rabit.h @@ -99,6 +99,19 @@ inline std::string GetProcessorName(); * \param msg the message to be printed */ inline void TrackerPrint(const std::string &msg); +/*! + * \brief save config to tracker, + * \param key configuration key + * \param value value of config + */ +inline void TrackerSetConfig(const std::string &key, const std::string &value); +/*! + * \brief get config to tracker, + * \param key configuration key + * \param value value of config + */ +inline void TrackerGetConfig(const std::string &key, std::string* value); + #ifndef RABIT_STRICT_CXX98_ /*! * \brief prints the msg to the tracker, this function may not be available @@ -108,6 +121,18 @@ inline void TrackerPrint(const std::string &msg); * \param fmt the format string */ inline void TrackerPrintf(const char *fmt, ...); +/*! + * \brief save config to tracker, + * \param key configuration key + * \param value value of config + */ +inline void TrackerSetConfig(const char *key, const char *value, ...); +/*! + * \brief get config to tracker, + * \param key configuration key + * \param value value of config + */ +inline void TrackerGetConfig(const char *key, char* value, ...); #endif // RABIT_STRICT_CXX98_ /*! * \brief broadcasts a memory region to every node from the root diff --git a/scripts/travis_runtest.sh b/scripts/travis_runtest.sh index 37fc9953..1ec04bb0 100755 --- a/scripts/travis_runtest.sh +++ b/scripts/travis_runtest.sh @@ -1,10 +1,10 @@ #!/bin/bash -make -f test.mk model_recover_10_10k || exit -1 -make -f test.mk model_recover_10_10k_die_same || exit -1 -make -f test.mk model_recover_10_10k_die_hard || exit -1 -make -f test.mk local_recover_10_10k || exit -1 -make -f test.mk lazy_recover_10_10k_die_hard || exit -1 -make -f test.mk lazy_recover_10_10k_die_same || exit -1 -make -f test.mk ringallreduce_10_10k || exit -1 -make -f test.mk pylocal_recover_10_10k || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k_die_same || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k_die_hard || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 local_recover_10_10k || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 lazy_recover_10_10k_die_hard || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 lazy_recover_10_10k_die_same || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 ringallreduce_10_10k || exit -1 +make -f test.mk RABIT_BUILD_DMLC=1 pylocal_recover_10_10k || exit -1 diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index 60351699..3c184efe 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -135,6 +135,7 @@ void AllreduceBase::Shutdown(void) { sock_listen.Close(); utils::TCPSocket::Finalize(); } + void AllreduceBase::TrackerPrint(const std::string &msg) { if (tracker_uri == "NULL") { utils::Printf("%s", msg.c_str()); return; @@ -144,6 +145,23 @@ void AllreduceBase::TrackerPrint(const std::string &msg) { tracker.SendStr(msg); tracker.Close(); } + +void AllreduceBase::TrackerSetConfig(const std::string &key, const std::string &value) { + utils::TCPSocket tracker = this->ConnectTracker(); + tracker.SendStr(std::string("set")); + tracker.SendStr(key); + tracker.SendStr(value); + tracker.Close(); +} + +void AllreduceBase::TrackerGetConfig(const std::string &key, std::string* value) { + utils::TCPSocket tracker = this->ConnectTracker(); + tracker.SendStr(std::string("get")); + tracker.SendStr(key); + tracker.RecvStr(value); + tracker.Close(); +} + // util to parse data with unit suffix inline size_t ParseUnit(const char *name, const char *val) { char unit; diff --git a/src/allreduce_base.h b/src/allreduce_base.h index b83cb0d0..da41fda5 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -54,6 +54,9 @@ class AllreduceBase : public IEngine { * \param msg message to be printed in the tracker */ virtual void TrackerPrint(const std::string &msg); + virtual void TrackerSetConfig(const std::string &key, const std::string &value); + virtual void TrackerGetConfig(const std::string& key, std::string* value); + /*! \brief get rank */ virtual int GetRank(void) const { return rank; diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index ce5a5616..c9111494 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -207,6 +207,8 @@ int AllreduceRobust::LoadCheckPoint(Serializable *global_model, // run another phase of check ack, if recovered from data utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck, ActionSummary::kSpecialOp), "check ack must return true"); + utils::Printf("[%d] load checkpoint global %ld version %d\n", rank, + global_checkpoint.length(), version_number); return version_number; } else { // reset result buffer @@ -296,7 +298,6 @@ void AllreduceRobust::CheckPoint_(const Serializable *global_model, if (lazy_checkpt) { global_lazycheck = global_model; } else { - printf("[%d] save global checkpoint #%d \n", this->rank, version_number); global_checkpoint.resize(0); utils::MemoryBufferStream fs(&global_checkpoint); fs.Write(&version_number, sizeof(version_number)); @@ -547,6 +548,7 @@ AllreduceRobust::TryDecideRouting(AllreduceRobust::RecoverType role, { // get the shortest distance to the request point std::vector > dist_in, dist_out; + ReturnType succ = MsgPassing(std::make_pair(role == kHaveData, *p_size), &dist_in, &dist_out, ShortestDist); if (succ != kSuccess) return succ; @@ -713,6 +715,51 @@ AllreduceRobust::TryRecoverData(RecoverType role, } return kSuccess; } +/*! + * \brief try to fetch allreduce/broadcast results from rest of nodes + * as collaberative function called by all nodes, only requester node + * will pass seqno to rest of nodes and reconstruct/backfill sendrecvbuf_ + * of specific seqno from other nodes. + */ +AllreduceRobust::ReturnType AllreduceRobust::TryLoadCache(void* sendrecvbuf, bool requester) { + RecoverType role = requester ? kRequestData : kHaveData; + ReturnType succ; + // recover global checkpoint + size_t size = this->global_checkpoint.length(); + int recv_link; + std::vector req_in; + succ = TryDecideRouting(role, &size, &recv_link, &req_in); + if (succ != kSuccess) return succ; + // there is no checkpoints, which might be okay as long as resbuf has allreduce cache + //if (size == 0) return kSuccess; + + //TODO: run allreduce min and populate restored sequence counter to kHaveData hosts + int a = 0; + size_t s = 0; + void* buf = resbuf.Query(a, &s); + + // get size of allreduce buf from other + ReturnType ret = TryRecoverData(role, &s, sizeof(size_t), recv_link, req_in); + // for requester, allocate cache and push into resbuf + if(requester){ + utils::Printf("[%d] recover resbuf %d size %d \n", rank, a, s); + buf = resbuf.AllocTemp(s, 1); + resbuf.PushTemp(this->seq_counter, s, 1); + } + // backfill result from other hosts + ret = TryRecoverData(role, buf, s, recv_link, req_in); + + if(requester){ + //copy resbuf of seq_counter to only requester sendrecvbuf + //as other workers sendrecvbuf might point to other allreduce functions + //with different code path + memccpy(sendrecvbuf, buf, s, 1); + } + + //TODO: consider right return type + return ret; +} + /*! * \brief try to load check point * @@ -777,6 +824,7 @@ AllreduceRobust::ReturnType AllreduceRobust::TryLoadCheckPoint(bool requester) { global_checkpoint.resize(size); } if (size == 0) return kSuccess; + utils::Printf("[%d] load checkpoint size %d seq %d\n", rank, size, seq_counter); return TryRecoverData(role, BeginPtr(global_checkpoint), size, recv_link, req_in); } /*! @@ -848,9 +896,12 @@ AllreduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool re * - false means this is the lastest action that has not yet been executed, need to execute the action */ bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { + if (flag != 0) { utils::Assert(seqno == ActionSummary::kSpecialOp, "must only set seqno for normal operations"); } + + //utils::Printf("[%d] flag %d, seqno %d\n", rank, flag, seqno); // request ActionSummary req(flag, seqno); while (true) { @@ -867,6 +918,7 @@ bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { // if we requested checkpoint, we are free to go if (req.check_point()) return true; } else if (act.load_check()) { + // check cache // if there is only check_ack and load_check, do load_check if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue; // if requested load check, then misson complete @@ -894,6 +946,8 @@ bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { if (act.load_check()) { // all the nodes called load_check, this is an incomplete action if (!act.diff_seq()) return false; + // load cache stored from other node to local TODO: consider return type + //TryLoadCache(buf, req.load_check()); // load check have higher priority, do load_check if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue; // if requested load check, then misson complete diff --git a/src/allreduce_robust.h b/src/allreduce_robust.h index c8860822..48129353 100644 --- a/src/allreduce_robust.h +++ b/src/allreduce_robust.h @@ -375,6 +375,19 @@ class AllreduceRobust : public AllreduceBase { * \sa ReturnType */ ReturnType TryLoadCheckPoint(bool requester); + + /*! + * \brief try to load cache + * + * This is a collaborative function called by all nodes + * only the nodes with requester set to true really needs to load the check point + * other nodes acts as collaborative roles to complete this request + * \param buf the buffer to store the result, this parameter is only used when current node is requester + * \param requester whether current node is the requester + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType + */ + ReturnType TryLoadCache(void *buf, bool requester); /*! * \brief try to get the result of operation specified by seqno * diff --git a/src/engine_empty.cc b/src/engine_empty.cc index 8177410a..71329918 100644 --- a/src/engine_empty.cc +++ b/src/engine_empty.cc @@ -68,6 +68,14 @@ class EmptyEngine : public IEngine { // simply print information into the tracker utils::Printf("%s", msg.c_str()); } + virtual void TrackerSetConfig(const std::string &key, const std::string &value) { + // simply print information into the tracker + utils::Printf("%s-%s", key.c_str(), value.c_str()); + } + virtual void TrackerGetConfig(const std::string& key, std::string* value) { + // simply print information into the tracker + utils::Printf("%s", key.c_str()); + } private: int version_number; diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc index 35283ad5..55c59717 100644 --- a/src/engine_mpi.cc +++ b/src/engine_mpi.cc @@ -77,6 +77,15 @@ class MPIEngine : public IEngine { utils::Printf("%s", msg.c_str()); } } + virtual void TrackerSetConfig(const std::string &key, const std::string &value) { + // simply print information into the tracker + // TODO(chen qin): figure out how to support MPI + utils::Printf("%s-%s", key.c_str(), value.c_str()); + } + virtual void TrackerGetConfig(const std::string& key, std::string* value) { + // simply print information into the tracker + utils::Printf("%s", key.c_str()); + } private: int version_number; diff --git a/test/Makefile b/test/Makefile index b1c329d3..19ca64e0 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,6 +1,14 @@ +RABIT_BUILD_DMLC = 0 + +ifeq ($(RABIT_BUILD_DMLC),1) + DMLC=../dmlc-core +else + DMLC=../../dmlc-core +endif + MPICXX=../mpich/bin/mpicxx export LDFLAGS= -L../lib -pthread -lm -export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../include -I ../dmlc-core/include -std=c++11 +export CFLAGS = -Wall -O3 -msse3 -g -Wno-unknown-pragmas -fPIC -I../include -I $(DMLC)/include -std=c++11 OS := $(shell uname) diff --git a/test/test.mk b/test/test.mk index b4b9afc4..632231c8 100644 --- a/test/test.mk +++ b/test/test.mk @@ -1,3 +1,11 @@ +RABIT_BUILD_DMLC = 0 + +ifeq ($(RABIT_BUILD_DMLC),1) + DMLC=../dmlc-core +else + DMLC=../../dmlc-core +endif + # this is a makefile used to show testcases of rabit .PHONY: all @@ -5,25 +13,25 @@ all: model_recover_10_10k model_recover_10_10k_die_same model_recover_10_10k_di # this experiment test recovery with actually process exit, use keepalive to keep program alive model_recover_10_10k: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 model_recover_10_10k_die_same: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 model_recover_10_10k_die_hard: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 local_recover_10_10k: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 pylocal_recover_10_10k: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 lazy_recover_10_10k_die_hard: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 lazy_recover_10_10k_die_same: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 ringallreduce_10_10k: - ../dmlc-core/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 rabit_reduce_ring_mincount=10 + $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 rabit_reduce_ring_mincount=10