add minerva support #12

Open · wants to merge 10 commits into master
27 changes: 22 additions & 5 deletions Makefile
@@ -1,7 +1,6 @@
 CC = g++
 
-# OPT = -O0 -ggdb
-OPT = -O3 -ggdb
+OPT = -O2 -ggdb
 
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=0
@@ -14,24 +13,30 @@ endif
 
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
 INCPATH = -I./src -I$(THIRD_PATH)/include
-CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt
+CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
+LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
 
-all: ps app
+all: ps app build/hello build/minerva_test
 clean:
 	rm -rf build
 
 ps: $(PS_LIB) $(PS_MAIN)
 app: build/ps
+minerva: build/libminervaps.a
+#minerva: build/libminervaps.so
+
+build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
+build/minerva_test: build/app/minerva/hello.o build/libminervaps.so
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
 sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \
 	$(wildcard src/system/*.cc) $(wildcard src/filter/*.cc)
 
 sys_protos = $(wildcard src/*/proto/*.proto)
 sys_objs = $(patsubst src/%.proto, build/%.pb.o, $(sys_protos)) \
 	$(patsubst src/%.cc, build/%.o, $(sys_srcs))
@@ -41,6 +46,18 @@ build/libps.a: $(sys_objs)
 build/libpsmain.a: build/ps_main.o
 	ar crv $@ $?
 
+build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	@mkdir -p build/tmp/gflags; cd build/tmp/gflags; ar -x $(THIRD_PATH)/lib/libgflags.a; cd -
+	@mkdir -p build/tmp/zmq; cd build/tmp/zmq; ar -x $(THIRD_PATH)/lib/libzmq.a; cd -
+	@mkdir -p build/tmp/protobuf; cd build/tmp/protobuf; ar -x $(THIRD_PATH)/lib/libprotobuf.a; cd -
+	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
+	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
+	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
+	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
+
+build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
+
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)
 
 build/ps: build/app/main/main.o $(app_objs) $(PS_LIB)
2 changes: 1 addition & 1 deletion script/install_third.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 dir=`dirname "$0"`
 cd $dir/..
-git clone https://github.com/mli/third_party
+git clone https://github.com/hjk41/third_party
 cd third_party
 ./install.sh
43 changes: 43 additions & 0 deletions src/app/minerva/hello.cc
@@ -0,0 +1,43 @@
#include <glog/logging.h>
#include <sstream>  // std::stringstream, used by arrstr() below
#include "ps.h"
#include "minerva_ps.h"

template <typename V>
inline std::string arrstr(const V* data, int n) {
  std::stringstream ss;
  ss << "[" << n << "]: ";
  for (int i = 0; i < n; ++i) ss << data[i] << " ";
  return ss.str();
}

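// Server-side hook: zero-initialize a layer's weights.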
void InitLayer(const std::string& name, float* data, size_t size) {
  for (size_t i = 0; i < size; i++)
    data[i] = 0;
}

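// Server-side hook: plain SGD step, weight -= eta * grad.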
void UpdateLayer(const std::string& name, float* weight, float* grad, size_t size) {
  float eta = .1;
  for (size_t i = 0; i < size; i++)
    weight[i] -= eta * grad[i];
}

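// Worker entry point: pull the initial weight for "layer0", then for three
// rounds push a constant gradient and pull back the updated weight.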
int MinervaWorkerMain(int rank, int size, int argc, char** argv) {
  using minerva::PushGradAndPullWeight;
  const int n = 10;
  float grad[n];
  float weight[n];

  PushGradAndPullWeight(nullptr, weight, n, "layer0");
  LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
             << " init weight " << arrstr(weight, n);

  for (int j = 1; j < 4; ++j) {
    for (int i = 0; i < n; ++i) grad[i] = j;
    PushGradAndPullWeight(grad, weight, n, "layer0");
    LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
               << " pull weight " << arrstr(weight, n);
  }

  return 0;
}
13 changes: 13 additions & 0 deletions src/app/minerva/main.cc
@@ -0,0 +1,13 @@
#include "ps.h"
#include "updater.h"
#include "shared_model.h"
#include "minerva_ps.h"
#include "minerva_server.h"

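// Factory invoked by the PS runtime on each server node; workers enter
// through WorkerNodeMain below.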
PS::App* PS::CreateServerNode(const std::string& conf) {
  return new minerva::MinervaServer();
}

int WorkerNodeMain(int argc, char* argv[]) {
  return ::MinervaWorkerMain(PS::MyRank(), PS::RankSize(), argc, argv);
}
42 changes: 42 additions & 0 deletions src/app/minerva/minerva_ps.cc
@@ -0,0 +1,42 @@
#include "minerva_ps.h"
#include "shared_model.h"

namespace minerva {

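// Pushes the local gradient for `layer_name` (skipped when `grad` is null)
// to the server group, then blocks until the updated weight has been pulled
// back into `weight`.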
void PushGradAndPullWeight(const float* grad, float* weight, size_t size,
                           const std::string& layer_name) {
  // C++11 guarantees thread-safe initialization of function-local statics,
  // so the lazily created singleton needs no hand-rolled double-checked
  // locking (which is a data race on a plain pointer).
  static PS::SharedModel<float>* shared_model = new PS::SharedModel<float>();

  // push the gradient to the server group
  using namespace PS;
  int push_time = -1;
  if (grad) {
    SArray<float> val; val.copyFrom(grad, size);
    MessagePtr push_msg(new Message(kServerGroup));
    push_msg->addValue(val);
    push_msg->task.set_key_channel_str(layer_name);
    Range<Key>(0, size).to(push_msg->task.mutable_key_range());
    push_time = CHECK_NOTNULL(shared_model)->push(push_msg);
  }

  // pull the updated weight back into the caller's buffer; the pull depends
  // on the preceding push (if any), and wait=true makes the call block
  shared_model->setLayer(layer_name, weight, size);
  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
  pull_msg->task.set_key_channel_str(layer_name);
  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
  pull_msg->wait = true;
  shared_model->pull(pull_msg);
}

}  // namespace minerva
11 changes: 11 additions & 0 deletions src/app/minerva/minerva_ps.h
@@ -0,0 +1,11 @@
#pragma once
#include <string>

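// Hooks supplied by the application (see hello.cc for a sample
// implementation): InitLayer and UpdateLayer run on the server through
// Updater<V>; MinervaWorkerMain is the per-worker entry point.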
void InitLayer(const std::string& name, float* data, size_t size);
void UpdateLayer(const std::string& name, float* weight, float* grad, size_t size);
int MinervaWorkerMain(int rank, int size, int argc, char** argv);

namespace minerva {
void PushGradAndPullWeight(const float* grad, float* weights, size_t size,
                           const std::string& layer_name);
}  // namespace minerva
37 changes: 37 additions & 0 deletions src/app/minerva/minerva_server.h
@@ -0,0 +1,37 @@
#include "ps.h"
#include "updater.h"
#include "shared_model.h"
#include "minerva_ps.h"

namespace PS{
namespace minerva {

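// Server-side App: owns the Updater and the SharedModel holding the layer
// weights that workers push gradients to and pull weights from.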
class MinervaServer : public PS::App {
 public:
  MinervaServer() : App() {
    updater_ = new Updater<float>();
    shared_model_ = new PS::SharedModel<float>();
    shared_model_->setUpdater(updater_);
  }

  virtual void init() {
    LOG(ERROR) << "this is server " << myRank();
  }

  virtual void initLayer(const std::string& layerName, float* data, size_t size) {
    shared_model_->setLayer(layerName, data, size);
  }

  virtual ~MinervaServer() {
    delete updater_;
    delete shared_model_;
  }

 private:
  Updater<float>* updater_;
  PS::SharedModel<float>* shared_model_;
};

}  // namespace minerva
}  // namespace PS
127 changes: 127 additions & 0 deletions src/app/minerva/shared_model.h
@@ -0,0 +1,127 @@
#pragma once
#include "parameter/shared_parameter.h"
#include "updater.h"

namespace PS {

DECLARE_string(app_name);

template <typename V>
class SharedModel : public SharedParameter<Key> {
  typedef minerva::Updater<V> UpdaterT;
 public:
  SharedModel(const string& my_name = FLAGS_app_name + "_model",
              const string& parent_name = FLAGS_app_name) :
      SharedParameter<Key>(my_name, parent_name) { }
  virtual ~SharedModel() { }

  // register the caller's buffer as this layer's value (wrapped, not copied)
  void setLayer(string name, V* data, size_t size) {
    val_[name] = SArray<V>(data, size, false);
  }
  void setUpdater(UpdaterT* updater) {
    updater_ = updater;
  }

  // these functions will be called by the system
  MessagePtrList slice(const MessagePtr& msg, const KeyRangeList& krs);
  void getValue(const MessagePtr& msg);
  void setValue(const MessagePtr& msg);

 protected:
  std::unordered_map<std::string, SArray<V>> val_;
  // an array is split across multiple servers only if its length > min_slice_size_
  size_t min_slice_size_ = 1000;

  UpdaterT* updater_ = nullptr;
};


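// setValue is invoked on both sides: a worker stores weights pulled from a
// server, while a server treats the received values as a gradient and lets
// the updater fold it into the stored weights.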
template <typename V>
void SharedModel<V>::setValue(const MessagePtr& msg) {
  CHECK_EQ(msg->value.size(), 1);
  SArray<V> recv_data(msg->value[0]);
  Range<Key> kr(msg->task.key_range());
  CHECK_EQ(kr.size(), recv_data.size());
  string key = msg->task.key_channel_str();
  auto& my_val = val_[key];

  if (isWorker()) {
    if (my_val.empty()) my_val.resize(kr.size(), 0);
    CHECK_GE(my_val.size(), kr.end());
    my_val.segment(kr).copyFrom(recv_data);
  } else if (isServer()) {
    // TODO this server can do flexible consistency control here

    if (my_val.empty()) {
      // initialize weight
      my_val.resize(kr.size(), 0);
      CHECK_NOTNULL(updater_)->InitLayer(key, my_val.data(), my_val.size());
    }

    // update weight
    CHECK_GE(my_val.size(), kr.size());
    CHECK_NOTNULL(updater_)->Update(
        key, my_val.data(), recv_data.data(), recv_data.size());
  }
}

// Called only on servers, i.e., when a worker pulls data from this server.
template <typename V>
void SharedModel<V>::getValue(const MessagePtr& msg) {
  auto& my_val = val_[msg->task.key_channel_str()];
  Range<Key> kr(msg->task.key_range());
  if (my_val.empty()) {
    // initialize weight
    my_val.resize(kr.size(), 0);
    CHECK_NOTNULL(updater_)->InitLayer(
        msg->task.key_channel_str(), my_val.data(), my_val.size());
  }

  // TODO store the kr in memory
  CHECK_EQ(my_val.size(), kr.size());
  SArray<V> send_data(kr.size());
  send_data.copyFrom(my_val);
  msg->addValue(send_data);
}

// Divide a message into n parts, where part i goes to server i. This is a
// zero-copy implementation.
template <typename V>
MessagePtrList SharedModel<V>::slice(const MessagePtr& msg, const KeyRangeList& krs) {
  // divide the key range
  size_t n = krs.size();
  MessagePtrList ret(n);
  Range<Key> kr(msg->task.key_range());
  for (size_t i = 0; i < n; ++i) {
    ret[i] = MessagePtr(new Message());
    ret[i]->miniCopyFrom(*msg);
    ret[i]->valid = true;
    auto mut_kr = ret[i]->task.mutable_key_range();
    if (kr.size() < min_slice_size_) {
      if (i == 0) {
        // server 0 gets all the data
        kr.to(mut_kr);
      } else {
        // do not send to servers 1..n-1
        Range<Key>(0, 0).to(mut_kr);
        ret[i]->valid = false;
      }
    } else {
      kr.evenDivide(n, i).to(mut_kr);
    }
  }

  // divide the data
  for (size_t i = 0; i < msg->value.size(); ++i) {
    SArray<V> data(msg->value[i]);
    CHECK_EQ(data.size(), kr.size());
    for (size_t j = 0; j < n; ++j) {
      if (ret[j]->valid) {
        Range<Key> sub_kr(ret[j]->task.key_range());  // this part's sub-range
        ret[j]->addValue(data.segment(sub_kr));
      }
    }
  }
  return ret;
}


} // namespace PS
24 changes: 24 additions & 0 deletions src/app/minerva/updater.h
@@ -0,0 +1,24 @@
#pragma once

#include "minerva_ps.h"

namespace PS {
namespace minerva {

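// Default updater: delegates layer initialization and weight updates to the
// application-defined ::InitLayer / ::UpdateLayer hooks declared in
// minerva_ps.h; subclass and override to plug in a different update rule.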
template <typename V>
class Updater {
 public:
  Updater() { }
  virtual ~Updater() { }

  virtual void InitLayer(const std::string& name, V* weight, size_t size) {
    ::InitLayer(name, weight, size);
  }

  virtual void Update(const std::string& name, V* weight, V* gradient, size_t size) {
    ::UpdateLayer(name, weight, gradient, size);
  }
};

} // namespace minerva
} // namespace PS