Data Layer Parallel for Multi-GPU and Remove DataReader
Allow data layers (and also PythonLayer when used as a data layer) to be shared
among worker solvers' training nets, and also among their test nets, to be
future-proof in case one wants to do multi-GPU testing. Data layers are locked
during forward to ensure a sequential forward pass.
ronghanghu committed Aug 11, 2015
1 parent 1410c3f commit 20563cb
Showing 9 changed files with 88 additions and 19 deletions.
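In outline, the change works like the standalone sketch below. This is a toy model of the commit's behavior, not Caffe code (Boost.Thread is assumed): a net constructed with a pointer to a root net reuses the root's layer objects wherever ShareInParallel() returns true, and a per-layer mutex serializes Forward() among the nets sharing a layer.

// Toy model of the sharing scheme introduced by this commit.
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <vector>

class ToyLayer {
 public:
  virtual ~ToyLayer() {}
  virtual bool ShareInParallel() const { return false; }
  void Forward() {
    // One caller at a time, mirroring Layer<Dtype>::Forward() below.
    boost::mutex::scoped_lock lock(forward_mutex_);
    std::cout << "forward on layer " << this << std::endl;
  }
 private:
  boost::mutex forward_mutex_;
};

class ToyDataLayer : public ToyLayer {
 public:
  virtual bool ShareInParallel() const { return true; }  // data layers opt in
};

class ToyNet {
 public:
  explicit ToyNet(const ToyNet* root = NULL) {
    // One layer for brevity: share the root's instance when it opts in.
    if (root != NULL && root->layers_[0]->ShareInParallel()) {
      layers_.push_back(root->layers_[0]);
    } else {
      layers_.push_back(boost::shared_ptr<ToyLayer>(new ToyDataLayer()));
    }
  }
  std::vector<boost::shared_ptr<ToyLayer> > layers_;
};

int main() {
  ToyNet root_net;               // the root solver's net creates the layer
  ToyNet worker_net(&root_net);  // a worker's net shares that instance
  std::cout << (root_net.layers_[0] == worker_net.layers_[0]) << std::endl;  // prints 1
  return 0;
}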
8 changes: 8 additions & 0 deletions include/caffe/data_layers.hpp
@@ -34,6 +34,8 @@ class BaseDataLayer : public Layer<Dtype> {
// This method may not be overridden except by the BasePrefetchingDataLayer.
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+// Data layers should be shared by multiple solvers in parallel
+virtual inline bool ShareInParallel() const { return true; }
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.
@@ -118,6 +120,8 @@ class DummyDataLayer : public Layer<Dtype> {
: Layer<Dtype>(param) {}
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+// Data layers should be shared by multiple solvers in parallel
+virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -151,6 +155,8 @@ class HDF5DataLayer : public Layer<Dtype> {
virtual ~HDF5DataLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+// Data layers should be shared by multiple solvers in parallel
+virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
@@ -192,6 +198,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
virtual ~HDF5OutputLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+// Data layers should be shared by multiple solvers in parallel
+virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
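The one-line override above is also the opt-in point for custom layers that produce data. A hypothetical example follows (MyInputLayer is illustrative only, not part of this commit; the usual caffe headers are assumed):

#include <vector>
#include "caffe/layer.hpp"

namespace caffe {

// Hypothetical input layer opting into sharing: worker nets reuse the root
// net's instance, and the forward mutex added in layer.hpp below keeps the
// shared instance's Forward() calls sequential.
template <typename Dtype>
class MyInputLayer : public Layer<Dtype> {
 public:
  explicit MyInputLayer(const LayerParameter& param) : Layer<Dtype>(param) {}
  virtual inline bool ShareInParallel() const { return true; }
  virtual inline const char* type() const { return "MyInput"; }
  // No bottoms to reshape against; tops are sized when batches are produced.
  virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {}

 protected:
  virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
    // Fill top[0] with the next batch here.
  }
  virtual void Backward_cpu(const vector<Blob<Dtype>*>& top,
      const vector<bool>& propagate_down,
      const vector<Blob<Dtype>*>& bottom) {
    // Data layers do not backpropagate.
  }
};

}  // namespace caffe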
15 changes: 15 additions & 0 deletions include/caffe/layer.hpp
@@ -1,6 +1,7 @@
#ifndef CAFFE_LAYER_H_
#define CAFFE_LAYER_H_

+#include <boost/thread.hpp>
#include <algorithm>
#include <string>
#include <vector>
@@ -85,6 +86,14 @@ class Layer {
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}

+/**
+ * @brief Whether a layer should be shared by multiple nets during data
+ *        parallelism. By default, all layers except data layers should
+ *        not be shared. Data layers should be shared to ensure that each
+ *        worker solver accesses the data sequentially during data
+ *        parallelism.
+ */
+virtual inline bool ShareInParallel() const { return false; }

/**
* @brief Adjust the shapes of top blobs and internal buffers to accommodate
* the shapes of the bottom blobs.
@@ -396,6 +405,10 @@ class Layer {
}
}

+private:
+// Mutex that locks this layer to ensure sequential forward passes
+boost::mutex forward_mutex_;

DISABLE_COPY_AND_ASSIGN(Layer);
}; // class Layer

@@ -405,6 +418,8 @@ class Layer {
template <typename Dtype>
inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
+// Lock during forward to ensure a sequential forward pass
+boost::mutex::scoped_lock lock(forward_mutex_);
Dtype loss = 0;
Reshape(bottom, top);
switch (Caffe::mode()) {
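The locking pattern is plain Boost: scoped_lock acquires the per-object mutex on construction and releases it when it goes out of scope, so concurrent callers take turns. A minimal standalone demonstration, independent of Caffe:

#include <boost/thread.hpp>
#include <iostream>

class Guarded {
 public:
  void Forward(int solver_id) {
    // Acquired here, released automatically when `lock` leaves scope.
    boost::mutex::scoped_lock lock(forward_mutex_);
    std::cout << "solver " << solver_id << " runs forward alone" << std::endl;
  }
 private:
  boost::mutex forward_mutex_;
};

int main() {
  Guarded shared_layer;
  // Two "worker solvers" racing on one shared layer: the bodies never
  // overlap, although which thread goes first is unspecified.
  boost::thread t0(&Guarded::Forward, &shared_layer, 0);
  boost::thread t1(&Guarded::Forward, &shared_layer, 1);
  t0.join();
  t1.join();
  return 0;
}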
8 changes: 5 additions & 3 deletions include/caffe/net.hpp
@@ -23,8 +23,9 @@ namespace caffe {
template <typename Dtype>
class Net {
public:
-explicit Net(const NetParameter& param);
-explicit Net(const string& param_file, Phase phase);
+explicit Net(const NetParameter& param, const Net* root_net = NULL);
+explicit Net(const string& param_file, Phase phase,
+    const Net* root_net = NULL);
virtual ~Net() {}

/// @brief Initialize a network with a NetParameter.
@@ -291,7 +292,8 @@ class Net {
size_t memory_used_;
/// Whether to compute and display debug info for the net.
bool debug_info_;

+/// The root net that actually holds the shared layers in data parallelism
+const Net* const root_net_;
DISABLE_COPY_AND_ASSIGN(Net);
};

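A sketch of how the new default argument is meant to be used (the prototxt path is a placeholder). Sharing only takes effect for non-root solvers, so the thread-local root-solver flag is toggled around worker construction, just as parallel.cpp does below:

#include "caffe/common.hpp"
#include "caffe/net.hpp"

void BuildRootAndWorkerNets() {
  // Root net: constructed exactly as before this commit.
  caffe::Net<float> root_net("train_val.prototxt", caffe::TRAIN);

  // Worker net: passes the root net so that layers whose ShareInParallel()
  // returns true are reused instead of rebuilt.
  caffe::Caffe::set_root_solver(false);
  caffe::Net<float> worker_net("train_val.prototxt", caffe::TRAIN, &root_net);
  caffe::Caffe::set_root_solver(true);
}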
4 changes: 4 additions & 0 deletions include/caffe/python_layer.hpp
@@ -27,6 +27,10 @@ class PythonLayer : public Layer<Dtype> {
self_.attr("reshape")(bottom, top);
}

+virtual inline bool ShareInParallel() const {
+  return this->layer_param_.python_param().share_in_parallel();
+}

virtual inline const char* type() const { return "Python"; }

protected:
14 changes: 10 additions & 4 deletions include/caffe/solver.hpp
@@ -17,8 +17,9 @@ namespace caffe {
template <typename Dtype>
class Solver {
public:
-explicit Solver(const SolverParameter& param);
-explicit Solver(const string& param_file);
+explicit Solver(const SolverParameter& param,
+    const Solver* root_solver = NULL);
+explicit Solver(const string& param_file, const Solver* root_solver = NULL);
void Init(const SolverParameter& param);
void InitTrainNet();
void InitTestNets();
@@ -79,6 +80,10 @@ class Solver {
vector<shared_ptr<Net<Dtype> > > test_nets_;
vector<Callback*> callbacks_;

+// The root solver that holds the root nets (which actually contain the
+// shared layers) in data parallelism
+const Solver* const root_solver_;

DISABLE_COPY_AND_ASSIGN(Solver);
};

@@ -89,8 +94,9 @@ class Solver {
template <typename Dtype>
class WorkerSolver : public Solver<Dtype> {
public:
-explicit WorkerSolver(const SolverParameter& param)
-    : Solver<Dtype>(param) {}
+explicit WorkerSolver(const SolverParameter& param,
+    const Solver<Dtype>* root_solver = NULL)
+    : Solver<Dtype>(param, root_solver) {}

protected:
void ApplyUpdate() {}
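Constructing a worker tied to a root solver, as a sketch (solver_param is assumed to be a populated SolverParameter). WorkerSolver leaves ApplyUpdate() empty because parameter updates are applied through the root; the root pointer exists so the worker's nets can share the root's data layers:

#include "caffe/common.hpp"
#include "caffe/solver.hpp"

void MakeWorkerSolver(const caffe::SolverParameter& solver_param,
                      const caffe::Solver<float>* root_solver) {
  // As in P2PSync (parallel.cpp below), clear the thread-local root-solver
  // flag while the worker and its shared nets are being built.
  caffe::Caffe::set_root_solver(false);
  caffe::WorkerSolver<float> worker(solver_param, root_solver);
  caffe::Caffe::set_root_solver(true);
}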
28 changes: 24 additions & 4 deletions src/caffe/net.cpp
@@ -22,12 +22,14 @@
namespace caffe {

template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param) {
+Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
+    : root_net_(root_net) {
Init(param);
}

template <typename Dtype>
-Net<Dtype>::Net(const string& param_file, Phase phase) {
+Net<Dtype>::Net(const string& param_file, Phase phase, const Net* root_net)
+    : root_net_(root_net) {
NetParameter param;
ReadNetParamsFromTextFileOrDie(param_file, &param);
param.mutable_state()->set_phase(phase);
@@ -36,6 +38,8 @@ Net<Dtype>::Net(const string& param_file, Phase phase) {

template <typename Dtype>
void Net<Dtype>::Init(const NetParameter& in_param) {
+CHECK(Caffe::root_solver() || root_net_)
+    << "root_net_ needs to be set for all non-root solvers";
// Set phase from the state.
phase_ = in_param.state().phase();
// Filter layers based on their include/exclude rules and
@@ -79,6 +83,9 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
top_id_vecs_.resize(param.layer_size());
bottom_need_backward_.resize(param.layer_size());
for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
+// For non-root solvers, whether this layer is shared from root_net_.
+bool is_shared_layer = !Caffe::root_solver()
+    && root_net_->layers_[layer_id]->ShareInParallel();
// Inherit phase from net if unset.
if (!param.layer(layer_id).has_phase()) {
param.mutable_layer(layer_id)->set_phase(phase_);
@@ -91,7 +98,11 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
<< "propagate_down param must be specified "
<< "either 0 or bottom_size times ";
}
-layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+if (is_shared_layer) {
+  layers_.push_back(root_net_->layers_[layer_id]);
+} else {
+  layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+}
layer_names_.push_back(layer_param.name());
if (Caffe::root_solver()) {
LOG(INFO) << "Creating Layer " << layer_param.name();
@@ -125,10 +136,19 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
}
}
// After this layer is connected, set it up.
+if (is_shared_layer) {
+  // Set up size of top blobs using root_net_
+  const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id];
+  const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id];
+  for (int top_id = 0; top_id < base_top.size(); ++top_id) {
+    this_top[top_id]->ReshapeLike(*base_top[top_id]);
+  }
+} else {
+  layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
+}
if (Caffe::root_solver()) {
  LOG(INFO) << "Setting up " << layer_names_[layer_id];
}
-layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) {
blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0));
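The shared branch deliberately skips SetUp(): running SetUp() a second time on the same layer object could, for instance, re-open its underlying data source. Only the worker's top blob shapes are copied from the root's already-initialized tops, which in isolation amounts to the following (both blob vectors assumed pre-allocated):

#include <vector>
#include "caffe/blob.hpp"

// Size a worker net's top blobs from the root net's, as the shared-layer
// branch of Net<Dtype>::Init() does.
void MirrorTopShapes(const std::vector<caffe::Blob<float>*>& root_top,
                     const std::vector<caffe::Blob<float>*>& worker_top) {
  for (size_t top_id = 0; top_id < root_top.size(); ++top_id) {
    worker_top[top_id]->ReshapeLike(*root_top[top_id]);
  }
}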
3 changes: 1 addition & 2 deletions src/caffe/parallel.cpp
@@ -218,7 +218,7 @@ P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
solver_ = root_solver;
} else {
Caffe::set_root_solver(false);
-solver_.reset(new WorkerSolver<Dtype>(param));
+solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
Caffe::set_root_solver(true);
}
this->configure(solver_.get());
@@ -436,4 +436,3 @@ INSTANTIATE_CLASS(GPUParams);
INSTANTIATE_CLASS(P2PSync);

} // namespace caffe

4 changes: 4 additions & 0 deletions src/caffe/proto/caffe.proto
@@ -719,6 +719,10 @@ message PythonParameter {
// string, dictionary in Python dict format, JSON, etc. You may parse this
// string in `setup` method and use it in `forward` and `backward`.
optional string param_str = 3 [default = ''];
+// Whether this PythonLayer is shared among worker solvers during data
+// parallelism. If true, each worker solver sequentially runs forward from
+// this layer. This value should be set to true if you are using it as a
+// data layer.
+optional bool share_in_parallel = 4 [default = false];
}

// Message that stores parameters used by ReductionLayer
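From C++, the new field is set through the generated protobuf accessors; the equivalent prototxt fragment is python_param { share_in_parallel: true }. Module and class names below are placeholders:

#include "caffe/proto/caffe.pb.h"

void ConfigureSharedPythonDataLayer(caffe::LayerParameter* layer_param) {
  layer_param->set_name("data");
  layer_param->set_type("Python");
  caffe::PythonParameter* py = layer_param->mutable_python_param();
  py->set_module("my_data_module");    // placeholder Python module
  py->set_layer("MyPythonDataLayer");  // placeholder class in that module
  py->set_share_in_parallel(true);     // the field added by this commit
}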
23 changes: 17 additions & 6 deletions src/caffe/solver.cpp
@@ -18,21 +18,23 @@
namespace caffe {

template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param)
-    : net_(), callbacks_() {
+Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
+    : net_(), callbacks_(), root_solver_(root_solver) {
Init(param);
}

template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file)
-    : net_(), callbacks_() {
+Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
+    : net_(), callbacks_(), root_solver_(root_solver) {
SolverParameter param;
ReadProtoFromTextFileOrDie(param_file, &param);
Init(param);
}

template <typename Dtype>
void Solver<Dtype>::Init(const SolverParameter& param) {
+CHECK(Caffe::root_solver() || root_solver_)
+    << "root_solver_ needs to be set for all non-root solvers";
LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
<< std::endl << param.DebugString();
param_ = param;
@@ -88,7 +90,11 @@ void Solver<Dtype>::InitTrainNet() {
net_state.MergeFrom(net_param.state());
net_state.MergeFrom(param_.train_state());
net_param.mutable_state()->CopyFrom(net_state);
-net_.reset(new Net<Dtype>(net_param));
+if (Caffe::root_solver()) {
+  net_.reset(new Net<Dtype>(net_param));
+} else {
+  net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
+}
}

template <typename Dtype>
@@ -163,7 +169,12 @@ void Solver<Dtype>::InitTestNets() {
net_params[i].mutable_state()->CopyFrom(net_state);
LOG(INFO)
<< "Creating test net (#" << i << ") specified by " << sources[i];
-test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+if (Caffe::root_solver()) {
+  test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+} else {
+  test_nets_[i].reset(new Net<Dtype>(net_params[i],
+      root_solver_->test_nets_[i].get()));
+}
test_nets_[i]->set_debug_info(param_.debug_info());
}
}
