From 308cfc11cb8afc0d3a6925b06fa1cda9d2f0cded Mon Sep 17 00:00:00 2001
From: Chuntao Hong
Date: Wed, 28 Jan 2015 18:03:14 +0800
Subject: [PATCH 1/8] add minerva

---
 Makefile                |  5 ++-
 src/app/minerva/main.cc | 86 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)
 create mode 100755 src/app/minerva/main.cc

diff --git a/Makefile b/Makefile
index 071b35b..0f64246 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@ LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
 
-all: ps app build/hello
+all: ps app build/hello build/minerva
 
 clean:
	rm -rf build
@@ -30,6 +30,9 @@ app: build/ps
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
+build/minerva: build/app/minerva/main.o $(PS_LIB) $(PS_MAIN)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
 sys_srcs = $(wildcard src/*/*.cc)
 sys_protos = $(wildcard src/*/proto/*.proto)
 sys_objs = $(patsubst src/%.proto, build/%.pb.o, $(sys_protos)) \
diff --git a/src/app/minerva/main.cc b/src/app/minerva/main.cc
new file mode 100755
index 0000000..6f696c3
--- /dev/null
+++ b/src/app/minerva/main.cc
@@ -0,0 +1,86 @@
+#include <cstring>
+#include <map>
+#include <vector>
+#include "ps.h"
+#include "parameter/kv_vector.h"
+using namespace PS;
+
+typedef KVVector<Key, double> SyncVector;
+std::map<std::string, SyncVector*> model_;
+
+void AddLayer(const std::string & layerName, size_t nParams) {
+  auto layer = new SyncVector(layerName);
+  layer->key().resize(nParams);
+  for (size_t i = 0; i < nParams; i++)
+    layer->key()[i] = i;
+  layer->value().resize(nParams);
+  CHECK(model_.insert(make_pair(layerName, layer)).second) << "layer " << layerName << " already exists!";
+}
+
+SyncVector * GetLayer(const std::string & layerName)
+{
+  auto it = model_.find(layerName);
+  CHECK(it != model_.end()) << "trying to pull " << layerName << " without declaring it first";
+  return it->second;
+}
+
+void PullLayer(const std::string & layerName, double * grad, double * weight) {
+  auto layer = GetLayer(layerName);
+  MessagePtr msg(new Message(kServerGroup));
+
+  size_t nParams = layer->value().size();
+
+  memcpy(weight, grad, sizeof(double)*nParams);
+  layer->value().reset(weight, nParams, false);
+  msg->key = layer->key();
+  int pull_time = layer->pull(msg);
+
+  layer->waitOutMsg(kServerGroup, pull_time);
+}
+
+void DeclareLayers()
+{
+  AddLayer("w", 6);
+  AddLayer("w2", 6);
+}
+
+namespace PS {
+
+class MinervaServer : public App {
+ public:
+  MinervaServer() : App() { }
+  virtual ~MinervaServer() { }
+
+  void init() {
+    LL << myNodeID() << ", this is server " << myRank();
+    DeclareLayers();
+    auto & v = GetLayer("w")->value();
+    for (size_t i = 0; i < v.size(); i++)
+      v[i] = (double)i / 10;
+  }
+};
+
+App* CreateServerNode(const std::string& conf) {
+  return new MinervaServer();
+}
+} // namespace PS
+
+std::ostream & operator << (std::ostream & os, const std::vector<double> & v)
+{
+  for (auto & d : v)
+    os << " " << d;
+  return os;
+}
+
+int WorkerNodeMain(int argc, char *argv[]) {
+  using namespace PS;
+  LOG(ERROR) << MyNodeID() << ": this is worker " << MyRank();
+
+  DeclareLayers();
+  std::vector<double> grad(6, 2);
+  std::vector<double> weight(6, 0);
+  PullLayer("w", &grad[0], &weight[0]);
+  LOG(ERROR) << MyNodeID() << ": " << weight;
+
+  return 0;
+}
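
A note on the pull path in patch 1: PullLayer() first copies the gradient into the caller's weight buffer, points the layer's value array at that buffer zero-copy (reset(..., false)), and then lets the pull overwrite it with the server's values. A minimal sketch of a worker driver built only on the helpers declared above (hypothetical code, not part of the patch):

    // Sketch: pull both layers that DeclareLayers() registers ("w" and "w2",
    // 6 parameters each). Declarations must match on worker and server.
    #include <vector>

    void WorkerSketch() {
      DeclareLayers();
      std::vector<double> grad(6, 2.0);    // gradient shipped with the request
      std::vector<double> weight(6, 0.0);  // overwritten by the pulled values
      PullLayer("w", grad.data(), weight.data());
      PullLayer("w2", grad.data(), weight.data());
    }
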
From 385a9d2b62ac4c0180715c64c989276a58bf163c Mon Sep 17 00:00:00 2001
From: Chuntao Hong
Date: Thu, 29 Jan 2015 18:38:31 +0800
Subject: [PATCH 2/8] cp

---
 Makefile                         |  10 +-
 src/app/minerva/main.cc          | 297 ++++++++++++++++++++++++-------
 src/app/minerva/minerva_ps.cc    |  37 ++++
 src/app/minerva/minerva_ps.h     |  12 ++
 src/app/minerva/minerva_server.h |  33 ++++
 src/app/minerva/shared_model.h   | 126 +++++++++++++
 src/app/minerva/updater.h        |  27 +++
 src/system/proto/task.pb.h       | 129 +++++++++++---
 src/system/proto/task.proto      |   1 +
 9 files changed, 585 insertions(+), 87 deletions(-)
 create mode 100644 src/app/minerva/minerva_ps.cc
 create mode 100644 src/app/minerva/minerva_ps.h
 create mode 100644 src/app/minerva/minerva_server.h
 create mode 100644 src/app/minerva/shared_model.h
 create mode 100644 src/app/minerva/updater.h

diff --git a/Makefile b/Makefile
index 0f64246..a98e62a 100644
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,10 @@ CC = g++
 # OPT = -O0 -ggdb
 OPT = -O3 -ggdb
 
+MINERVA_INC=/home/hct/minerva/minerva
+MINERVA_LIB=/home/hct/minerva/release/libminerva.so
+
+
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=1
 ifeq ($(STATIC_THIRD_LIB), 1)
@@ -13,9 +17,9 @@ endif
 # THIRD_LIB+=-ltcmalloc_and_profiler
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
 
-INCPATH = -I./src -I$(THIRD_PATH)/include
+INCPATH = -I./src -I$(THIRD_PATH)/include -I$(MINERVA_INC)
 CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt
+LDFLAGS += $(THIRD_LIB) -lpthread -lrt $(MINERVA_LIB)
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
@@ -30,7 +34,7 @@ app: build/ps
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
-build/minerva: build/app/minerva/main.o $(PS_LIB) $(PS_MAIN)
+build/minerva: build/app/minerva/main.o build/app/minerva/minerva_ps.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
 sys_srcs = $(wildcard src/*/*.cc)
diff --git a/src/app/minerva/main.cc b/src/app/minerva/main.cc
index 6f696c3..7b339b4 100755
--- a/src/app/minerva/main.cc
+++ b/src/app/minerva/main.cc
@@ -1,86 +1,259 @@
-#include <cstring>
-#include <map>
-#include <vector>
 #include "ps.h"
-#include "parameter/kv_vector.h"
-using namespace PS;
-
-typedef KVVector<Key, double> SyncVector;
-std::map<std::string, SyncVector*> model_;
-
-void AddLayer(const std::string & layerName, size_t nParams) {
-  auto layer = new SyncVector(layerName);
-  layer->key().resize(nParams);
-  for (size_t i = 0; i < nParams; i++)
-    layer->key()[i] = i;
-  layer->value().resize(nParams);
-  CHECK(model_.insert(make_pair(layerName, layer)).second) << "layer " << layerName << " already exists!";
-}
-
-SyncVector * GetLayer(const std::string & layerName)
-{
-  auto it = model_.find(layerName);
-  CHECK(it != model_.end()) << "trying to pull " << layerName << " without declaring it first";
-  return it->second;
-}
-
-void PullLayer(const std::string & layerName, double * grad, double * weight) {
-  auto layer = GetLayer(layerName);
-  MessagePtr msg(new Message(kServerGroup));
-
-  size_t nParams = layer->value().size();
-
-  memcpy(weight, grad, sizeof(double)*nParams);
-  layer->value().reset(weight, nParams, false);
-  msg->key = layer->key();
-  int pull_time = layer->pull(msg);
-
-  layer->waitOutMsg(kServerGroup, pull_time);
-}
-
-void DeclareLayers()
-{
-  AddLayer("w", 6);
-  AddLayer("w2", 6);
-}
-
-namespace PS {
-
-class MinervaServer : public App {
- public:
-  MinervaServer() : App() { }
-  virtual ~MinervaServer() { }
-
-  void init() {
-    LL << myNodeID() << ", this is server " << myRank();
-    DeclareLayers();
-    auto & v = GetLayer("w")->value();
-    for (size_t i = 0; i < v.size(); i++)
-      v[i] = (double)i / 10;
-  }
-};
-
-App* CreateServerNode(const std::string& conf) {
-  return new MinervaServer();
-}
-} // namespace PS
-
-std::ostream & operator << (std::ostream & os, const std::vector<double> & v)
-{
-  for (auto & d : v)
-    os << " " << d;
-  return os;
-}
-
-int WorkerNodeMain(int argc, char *argv[]) {
-  using namespace PS;
-  LOG(ERROR) << MyNodeID() << ": this is worker " << MyRank();
-
-  DeclareLayers();
-  std::vector<double> grad(6, 2);
-  std::vector<double> weight(6, 0);
-  PullLayer("w", &grad[0], &weight[0]);
-  LOG(ERROR) << MyNodeID() << ": " << weight;
-
+#include "updater.h"
+#include "shared_model.h"
+#include "minerva_ps.h"
+#include "minerva_server.h"
+using namespace minerva;
+
+template <typename V>
+inline std::string arrstr(const V* data, int n) {
+  std::stringstream ss;
+  ss << "[" << n << "]: ";
+  for (int i = 0; i < n; ++i) ss << data[i] << " ";
+  return ss.str();
+}
+
+bool StartsWith(const std::string & str, const std::string & pattern)
+{
+  return str.size() >= pattern.size() && str.substr(0, pattern.size()) == pattern;
+}
+
+//===================================================
+
+#include <fstream>
+#include <iostream>
+
+using namespace std;
+using namespace minerva;
+
+const float epsW = 0.01, epsB = 0.01;
+const int numepochs = 100;
+const int mb_size = 256;
+const int num_mb_per_epoch = 235;
+
+const string weight_init_files[] = { "w12.dat", "w23.dat" };
+const string weight_out_files[] = { "w12.dat", "w23.dat" };
+const string bias_out_files[] = { "b2_trained.dat", "b3_trained.dat" };
+const string train_data_file = "/home/hct/mnist/traindata.dat";
+const string train_label_file = "/home/hct/mnist/trainlabel.dat";
+const string test_data_file = "/home/hct/mnist/testdata.dat";
+const string test_label_file = "/home/hct/mnist/testlabel.dat";
+
+const int num_layers = 3;
+const int lsize[num_layers] = { 784, 256, 10 };
+vector<NArray> weights;
+vector<NArray> bias;
+
+string GetWeightName(int i) {
+  return string("weights_") + to_string(i);
+}
+
+string GetBiasName(int i) {
+  return string("bias_") + to_string(i);
+}
+
+bool IsWeightName(const std::string & name)
+{
+  return StartsWith(name, "weights_");
+}
+
+bool IsBiasName(const std::string & name)
+{
+  return StartsWith(name, "bias_");
+}
+
+void GenerateInitWeight() {
+  for (int i = 0; i < num_layers - 1; ++i)
+  {
+    weights[i] = NArray::Randn({ lsize[i + 1], lsize[i] }, 0.0, sqrt(4.0 / (lsize[0] + lsize[1])));
+    bias[i] = NArray::Constant({ lsize[i + 1], 1 }, 1.0);
+  }
+  FileFormat format;
+  format.binary = true;
+  for (int i = 0; i < num_layers - 1; ++i)
+    weights[i].ToFile(weight_init_files[i], format);
+}
+
+namespace minerva {
+
+template <typename V>
+class MnistUpdater : public Updater<V> {
+ public:
+  virtual void InitLayer(const std::string &name, V* weight, size_t size) {
+    // init by 0, gaussian random, or others
+    for (int i = 0; i < size; ++i) {
+      weight[i] = 0;
+    }
+  }
+
+  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
+    // weight -= eta * gradient
+    V eta = .1;
+    if (IsWeightName(name)) {
+      eta = epsW;
+    }
+    else {
+      eta = epsB;
+    }
+
+    for (int i = 0; i < size; ++i) {
+      weight[i] -= eta * gradient[i];
+    }
+  }
+};
+
+}
+
+PS::App* PS::CreateServerNode(const std::string& conf) {
+  Updater<float> * updater = new minerva::MnistUpdater<float>();
+  MinervaServer * server = new minerva::MinervaServer(updater);
+
+  MinervaSystem& ms = MinervaSystem::Instance();
+  char * dummy = "minerva";
+  vector<char *> dummyArgs = { dummy };
+  int dummyArgc = 1;
+  char ** dummyArgv = &dummyArgs[0];
+  ms.Initialize(&dummyArgc, &dummyArgv);
+  uint64_t cpuDevice = ms.CreateCpuDevice();
+  ms.current_device_id_ = cpuDevice;
+  weights.resize(num_layers - 1);
+  bias.resize(num_layers - 1);
+  GenerateInitWeight();
+
+  for (int i = 0; i < num_layers - 1; ++i)
+  {
+    server->initLayer(GetWeightName(i), weights[i].Get().get(), weights[i].Size().Prod());
+    server->initLayer(GetBiasName(i), bias[i].Get().get(), bias[i].Size().Prod());
+  }
+
+  return server;
+}
+
+NArray Softmax(NArray m) {
+  NArray maxval = m.Max(0);
+  // NArray centered = m - maxval.Tile({m.Size(0), 1});
+  NArray centered = m.NormArithmetic(maxval, ArithmeticType::kSub);
+  NArray class_normalizer = Elewise::Ln(Elewise::Exp(centered).Sum(0)) + maxval;
+  // return Elewise::Exp(m - class_normalizer.Tile({m.Size(0), 1}));
+  return Elewise::Exp(m.NormArithmetic(class_normalizer, ArithmeticType::kSub));
+}
+
+void PrintTrainingAccuracy(NArray o, NArray t) {
+  // get predict
+  NArray predict = o.MaxIndex(0);
+  // get groundtruth
+  NArray groundtruth = t.MaxIndex(0);
+
+  float correct = (predict - groundtruth).CountZero();
+  cout << "Training Error: " << (mb_size - correct) / mb_size << endl;
+}
+
+int WorkerNodeMain(int argc, char *argv[]) {
+  MinervaSystem& ms = MinervaSystem::Instance();
+  ms.Initialize(&argc, &argv);
+  uint64_t cpuDevice = ms.CreateCpuDevice();
+#ifdef HAS_CUDA
+  uint64_t gpuDevice = ms.CreateGpuDevice(0);
+#endif
+  ms.current_device_id_ = cpuDevice;
+
+  weights.resize(num_layers - 1);
+  bias.resize(num_layers - 1);
+  GenerateInitWeight();
+  for (int i = 0; i < num_layers - 1; ++i)
+  {
+    weights[i].Pull(GetWeightName(i));
+    bias[i].Pull(GetBiasName(i));
+  }
+  /* if(FLAGS_init) {
+    cout << "Generate initial weights" << endl;
+    GenerateInitWeight();
+    cout << "Finished!" << endl;
+    return 0;
+  } else {
+    cout << "Init weights and bias" << endl;
+    InitWeight();
+  }
+  */
+
+  //for (int i = 0; i < num_layers - 1; ++i)
+  //{
+  //  weights[i] = NArray::PushGradAndPullWeight()
+  //  bias[i] = NArray::Constant({ lsize[i + 1], 1 }, 1.0);
+  //}
+
+  cout << "Training procedure:" << endl;
+  NArray acts[num_layers], sens[num_layers];
+  for (int epoch = 0; epoch < numepochs; ++epoch) {
+    cout << "  Epoch #" << epoch << endl;
+    ifstream data_file_in(train_data_file.c_str());
+    ifstream label_file_in(train_label_file.c_str());
+    for (int mb = 0; mb < num_mb_per_epoch; ++mb) {
+
+      ms.current_device_id_ = cpuDevice;
+
+      Scale data_size{ lsize[0], mb_size };
+      Scale label_size{ lsize[num_layers - 1], mb_size };
+      shared_ptr<float> data_ptr(new float[data_size.Prod()]);
+      shared_ptr<float> label_ptr(new float[label_size.Prod()]);
+      data_file_in.read(reinterpret_cast<char *>(data_ptr.get()), data_size.Prod() * sizeof(float));
+      label_file_in.read(reinterpret_cast<char *>(label_ptr.get()), label_size.Prod() * sizeof(float));
+
+      acts[0] = NArray::MakeNArray(data_size, data_ptr);
+      NArray label = NArray::MakeNArray(label_size, label_ptr);
+
+#ifdef HAS_CUDA
+      ms.current_device_id_ = gpuDevice;
+#endif
+
+      // ff
+      for (int k = 1; k < num_layers - 1; ++k) {
+        NArray wacts = weights[k - 1] * acts[k - 1];
+        NArray wactsnorm = wacts.NormArithmetic(bias[k - 1], ArithmeticType::kAdd);
+        acts[k] = Elewise::SigmoidForward(wactsnorm);
+      }
+      // softmax
+      acts[num_layers - 1] = Softmax((weights[num_layers - 2] * acts[num_layers - 2]).NormArithmetic(bias[num_layers - 2], ArithmeticType::kAdd));
+      // bp
+      sens[num_layers - 1] = acts[num_layers - 1] - label;
+      for (int k = num_layers - 2; k >= 1; --k) {
+        NArray d_act = Elewise::Mult(acts[k], 1 - acts[k]);
+        sens[k] = weights[k].Trans() * sens[k + 1];
+        sens[k] = Elewise::Mult(sens[k], d_act);
+      }
+
+      /*
+      // Update bias
+      for (int k = 0; k < num_layers - 1; ++k) { // no input layer
+        bias[k] -= epsB * sens[k + 1].Sum(1) / mb_size;
+      }
+      // Update weight
+      for (int k = 0; k < num_layers - 1; ++k) {
+        weights[k] -= epsW * sens[k + 1] * acts[k].Trans() / mb_size;
+      }
+      */
+      for (int k = 0; k < num_layers - 1; ++k) {
+        bias[k] = NArray::PushGradAndPullWeight(sens[k + 1].Sum(1) / mb_size, GetBiasName(k));
+        weights[k] = NArray::PushGradAndPullWeight(sens[k + 1] * acts[k].Trans() / mb_size, GetWeightName(k));
+      }
+
+      if (mb % 20 == 0) {
+        ms.current_device_id_ = cpuDevice;
+        PrintTrainingAccuracy(acts[num_layers - 1], label);
+      }
+    }
+    data_file_in.close();
+    label_file_in.close();
+  }
+  ms.current_device_id_ = cpuDevice;
+
+  // output weights
+  cout << "Write weight to files" << endl;
+  //FileFormat format;
+  //format.binary = true;
+  weights.clear();
+  bias.clear();
+  cout << "Training finished." << endl;
+  return 0;
   return 0;
 }
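
The Softmax() above is the log-sum-exp trick: subtracting the per-column maximum before exponentiating keeps exp() from overflowing, and normalizing in log space keeps the result exact. A scalar sketch of the same computation for one column, in plain C++ with no NArray dependency:

    #include <algorithm>
    #include <cmath>
    #include <vector>

    // Numerically stable softmax of one column vector.
    std::vector<float> SoftmaxColumn(const std::vector<float>& m) {
      float maxval = *std::max_element(m.begin(), m.end());
      float sum = 0;
      for (float v : m) sum += std::exp(v - maxval);   // exponents <= 0, no overflow
      float log_normalizer = std::log(sum) + maxval;   // mirrors Ln(Exp(centered).Sum(0)) + maxval
      std::vector<float> out(m.size());
      for (size_t i = 0; i < m.size(); ++i)
        out[i] = std::exp(m[i] - log_normalizer);
      return out;
    }
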
diff --git a/src/app/minerva/minerva_ps.cc b/src/app/minerva/minerva_ps.cc
new file mode 100644
index 0000000..a22f011
--- /dev/null
+++ b/src/app/minerva/minerva_ps.cc
@@ -0,0 +1,37 @@
+#include "minerva_ps.h"
+
+namespace minerva {
+namespace basic {
+
+// shared_model = nullptr;
+
+void PushGradAndPullWeight(const float * grad, float * weight, size_t size,
+                           const std::string & layer_name) {
+  if (!shared_model) {
+    shared_model = new PS::SharedModel<float>();
+  }
+
+  // push
+  using namespace PS;
+  int push_time = -1;
+  if (grad) {
+    SArray<float> val; val.copyFrom(grad, size);
+    MessagePtr push_msg(new Message(kServerGroup));
+    push_msg->addValue(val);
+    // LL << val;
+    push_msg->task.set_key_channel_str(layer_name);
+    Range<Key>(0, size).to(push_msg->task.mutable_key_range());
+    push_time = CHECK_NOTNULL(shared_model)->push(push_msg);
+  }
+
+  // pull
+  shared_model->setLayer(layer_name, weight, size);
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(layer_name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  shared_model->pull(pull_msg);
+}
+
+}
+}
diff --git a/src/app/minerva/minerva_ps.h b/src/app/minerva/minerva_ps.h
new file mode 100644
index 0000000..ef0b948
--- /dev/null
+++ b/src/app/minerva/minerva_ps.h
@@ -0,0 +1,12 @@
+#pragma once
+#include "shared_model.h"
+#include <string>
+
+namespace minerva {
+namespace basic {
+void PushGradAndPullWeight(const float * grad, float * weights, size_t size,
+                           const std::string & layer_name);
+
+static PS::SharedModel<float> *shared_model;
+}
+}
diff --git a/src/app/minerva/minerva_server.h b/src/app/minerva/minerva_server.h
new file mode 100644
index 0000000..d759a61
--- /dev/null
+++ b/src/app/minerva/minerva_server.h
@@ -0,0 +1,33 @@
+#include "ps.h"
+#include "updater.h"
+#include "shared_model.h"
+#include "minerva_ps.h"
+
+namespace minerva {
+  class MinervaServer : public PS::App {
+   public:
+    MinervaServer(Updater<float> * updater) : App() {
+      updater_ = updater;
+      shared_model_ = new PS::SharedModel<float>();
+      shared_model_->setUpdater(updater_);
+    }
+
+    virtual void init() {
+      LOG(ERROR) << "this is server " << myRank();
+    }
+
+    virtual void initLayer(const std::string & layerName, float * data, size_t size)
+    {
+      shared_model_->setLayer(layerName, data, size);
+    }
+
+    virtual ~MinervaServer() {
+      delete updater_;
+      delete shared_model_;
+    }
+
+   private:
+    Updater<float> *updater_;
+    PS::SharedModel<float> *shared_model_;
+  };
+}
\ No newline at end of file
diff --git a/src/app/minerva/shared_model.h b/src/app/minerva/shared_model.h
new file mode 100644
index 0000000..cf10018
--- /dev/null
+++ b/src/app/minerva/shared_model.h
@@ -0,0 +1,126 @@
+#pragma once
+#include "parameter/shared_parameter.h"
+#include "updater.h"
+
+namespace PS {
+
+DECLARE_string(app_name);
+
+template <typename V>
+class SharedModel : public SharedParameter<Key> {
+ public:
+  SharedModel(const string& my_name = FLAGS_app_name + "_model",
+              const string& parent_name = FLAGS_app_name) :
+      SharedParameter<Key>(my_name, parent_name) { }
+  virtual ~SharedModel() { }
+
+  void setLayer(string name, V* data, size_t size) {
+    val_[name] = SArray<V>(data, size, false);
+  }
+  void setUpdater(minerva::Updater<V>* updater) {
+    updater_ = updater;
+  }
+
+  // funcs will be called by the system
+  MessagePtrList slice(const MessagePtr& msg, const KeyRangeList& krs);
+  void getValue(const MessagePtr& msg);
+  void setValue(const MessagePtr& msg);
+ protected:
+  std::unordered_map<std::string, SArray<V>> val_;
+  // an array is placed into multiple servers only if its length > min_slice_size
+  size_t min_slice_size_ = 1000;
+
+  minerva::Updater<V>* updater_ = nullptr;
+};
+
+
+template <typename V>
+void SharedModel<V>::setValue(const MessagePtr& msg) {
+  CHECK_EQ(msg->value.size(), 1);
+  SArray<V> recv_data(msg->value[0]);
+  Range<Key> kr(msg->task.key_range());
+  CHECK_EQ(kr.size(), recv_data.size());
+  string key = msg->task.key_channel_str();
+  auto& my_val = val_[key];
+
+  if (isWorker()) {
+    if (my_val.empty()) my_val.resize(kr.size(), 0);
+    CHECK_GE(my_val.size(), kr.end());
+    my_val.segment(kr).copyFrom(recv_data);
+  } else if (isServer()) {
+    // TODO this server can do flexible consistency control here
+
+    if (my_val.empty()) {
+      // initialize weight
+      my_val.resize(kr.size(), 0);
+      CHECK_NOTNULL(updater_)->InitLayer(key, my_val.data(), my_val.size());
+    }
+
+    // update weight
+    CHECK_GE(my_val.size(), kr.size());
+    CHECK_NOTNULL(updater_)->Update(
+        key, my_val.data(), recv_data.data(), recv_data.size());
+  }
+}
+
+// only called at servers, namely when a worker pulls data from this server
+template <typename V>
+void SharedModel<V>::getValue(const MessagePtr& msg) {
+  auto& my_val = val_[msg->task.key_channel_str()];
+  Range<Key> kr(msg->task.key_range());
+  if (my_val.empty()) {
+    // initialize weight
+    my_val.resize(kr.size(), 0);
+    CHECK_NOTNULL(updater_)->InitLayer(msg->task.key_channel_str(), my_val.data(), my_val.size());
+  }
+
+  // TODO store the kr in memory
+  CHECK_EQ(my_val.size(), kr.size());
+  SArray<V> send_data(kr.size());
+  send_data.copyFrom(my_val);
+  msg->addValue(send_data);
+}
+
+// divide a message into n parts, where part i goes to server i. it's a zero-copy
+// implementation
+template <typename V>
+MessagePtrList SharedModel<V>::slice(const MessagePtr& msg, const KeyRangeList& krs) {
+  // divide the key range
+  size_t n = krs.size();
+  MessagePtrList ret(n);
+  Range<Key> kr(msg->task.key_range());
+  for (size_t i = 0; i < n; ++i) {
+    ret[i] = MessagePtr(new Message());
+    ret[i]->miniCopyFrom(*msg);
+    ret[i]->valid = true;
+    auto mut_kr = ret[i]->task.mutable_key_range();
+    if (kr.size() < min_slice_size_) {
+      if (i == 0) {
+        // server 0 gets all data
+        kr.to(mut_kr);
+      } else {
+        Range<Key>(0,0).to(mut_kr);
+        // do not send to servers 1 - n
+        ret[i]->valid = false;
+      }
+    } else {
+      kr.evenDivide(n, i).to(mut_kr);
+    }
+  }
+
+  // divide the data
+  for (size_t i = 0; i < msg->value.size(); ++i) {
+    SArray<V> data(msg->value[i]);
+    CHECK_EQ(data.size(), kr.size());
+    for (size_t j = 0; j < n; ++j) {
+      if (ret[j]->valid) {
+        Range<Key> kr(ret[j]->task.key_range());
+        ret[j]->addValue(data.segment(kr));
+      }
+    }
+  }
+  return ret;
+}
+
+
+} // namespace PS
diff --git a/src/app/minerva/updater.h b/src/app/minerva/updater.h
new file mode 100644
index 0000000..fb623a9
--- /dev/null
+++ b/src/app/minerva/updater.h
@@ -0,0 +1,27 @@
+#pragma once
+
+namespace minerva {
+
+template <typename V>
+class Updater {
+ public:
+  Updater() { }
+  virtual ~Updater() { }
+
+  virtual void InitLayer(const std::string &name, V* weight, size_t size) {
+    // init by 0, gaussian random, or others
+    for (int i = 0; i < size; ++i) {
+      weight[i] = 0;
+    }
+  }
+
+  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
+    // weight -= eta * gradient
+    V eta = .1;
+    for (int i = 0; i < size; ++i) {
+      weight[i] -= eta * gradient[i];
+    }
+  }
+};
+
+} // namespace minerva
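
Updater<V> is the server-side extension point: InitLayer() runs the first time a layer is touched and Update() runs on every gradient push, so an alternative optimizer only needs a subclass. As an illustration, a momentum variant might look like the sketch below (the momentum logic is hypothetical; only the interface comes from the patch):

    #include <map>
    #include <string>
    #include <vector>

    namespace minerva {

    // Hypothetical SGD-with-momentum updater against the Updater<V> interface.
    template <typename V>
    class MomentumUpdater : public Updater<V> {
     public:
      virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
        auto& vel = velocity_[name];
        if (vel.size() != size) vel.assign(size, 0);
        for (size_t i = 0; i < size; ++i) {
          vel[i] = momentum_ * vel[i] - eta_ * gradient[i];  // decay + new step
          weight[i] += vel[i];
        }
      }
     private:
      std::map<std::string, std::vector<V>> velocity_;  // one buffer per layer
      V eta_ = .1;
      V momentum_ = .9;
    };

    } // namespace minerva
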
diff --git a/src/system/proto/task.pb.h b/src/system/proto/task.pb.h
index f3569c3..2c50f07 100644
--- a/src/system/proto/task.pb.h
+++ b/src/system/proto/task.pb.h
@@ -292,6 +292,18 @@ class Task : public ::google::protobuf::Message {
   inline bool has_key() const;
   inline void set_has_key(bool value);
 
+  // optional string key_channel_str = 10;
+  inline bool has_key_channel_str() const;
+  inline void clear_key_channel_str();
+  static const int kKeyChannelStrFieldNumber = 10;
+  inline const ::std::string& key_channel_str() const;
+  inline void set_key_channel_str(const ::std::string& value);
+  inline void set_key_channel_str(const char* value);
+  inline void set_key_channel_str(const char* value, size_t size);
+  inline ::std::string* mutable_key_channel_str();
+  inline ::std::string* release_key_channel_str();
+  inline void set_allocated_key_channel_str(::std::string* key_channel_str);
+
   // optional .PS.DataType key_type = 13;
   inline bool has_key_type() const;
   inline void clear_key_type();
@@ -395,6 +407,8 @@ class Task : public ::google::protobuf::Message {
   inline void clear_has_key_channel();
   inline void set_has_has_key();
   inline void clear_has_has_key();
+  inline void set_has_key_channel_str();
+  inline void clear_has_key_channel_str();
   inline void set_has_key_type();
   inline void clear_has_key_type();
   inline void set_has_msg();
@@ -422,6 +436,7 @@ class Task : public ::google::protobuf::Message {
   bool has_key_;
   ::google::protobuf::int32 key_channel_;
   ::PS::PbRange* key_range_;
+  ::std::string* key_channel_str_;
   ::google::protobuf::RepeatedField<int> value_type_;
   ::google::protobuf::RepeatedPtrField< ::PS::FilterConfig > filter_;
   ::std::string* msg_;
@@ -433,7 +448,7 @@ class Task : public ::google::protobuf::Message {
   int key_type_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(17 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(18 + 31) / 32];
 
   friend void  protobuf_AddDesc_system_2fproto_2ftask_2eproto();
   friend void protobuf_AssignDesc_system_2fproto_2ftask_2eproto();
@@ -939,15 +954,85 @@ inline void Task::set_has_key(bool value) {
   has_key_ = value;
 }
 
+// optional string key_channel_str = 10;
+inline bool Task::has_key_channel_str() const {
+  return (_has_bits_[0] & 0x00000100u) != 0;
+}
+inline void Task::set_has_key_channel_str() {
+  _has_bits_[0] |= 0x00000100u;
+}
+inline void Task::clear_has_key_channel_str() {
+  _has_bits_[0] &= ~0x00000100u;
+}
+inline void Task::clear_key_channel_str() {
+  if (key_channel_str_ != &::google::protobuf::internal::kEmptyString) {
+    key_channel_str_->clear();
+  }
+  clear_has_key_channel_str();
+}
+inline const ::std::string& Task::key_channel_str() const {
+  return *key_channel_str_;
+}
+inline void Task::set_key_channel_str(const ::std::string& value) {
+  set_has_key_channel_str();
+  if (key_channel_str_ == &::google::protobuf::internal::kEmptyString) {
+    key_channel_str_ = new ::std::string;
+  }
+  key_channel_str_->assign(value);
+}
+inline void Task::set_key_channel_str(const char* value) {
+  set_has_key_channel_str();
+  if (key_channel_str_ == &::google::protobuf::internal::kEmptyString) {
+    key_channel_str_ = new ::std::string;
+  }
+  key_channel_str_->assign(value);
+}
+inline void Task::set_key_channel_str(const char* value, size_t size) {
+  set_has_key_channel_str();
+  if (key_channel_str_ == &::google::protobuf::internal::kEmptyString) {
+    key_channel_str_ = new ::std::string;
+  }
+  key_channel_str_->assign(reinterpret_cast<const char*>(value), size);
+}
+inline ::std::string* Task::mutable_key_channel_str() {
+  set_has_key_channel_str();
+  if (key_channel_str_ == &::google::protobuf::internal::kEmptyString) {
+    key_channel_str_ = new ::std::string;
+  }
+  return key_channel_str_;
+}
+inline ::std::string* Task::release_key_channel_str() {
+  clear_has_key_channel_str();
+  if (key_channel_str_ == &::google::protobuf::internal::kEmptyString) {
+    return NULL;
+  } else {
+    ::std::string* temp = key_channel_str_;
+    key_channel_str_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+    return temp;
+  }
+}
+inline void Task::set_allocated_key_channel_str(::std::string* key_channel_str) {
+  if (key_channel_str_ != &::google::protobuf::internal::kEmptyString) {
+    delete key_channel_str_;
+  }
+  if (key_channel_str) {
+    set_has_key_channel_str();
+    key_channel_str_ = key_channel_str;
+  } else {
+    clear_has_key_channel_str();
+    key_channel_str_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+  }
+}
+
 // optional .PS.DataType key_type = 13;
 inline bool Task::has_key_type() const {
-  return (_has_bits_[0] & 0x00000100u) != 0;
+  return (_has_bits_[0] & 0x00000200u) != 0;
 }
 inline void Task::set_has_key_type() {
-  _has_bits_[0] |= 0x00000100u;
+  _has_bits_[0] |= 0x00000200u;
 }
 inline void Task::clear_has_key_type() {
-  _has_bits_[0] &= ~0x00000100u;
+  _has_bits_[0] &= ~0x00000200u;
 }
 inline void Task::clear_key_type() {
   key_type_ = 0;
@@ -1016,13 +1101,13 @@ Task::mutable_filter() {
 
 // optional bytes msg = 17;
 inline bool Task::has_msg() const {
-  return (_has_bits_[0] & 0x00000800u) != 0;
+  return (_has_bits_[0] & 0x00001000u) != 0;
 }
 inline void Task::set_has_msg() {
-  _has_bits_[0] |= 0x00000800u;
+  _has_bits_[0] |= 0x00001000u;
 }
 inline void Task::clear_has_msg() {
-  _has_bits_[0] &= ~0x00000800u;
+  _has_bits_[0] &= ~0x00001000u;
 }
 inline void Task::clear_msg() {
   if (msg_ != &::google::protobuf::internal::kEmptyString) {
@@ -1086,13 +1171,13 @@ inline void Task::set_allocated_msg(::std::string* msg) {
 
 // optional .PS.ManageNode mng_node = 18;
 inline bool Task::has_mng_node() const {
-  return (_has_bits_[0] & 0x00001000u) != 0;
+  return (_has_bits_[0] & 0x00002000u) != 0;
 }
 inline void Task::set_has_mng_node() {
-  _has_bits_[0] |= 0x00001000u;
+  _has_bits_[0] |= 0x00002000u;
 }
 inline void Task::clear_has_mng_node() {
-  _has_bits_[0] &= ~0x00001000u;
+  _has_bits_[0] &= ~0x00002000u;
 }
 inline void Task::clear_mng_node() {
   if (mng_node_ != NULL) mng_node_->::PS::ManageNode::Clear();
@@ -1124,13 +1209,13 @@ inline void Task::set_allocated_mng_node(::PS::ManageNode* mng_node) {
 
 // optional .PS.ManageApp mng_app = 19;
 inline bool Task::has_mng_app() const {
-  return (_has_bits_[0] & 0x00002000u) != 0;
+  return (_has_bits_[0] & 0x00004000u) != 0;
 }
 inline void Task::set_has_mng_app() {
-  _has_bits_[0] |= 0x00002000u;
+  _has_bits_[0] |= 0x00004000u;
 }
 inline void Task::clear_has_mng_app() {
-  _has_bits_[0] &= ~0x00002000u;
+  _has_bits_[0] &= ~0x00004000u;
 }
 inline void Task::clear_mng_app() {
   if (mng_app_ != NULL) mng_app_->::PS::ManageApp::Clear();
@@ -1162,13 +1247,13 @@ inline void Task::set_allocated_mng_app(::PS::ManageApp* mng_app) {
 
 // optional .PS.CallSharedPara shared_para = 20;
 inline bool Task::has_shared_para() const {
-  return (_has_bits_[0] & 0x00004000u) != 0;
+  return (_has_bits_[0] & 0x00008000u) != 0;
 }
 inline void Task::set_has_shared_para() {
-  _has_bits_[0] |= 0x00004000u;
+  _has_bits_[0] |= 0x00008000u;
 }
 inline void Task::clear_has_shared_para() {
-  _has_bits_[0] &= ~0x00004000u;
+  _has_bits_[0] &= ~0x00008000u;
 }
 inline void Task::clear_shared_para() {
   if (shared_para_ != NULL) shared_para_->::PS::CallSharedPara::Clear();
@@ -1200,13 +1285,13 @@ inline void Task::set_allocated_shared_para(::PS::CallSharedPara* shared_para) {
 
 // optional .PS.SGDCall sgd = 21;
 inline bool Task::has_sgd() const {
-  return (_has_bits_[0] & 0x00008000u) != 0;
+  return (_has_bits_[0] & 0x00010000u) != 0;
 }
 inline void Task::set_has_sgd() {
-  _has_bits_[0] |= 0x00008000u;
+  _has_bits_[0] |= 0x00010000u;
 }
 inline void Task::clear_has_sgd() {
-  _has_bits_[0] &= ~0x00008000u;
+  _has_bits_[0] &= ~0x00010000u;
 }
 inline void Task::clear_sgd() {
   if (sgd_ != NULL) sgd_->::PS::SGDCall::Clear();
@@ -1238,13 +1323,13 @@ inline void Task::set_allocated_sgd(::PS::SGDCall* sgd) {
 
 // optional .PS.BCDCall bcd = 22;
 inline bool Task::has_bcd() const {
-  return (_has_bits_[0] & 0x00010000u) != 0;
+  return (_has_bits_[0] & 0x00020000u) != 0;
 }
 inline void Task::set_has_bcd() {
-  _has_bits_[0] |= 0x00010000u;
+  _has_bits_[0] |= 0x00020000u;
 }
 inline void Task::clear_has_bcd() {
-  _has_bits_[0] &= ~0x00010000u;
+  _has_bits_[0] &= ~0x00020000u;
 }
 inline void Task::clear_bcd() {
   if (bcd_ != NULL) bcd_->::PS::BCDCall::Clear();
diff --git a/src/system/proto/task.proto b/src/system/proto/task.proto
index 486b9c3..4785386 100644
--- a/src/system/proto/task.proto
+++ b/src/system/proto/task.proto
@@ -33,6 +33,7 @@ message Task {
   optional int32 key_channel = 8;
   // indicate whether or not it has the key list
   optional bool has_key = 9 [default = false];
+  optional string key_channel_str = 10;
 
   optional DataType key_type = 13;
   repeated DataType value_type = 14;
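
The new key_channel_str field is what lets the minerva app address a parameter blob by layer name instead of a numeric channel; minerva_ps.cc above stamps it on every push and pull message together with a dense key range. A sketch of that addressing pattern (the helper function is hypothetical; the individual calls mirror the patch):

    // Build a task addressed to a named layer of n parameters.
    PS::MessagePtr MakeLayerMsg(const std::string& layer, size_t n) {
      PS::MessagePtr msg(new PS::Message(PS::kServerGroup));
      msg->task.set_key_channel_str(layer);                        // layer name = channel
      PS::Range<PS::Key>(0, n).to(msg->task.mutable_key_range());  // dense keys 0..n
      return msg;
    }
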
From 0cc67d04d0266400127d05ebaaf4a6ae1160f24d Mon Sep 17 00:00:00 2001
From: chuntao hong
Date: Fri, 30 Jan 2015 11:29:57 +0800
Subject: [PATCH 3/8] minerva hello world ok

---
 Makefile                         |  10 +-
 src/app/minerva/hello.cc         |  43 ++++++
 src/app/minerva/main.cc          | 252 +------------------------------
 src/app/minerva/minerva_ps.cc    |   4 +
 src/app/minerva/minerva_ps.h     |   6 +-
 src/app/minerva/minerva_server.h |  50 +++---
 src/app/minerva/shared_model.h   |   5 +-
 src/app/minerva/updater.h        |  21 ++-
 8 files changed, 96 insertions(+), 295 deletions(-)
 create mode 100644 src/app/minerva/hello.cc

diff --git a/Makefile b/Makefile
index a98e62a..b528876 100644
--- a/Makefile
+++ b/Makefile
@@ -3,10 +3,6 @@ CC = g++
 # OPT = -O0 -ggdb
 OPT = -O3 -ggdb
 
-MINERVA_INC=/home/hct/minerva/minerva
-MINERVA_LIB=/home/hct/minerva/release/libminerva.so
-
-
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=1
 ifeq ($(STATIC_THIRD_LIB), 1)
@@ -17,9 +13,9 @@ endif
 # THIRD_LIB+=-ltcmalloc_and_profiler
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
 
-INCPATH = -I./src -I$(THIRD_PATH)/include -I$(MINERVA_INC)
+INCPATH = -I./src -I$(THIRD_PATH)/include
 CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt $(MINERVA_LIB)
+LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
@@ -30,7 +30,7 @@ app: build/ps
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
-build/minerva: build/app/minerva/main.o build/app/minerva/minerva_ps.o $(PS_LIB) $(PS_MAIN)
+build/minerva: build/app/minerva/main.o build/app/minerva/minerva_ps.o build/app/minerva/hello.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
 sys_srcs = $(wildcard src/*/*.cc)
diff --git a/src/app/minerva/hello.cc b/src/app/minerva/hello.cc
new file mode 100644
index 0000000..8efc2c1
--- /dev/null
+++ b/src/app/minerva/hello.cc
@@ -0,0 +1,43 @@
+#include <sstream>
+#include "ps.h"
+#include "minerva_ps.h"
+
+template <typename V>
+inline std::string arrstr(const V* data, int n) {
+  std::stringstream ss;
+  ss << "[" << n << "]: ";
+  for (int i = 0; i < n; ++i) ss << data[i] << " ";
+  return ss.str();
+}
+
+void InitLayer(const std::string & name, float * data, size_t size) {
+  for(size_t i = 0; i < size; i++)
+    data[i] = 0;
+}
+
+void UpdateLayer(const std::string & name, float * weight, float * grad, size_t size) {
+  float eta = .1;
+  for(size_t i = 0; i < size; i++)
+    weight[i] -= eta * grad[i];
+}
+
+int MinervaWorkerMain(int rank, int size, int argc, char ** argv)
+{
+  using minerva::basic::PushGradAndPullWeight;
+  const int n = 10;
+  float grad[10];
+  float weight[10];
+
+  PushGradAndPullWeight(nullptr, weight, n, "layer0");
+  LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
+             << " init weight " << arrstr(weight, n);
+
+  for (int j = 1; j < 4; ++j) {
+    for (int i = 0; i < n; ++i) grad[i] = j;
+    PushGradAndPullWeight(grad, weight, n, "layer0");
+    LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
+               << " pull weight " << arrstr(weight, n);
+  }
+
+  return 0;
+}
diff --git a/src/app/minerva/main.cc b/src/app/minerva/main.cc
index 7b339b4..7c613cb 100755
--- a/src/app/minerva/main.cc
+++ b/src/app/minerva/main.cc
@@ -3,257 +3,11 @@
 #include "shared_model.h"
 #include "minerva_ps.h"
 #include "minerva_server.h"
-using namespace minerva;
-
-template <typename V>
-inline std::string arrstr(const V* data, int n) {
-  std::stringstream ss;
-  ss << "[" << n << "]: ";
-  for (int i = 0; i < n; ++i) ss << data[i] << " ";
-  return ss.str();
-}
-
-bool StartsWith(const std::string & str, const std::string & pattern)
-{
-  return str.size() >= pattern.size() && str.substr(0, pattern.size()) == pattern;
-}
-
-//===================================================
-
-#include <fstream>
-#include <iostream>
-
-using namespace std;
-using namespace minerva;
-
-const float epsW = 0.01, epsB = 0.01;
-const int numepochs = 100;
-const int mb_size = 256;
-const int num_mb_per_epoch = 235;
-
-const string weight_init_files[] = { "w12.dat", "w23.dat" };
-const string weight_out_files[] = { "w12.dat", "w23.dat" };
-const string bias_out_files[] = { "b2_trained.dat", "b3_trained.dat" };
-const string train_data_file = "/home/hct/mnist/traindata.dat";
-const string train_label_file = "/home/hct/mnist/trainlabel.dat";
-const string test_data_file = "/home/hct/mnist/testdata.dat";
-const string test_label_file = "/home/hct/mnist/testlabel.dat";
-
-const int num_layers = 3;
-const int lsize[num_layers] = { 784, 256, 10 };
-vector<NArray> weights;
-vector<NArray> bias;
-
-string GetWeightName(int i) {
-  return string("weights_") + to_string(i);
-}
-
-string GetBiasName(int i) {
-  return string("bias_") + to_string(i);
-}
-
-bool IsWeightName(const std::string & name)
-{
-  return StartsWith(name, "weights_");
-}
-
-bool IsBiasName(const std::string & name)
-{
-  return StartsWith(name, "bias_");
-}
-
-void GenerateInitWeight() {
-  for (int i = 0; i < num_layers - 1; ++i)
-  {
-    weights[i] = NArray::Randn({ lsize[i + 1], lsize[i] }, 0.0, sqrt(4.0 / (lsize[0] + lsize[1])));
-    bias[i] = NArray::Constant({ lsize[i + 1], 1 }, 1.0);
-  }
-  FileFormat format;
-  format.binary = true;
-  for (int i = 0; i < num_layers - 1; ++i)
-    weights[i].ToFile(weight_init_files[i], format);
-}
-
-namespace minerva {
-
-template <typename V>
-class MnistUpdater : public Updater<V> {
- public:
-  virtual void InitLayer(const std::string &name, V* weight, size_t size) {
-    // init by 0, gaussian random, or others
-    for (int i = 0; i < size; ++i) {
-      weight[i] = 0;
-    }
-  }
-
-  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
-    // weight -= eta * gradient
-    V eta = .1;
-    if (IsWeightName(name)) {
-      eta = epsW;
-    }
-    else {
-      eta = epsB;
-    }
-
-    for (int i = 0; i < size; ++i) {
-      weight[i] -= eta * gradient[i];
-    }
-  }
-};
-
-}
 
 PS::App* PS::CreateServerNode(const std::string& conf) {
-  Updater<float> * updater = new minerva::MnistUpdater<float>();
-  MinervaServer * server = new minerva::MinervaServer(updater);
-
-  MinervaSystem& ms = MinervaSystem::Instance();
-  char * dummy = "minerva";
-  vector<char *> dummyArgs = { dummy };
-  int dummyArgc = 1;
-  char ** dummyArgv = &dummyArgs[0];
-  ms.Initialize(&dummyArgc, &dummyArgv);
-  uint64_t cpuDevice = ms.CreateCpuDevice();
-  ms.current_device_id_ = cpuDevice;
-  weights.resize(num_layers - 1);
-  bias.resize(num_layers - 1);
-  GenerateInitWeight();
-
-  for (int i = 0; i < num_layers - 1; ++i)
-  {
-    server->initLayer(GetWeightName(i), weights[i].Get().get(), weights[i].Size().Prod());
-    server->initLayer(GetBiasName(i), bias[i].Get().get(), bias[i].Size().Prod());
-  }
-
-  return server;
-}
-
-NArray Softmax(NArray m) {
-  NArray maxval = m.Max(0);
-  // NArray centered = m - maxval.Tile({m.Size(0), 1});
-  NArray centered = m.NormArithmetic(maxval, ArithmeticType::kSub);
-  NArray class_normalizer = Elewise::Ln(Elewise::Exp(centered).Sum(0)) + maxval;
-  // return Elewise::Exp(m - class_normalizer.Tile({m.Size(0), 1}));
-  return Elewise::Exp(m.NormArithmetic(class_normalizer, ArithmeticType::kSub));
-}
-
-void PrintTrainingAccuracy(NArray o, NArray t) {
-  // get predict
-  NArray predict = o.MaxIndex(0);
-  // get groundtruth
-  NArray groundtruth = t.MaxIndex(0);
-
-  float correct = (predict - groundtruth).CountZero();
-  cout << "Training Error: " << (mb_size - correct) / mb_size << endl;
+  return new minerva::MinervaServer();
 }
 
 int WorkerNodeMain(int argc, char *argv[]) {
-  MinervaSystem& ms = MinervaSystem::Instance();
-  ms.Initialize(&argc, &argv);
-  uint64_t cpuDevice = ms.CreateCpuDevice();
-#ifdef HAS_CUDA
-  uint64_t gpuDevice = ms.CreateGpuDevice(0);
-#endif
-  ms.current_device_id_ = cpuDevice;
-
-  weights.resize(num_layers - 1);
-  bias.resize(num_layers - 1);
-  GenerateInitWeight();
-  for (int i = 0; i < num_layers - 1; ++i)
-  {
-    weights[i].Pull(GetWeightName(i));
-    bias[i].Pull(GetBiasName(i));
-  }
-  /* if(FLAGS_init) {
-    cout << "Generate initial weights" << endl;
-    GenerateInitWeight();
-    cout << "Finished!" << endl;
-    return 0;
-  } else {
-    cout << "Init weights and bias" << endl;
-    InitWeight();
-  }
-  */
-
-  //for (int i = 0; i < num_layers - 1; ++i)
-  //{
-  //  weights[i] = NArray::PushGradAndPullWeight()
-  //  bias[i] = NArray::Constant({ lsize[i + 1], 1 }, 1.0);
-  //}
-
-  cout << "Training procedure:" << endl;
-  NArray acts[num_layers], sens[num_layers];
-  for (int epoch = 0; epoch < numepochs; ++epoch) {
-    cout << "  Epoch #" << epoch << endl;
-    ifstream data_file_in(train_data_file.c_str());
-    ifstream label_file_in(train_label_file.c_str());
-    for (int mb = 0; mb < num_mb_per_epoch; ++mb) {
-
-      ms.current_device_id_ = cpuDevice;
-
-      Scale data_size{ lsize[0], mb_size };
-      Scale label_size{ lsize[num_layers - 1], mb_size };
-      shared_ptr<float> data_ptr(new float[data_size.Prod()]);
-      shared_ptr<float> label_ptr(new float[label_size.Prod()]);
-      data_file_in.read(reinterpret_cast<char *>(data_ptr.get()), data_size.Prod() * sizeof(float));
-      label_file_in.read(reinterpret_cast<char *>(label_ptr.get()), label_size.Prod() * sizeof(float));
-
-      acts[0] = NArray::MakeNArray(data_size, data_ptr);
-      NArray label = NArray::MakeNArray(label_size, label_ptr);
-
-#ifdef HAS_CUDA
-      ms.current_device_id_ = gpuDevice;
-#endif
-
-      // ff
-      for (int k = 1; k < num_layers - 1; ++k) {
-        NArray wacts = weights[k - 1] * acts[k - 1];
-        NArray wactsnorm = wacts.NormArithmetic(bias[k - 1], ArithmeticType::kAdd);
-        acts[k] = Elewise::SigmoidForward(wactsnorm);
-      }
-      // softmax
-      acts[num_layers - 1] = Softmax((weights[num_layers - 2] * acts[num_layers - 2]).NormArithmetic(bias[num_layers - 2], ArithmeticType::kAdd));
-      // bp
-      sens[num_layers - 1] = acts[num_layers - 1] - label;
-      for (int k = num_layers - 2; k >= 1; --k) {
-        NArray d_act = Elewise::Mult(acts[k], 1 - acts[k]);
-        sens[k] = weights[k].Trans() * sens[k + 1];
-        sens[k] = Elewise::Mult(sens[k], d_act);
-      }
-
-      /*
-      // Update bias
-      for (int k = 0; k < num_layers - 1; ++k) { // no input layer
-        bias[k] -= epsB * sens[k + 1].Sum(1) / mb_size;
-      }
-      // Update weight
-      for (int k = 0; k < num_layers - 1; ++k) {
-        weights[k] -= epsW * sens[k + 1] * acts[k].Trans() / mb_size;
-      }
-      */
-      for (int k = 0; k < num_layers - 1; ++k) {
-        bias[k] = NArray::PushGradAndPullWeight(sens[k + 1].Sum(1) / mb_size, GetBiasName(k));
-        weights[k] = NArray::PushGradAndPullWeight(sens[k + 1] * acts[k].Trans() / mb_size, GetWeightName(k));
-      }
-
-      if (mb % 20 == 0) {
-        ms.current_device_id_ = cpuDevice;
-        PrintTrainingAccuracy(acts[num_layers - 1], label);
-      }
-    }
-    data_file_in.close();
-    label_file_in.close();
-  }
-  ms.current_device_id_ = cpuDevice;
-
-  // output weights
-  cout << "Write weight to files" << endl;
-  //FileFormat format;
-  //format.binary = true;
-  weights.clear();
-  bias.clear();
-  cout << "Training finished." << endl;
-  return 0;
-  return 0;
-}
+  return ::MinervaWorkerMain(PS::MyRank(), PS::RankSize(), argc, argv);
+}
\ No newline at end of file
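
After this rewrite main.cc is only a shim: the library expects the application to link in the three hooks declared in minerva_ps.h, and hello.cc above is the reference implementation. A minimal alternative application could be sketched like this (the step size 0.01 is an arbitrary illustration, not a value from the patch):

    #include <algorithm>
    #include <string>

    void InitLayer(const std::string & name, float * data, size_t size) {
      std::fill(data, data + size, 0.0f);  // zero initialization
    }

    void UpdateLayer(const std::string & name, float * weight, float * grad, size_t size) {
      for (size_t i = 0; i < size; ++i)
        weight[i] -= 0.01f * grad[i];      // plain SGD step
    }

    int MinervaWorkerMain(int rank, int size, int argc, char ** argv) {
      return 0;                            // a real worker pushes/pulls here
    }
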
diff --git a/src/app/minerva/minerva_ps.cc b/src/app/minerva/minerva_ps.cc
index a22f011..6283d82 100644
--- a/src/app/minerva/minerva_ps.cc
+++ b/src/app/minerva/minerva_ps.cc
@@ -1,4 +1,8 @@
 #include "minerva_ps.h"
+#include "shared_model.h"
+
+
+static PS::SharedModel<float> *shared_model = nullptr;
 
 namespace minerva {
 namespace basic {
diff --git a/src/app/minerva/minerva_ps.h b/src/app/minerva/minerva_ps.h
index ef0b948..9aa71c3 100644
--- a/src/app/minerva/minerva_ps.h
+++ b/src/app/minerva/minerva_ps.h
@@ -1,12 +1,14 @@
 #pragma once
-#include "shared_model.h"
 #include <string>
 
+void InitLayer(const std::string & name, float * data, size_t size);
+void UpdateLayer(const std::string & name, float * weight, float * grad, size_t size);
+int MinervaWorkerMain(int rank, int size, int argc, char ** argv);
+
 namespace minerva {
 namespace basic {
 void PushGradAndPullWeight(const float * grad, float * weights, size_t size,
                            const std::string & layer_name);
 
-static PS::SharedModel<float> *shared_model;
 }
 }
diff --git a/src/app/minerva/minerva_server.h b/src/app/minerva/minerva_server.h
index d759a61..1168f4e 100644
--- a/src/app/minerva/minerva_server.h
+++ b/src/app/minerva/minerva_server.h
@@ -3,31 +3,35 @@
 #include "shared_model.h"
 #include "minerva_ps.h"
 
-namespace minerva {
-  class MinervaServer : public PS::App {
-   public:
-    MinervaServer(Updater<float> * updater) : App() {
-      updater_ = updater;
-      shared_model_ = new PS::SharedModel<float>();
-      shared_model_->setUpdater(updater_);
-    }
-
-    virtual void init() {
-      LOG(ERROR) << "this is server " << myRank();
-    }
-
-    virtual void initLayer(const std::string & layerName, float * data, size_t size)
-    {
-      shared_model_->setLayer(layerName, data, size);
-    }
-
-    virtual ~MinervaServer() {
-      delete updater_;
-      delete shared_model_;
-    }
-
-   private:
-    Updater<float> *updater_;
-    PS::SharedModel<float> *shared_model_;
-  };
+namespace PS{
+  namespace minerva {
+
+class MinervaServer : public PS::App {
+public:
+  MinervaServer() : App() {
+    updater_ = new Updater<float>();
+    shared_model_ = new PS::SharedModel<float>();
+    shared_model_->setUpdater(updater_);
+  }
+
+  virtual void init() {
+    LOG(ERROR) << "this is server " << myRank();
+  }
+
+  virtual void initLayer(const std::string & layerName, float * data, size_t size)
+  {
+    shared_model_->setLayer(layerName, data, size);
+  }
+
+  virtual ~MinervaServer() {
+    delete updater_;
+    delete shared_model_;
+  }
+
+private:
+  Updater<float> *updater_;
+  PS::SharedModel<float> *shared_model_;
+};
+
+  }
 }
\ No newline at end of file
diff --git a/src/app/minerva/shared_model.h b/src/app/minerva/shared_model.h
index cf10018..3c454e8 100644
--- a/src/app/minerva/shared_model.h
+++ b/src/app/minerva/shared_model.h
@@ -8,6 +8,7 @@ DECLARE_string(app_name);
 
 template <typename V>
 class SharedModel : public SharedParameter<Key> {
+  typedef typename minerva::Updater<V> UpdaterT;
  public:
   SharedModel(const string& my_name = FLAGS_app_name + "_model",
               const string& parent_name = FLAGS_app_name) :
@@ -17,7 +18,7 @@ class SharedModel : public SharedParameter<Key> {
   void setLayer(string name, V* data, size_t size) {
     val_[name] = SArray<V>(data, size, false);
   }
-  void setUpdater(minerva::Updater<V>* updater) {
+  void setUpdater(UpdaterT * updater) {
     updater_ = updater;
   }
 
@@ -30,7 +31,7 @@ class SharedModel : public SharedParameter<Key> {
   // an array is placed into multiple servers only if its length > min_slice_size
   size_t min_slice_size_ = 1000;
 
-  minerva::Updater<V>* updater_ = nullptr;
+  UpdaterT * updater_ = nullptr;
 };
diff --git a/src/app/minerva/updater.h b/src/app/minerva/updater.h
index fb623a9..7b4ab47 100644
--- a/src/app/minerva/updater.h
+++ b/src/app/minerva/updater.h
@@ -1,27 +1,24 @@
 #pragma once
 
-namespace minerva {
+#include "minerva_ps.h"
+
+namespace PS {
+  namespace minerva{
 
 template <typename V>
 class Updater {
-  public:
+public:
   Updater() { }
   virtual ~Updater() { }
 
   virtual void InitLayer(const std::string &name, V* weight, size_t size) {
-    // init by 0, gaussian random, or others
-    for (int i = 0; i < size; ++i) {
-      weight[i] = 0;
-    }
+    ::InitLayer(name, weight, size);
   }
 
   virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
-    // weight -= eta * gradient
-    V eta = .1;
-    for (int i = 0; i < size; ++i) {
-      weight[i] -= eta * gradient[i];
-    }
+    ::UpdateLayer(name, weight, gradient, size);
   }
 };
 
-} // namespace minerva
+  } // namespace minerva
+} // namespace PS

From a23536006cf4380c7467edcafd86392c2f470b29 Mon Sep 17 00:00:00 2001
From: chuntao hong
Date: Fri, 30 Jan 2015 16:20:39 +0800
Subject: [PATCH 4/8] single worker works

---
 Makefile                      | 18 ++++++++++++++----
 src/app/minerva/minerva_ps.cc | 11 +++++++----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/Makefile b/Makefile
index b528876..68e8385 100644
--- a/Makefile
+++ b/Makefile
@@ -14,23 +14,24 @@ endif
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
 
 INCPATH = -I./src -I$(THIRD_PATH)/include
-CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH) 
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt 
+CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
+LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
 
-all: ps app build/hello build/minerva
+all: ps app build/hello build/minerva_test
 
 clean:
	rm -rf build
 
 ps: $(PS_LIB) $(PS_MAIN)
 app: build/ps
+minerva: build/libminervaps.a
 
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
-build/minerva: build/app/minerva/main.o build/app/minerva/minerva_ps.o build/app/minerva/hello.o $(PS_LIB) $(PS_MAIN)
+build/minerva_test: build/app/minerva/hello.o build/libminervaps.a
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
 sys_srcs = $(wildcard src/*/*.cc)
@@ -43,6 +44,15 @@ build/libps.a: $(sys_objs)
 build/libpsmain.a: build/ps_main.o
	ar crv $@ $?
 
+build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	@mkdir -p build/tmp/gflags; cd build/tmp/gflags; ar -x $(THIRD_PATH)/lib/libgflags.a; cd -
+	@mkdir -p build/tmp/zmq; cd build/tmp/zmq; ar -x $(THIRD_PATH)/lib/libzmq.a; cd -
+	@mkdir -p build/tmp/protobuf; cd build/tmp/protobuf; ar -x $(THIRD_PATH)/lib/libprotobuf.a; cd -
+	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
+	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
+	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
+	ar -qcv $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
+
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)
 
 build/ps: build/app/main/main.o $(app_objs) $(PS_LIB)
diff --git a/src/app/minerva/minerva_ps.cc b/src/app/minerva/minerva_ps.cc
index 6283d82..909a718 100644
--- a/src/app/minerva/minerva_ps.cc
+++ b/src/app/minerva/minerva_ps.cc
@@ -1,9 +1,7 @@
+#include <mutex>
 #include "minerva_ps.h"
 #include "shared_model.h"
 
-
-static PS::SharedModel<float> *shared_model = nullptr;
-
 namespace minerva {
 namespace basic {
 
@@ -11,8 +9,13 @@ namespace basic {
 
 void PushGradAndPullWeight(const float * grad, float * weight, size_t size,
                            const std::string & layer_name) {
+  static PS::SharedModel<float> *shared_model = nullptr;
+  static std::mutex mu;
+
   if (!shared_model) {
-    shared_model = new PS::SharedModel<float>();
+    std::lock_guard<std::mutex> lg(mu);
+    if (!shared_model)
+      shared_model = new PS::SharedModel<float>();
   }
 
   // push
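
Patch 4 replaces the file-scope singleton with a lazily created, mutex-guarded one: classic double-checked locking. Since C++11 the same thread-safety guarantee comes for free from a function-local static, so an equivalent sketch (not what the patch does) would be:

    // C++11 guarantees thread-safe initialization of function-local statics,
    // so the double-checked lock can be folded into one declaration.
    static PS::SharedModel<float>* GetSharedModel() {
      static PS::SharedModel<float>* shared_model = new PS::SharedModel<float>();
      return shared_model;
    }
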
From bb60e87f31d9221544300fb29b0f1e286ae7fb18 Mon Sep 17 00:00:00 2001
From: chuntao hong
Date: Fri, 30 Jan 2015 17:57:24 +0800
Subject: [PATCH 5/8] cp

---
 Makefile                      | 2 +-
 src/app/minerva/hello.cc      | 2 +-
 src/app/minerva/minerva_ps.cc | 2 --
 src/app/minerva/minerva_ps.h  | 3 ---
 4 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/Makefile b/Makefile
index 68e8385..f43e726 100644
--- a/Makefile
+++ b/Makefile
@@ -51,7 +51,7 @@ build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva
	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
-	ar -qcv $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
+	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
 
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)
 
diff --git a/src/app/minerva/hello.cc b/src/app/minerva/hello.cc
index 8efc2c1..adf7393 100644
--- a/src/app/minerva/hello.cc
+++ b/src/app/minerva/hello.cc
@@ -23,7 +23,7 @@ void UpdateLayer(const std::string & name, float * weight, float * grad, size_t
 
 int MinervaWorkerMain(int rank, int size, int argc, char ** argv)
 {
-  using minerva::basic::PushGradAndPullWeight;
+  using minerva::PushGradAndPullWeight;
   const int n = 10;
   float grad[10];
   float weight[10];
diff --git a/src/app/minerva/minerva_ps.cc b/src/app/minerva/minerva_ps.cc
index 909a718..e9f4c6e 100644
--- a/src/app/minerva/minerva_ps.cc
+++ b/src/app/minerva/minerva_ps.cc
@@ -3,7 +3,6 @@
 #include "shared_model.h"
 
 namespace minerva {
-namespace basic {
 
 // shared_model = nullptr;
 
@@ -41,4 +40,3 @@ void PushGradAndPullWeight(const float * grad, float * weight, size_t size,
 }
 
 }
-}
diff --git a/src/app/minerva/minerva_ps.h b/src/app/minerva/minerva_ps.h
index 9aa71c3..d9c3b41 100644
--- a/src/app/minerva/minerva_ps.h
+++ b/src/app/minerva/minerva_ps.h
@@ -6,9 +6,6 @@ void UpdateLayer(const std::string & name, float * weight, float * grad, size_t
 int MinervaWorkerMain(int rank, int size, int argc, char ** argv);
 
 namespace minerva {
-namespace basic {
 void PushGradAndPullWeight(const float * grad, float * weights, size_t size,
                            const std::string & layer_name);
-
-}
 }

From 42f072fe8140829ed7351b1fb49355900f0bc0b4 Mon Sep 17 00:00:00 2001
From: chuntao hong
Date: Thu, 2 Apr 2015 11:02:47 +0800
Subject: [PATCH 6/8] build static library

---
 Makefile | 31 +++++++++++++++----------------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/Makefile b/Makefile
index c1dffec..94dd72a 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,6 @@
 CC = g++
 
-# OPT = -O0 -ggdb
-OPT = -O3 -ggdb -fPIC
+ OPT = -O0 -ggdb
 
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=0
@@ -26,13 +25,13 @@ clean:
 
 ps: $(PS_LIB) $(PS_MAIN)
 app: build/ps
-#minerva: build/libminervaps.a
-minerva: build/libminervaps.so
+minerva: build/libminervaps.a
+#minerva: build/libminervaps.so
 
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
-build/minerva_test: build/app/minerva/hello.o build/libminervaps.so
+build/minerva_test: build/app/minerva/hello.o build/libminervaps.a
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
 sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \
@@ -47,17 +46,17 @@ build/libps.a: $(sys_objs)
 build/libpsmain.a: build/ps_main.o
	ar crv $@ $?
 
-#build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
-#	@mkdir -p build/tmp/gflags; cd build/tmp/gflags; ar -x $(THIRD_PATH)/lib/libgflags.a; cd -
-#	@mkdir -p build/tmp/zmq; cd build/tmp/zmq; ar -x $(THIRD_PATH)/lib/libzmq.a; cd -
-#	@mkdir -p build/tmp/protobuf; cd build/tmp/protobuf; ar -x $(THIRD_PATH)/lib/libprotobuf.a; cd -
-#	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
-#	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
-#	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
-#	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
-
-build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
-	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
+build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	@mkdir -p build/tmp/gflags; cd build/tmp/gflags; ar -x $(THIRD_PATH)/lib/libgflags.a; cd -
+	@mkdir -p build/tmp/zmq; cd build/tmp/zmq; ar -x $(THIRD_PATH)/lib/libzmq.a; cd -
+	@mkdir -p build/tmp/protobuf; cd build/tmp/protobuf; ar -x $(THIRD_PATH)/lib/libprotobuf.a; cd -
+	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
+	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
+	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
+	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
+
+#build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+#	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
 
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)

From e6464f1ac33cb054f2bac76ece4336e92fabca3f Mon Sep 17 00:00:00 2001
From: chuntao hong
Date: Wed, 3 Jun 2015 14:42:13 +0800
Subject: [PATCH 7/8] use O2 by default

---
 Makefile | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/Makefile b/Makefile
index 94dd72a..f01b21c 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 CC = g++
 
- OPT = -O0 -ggdb
+OPT = -O2 -ggdb
 
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=0
@@ -55,8 +55,8 @@ build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva
	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
 
-#build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
-#	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
+build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
 
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)

From 31f3b4c1d23baf8f876d62cbdef05e471e2ee918 Mon Sep 17 00:00:00 2001
From: hjk41
Date: Wed, 10 Jun 2015 04:35:38 +0000
Subject: [PATCH 8/8] use my third-party git

---
 script/install_third.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/script/install_third.sh b/script/install_third.sh
index e2858ad..4abf6c7 100755
--- a/script/install_third.sh
+++ b/script/install_third.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 dir=`dirname "$0"`
 cd $dir/..
-git clone https://github.com/mli/third_party
+git clone https://github.com/hjk41/third_party
 cd third_party
 ./install.sh