diff --git a/include/caffe/data_layers.hpp b/include/caffe/data_layers.hpp index 12e6c366620..895d5ceda8e 100644 --- a/include/caffe/data_layers.hpp +++ b/include/caffe/data_layers.hpp @@ -34,6 +34,8 @@ class BaseDataLayer : public Layer { // This method may not be overridden except by the BasePrefetchingDataLayer. virtual void LayerSetUp(const vector*>& bottom, const vector*>& top); + // Data layers should be shared by multiple solvers in parallel + virtual inline bool ShareInParallel() const { return true; } virtual void DataLayerSetUp(const vector*>& bottom, const vector*>& top) {} // Data layers have no bottoms, so reshaping is trivial. @@ -118,6 +120,8 @@ class DummyDataLayer : public Layer { : Layer(param) {} virtual void LayerSetUp(const vector*>& bottom, const vector*>& top); + // Data layers should be shared by multiple solvers in parallel + virtual inline bool ShareInParallel() const { return true; } // Data layers have no bottoms, so reshaping is trivial. virtual void Reshape(const vector*>& bottom, const vector*>& top) {} @@ -151,6 +155,8 @@ class HDF5DataLayer : public Layer { virtual ~HDF5DataLayer(); virtual void LayerSetUp(const vector*>& bottom, const vector*>& top); + // Data layers should be shared by multiple solvers in parallel + virtual inline bool ShareInParallel() const { return true; } // Data layers have no bottoms, so reshaping is trivial. virtual void Reshape(const vector*>& bottom, const vector*>& top) {} @@ -192,6 +198,8 @@ class HDF5OutputLayer : public Layer { virtual ~HDF5OutputLayer(); virtual void LayerSetUp(const vector*>& bottom, const vector*>& top); + // Data layers should be shared by multiple solvers in parallel + virtual inline bool ShareInParallel() const { return true; } // Data layers have no bottoms, so reshaping is trivial. virtual void Reshape(const vector*>& bottom, const vector*>& top) {} diff --git a/include/caffe/layer.hpp b/include/caffe/layer.hpp index 0771b6a8fb4..d82197a9c29 100644 --- a/include/caffe/layer.hpp +++ b/include/caffe/layer.hpp @@ -1,6 +1,7 @@ #ifndef CAFFE_LAYER_H_ #define CAFFE_LAYER_H_ +#include #include #include #include @@ -85,6 +86,14 @@ class Layer { virtual void LayerSetUp(const vector*>& bottom, const vector*>& top) {} + /** + * @brief Whether a layer should be shared by multiple nets during data + * parallelism. By default, all layers except for data layers should + * not be shared. data layers should be shared to ensure each worker + * solver access data sequentially during data parallelism. + */ + virtual inline bool ShareInParallel() const { return false; } + /** * @brief Adjust the shapes of top blobs and internal buffers to accommodate * the shapes of the bottom blobs. @@ -396,6 +405,10 @@ class Layer { } } + private: + // mutex to lock layer to ensure sequential forward + boost::mutex forward_mutex_; + DISABLE_COPY_AND_ASSIGN(Layer); }; // class Layer @@ -405,6 +418,8 @@ class Layer { template inline Dtype Layer::Forward(const vector*>& bottom, const vector*>& top) { + // Lock during forward to ensure sequential forward + boost::mutex::scoped_lock lock(forward_mutex_); Dtype loss = 0; Reshape(bottom, top); switch (Caffe::mode()) { diff --git a/include/caffe/net.hpp b/include/caffe/net.hpp index bf997553ee2..1bf07d28d13 100644 --- a/include/caffe/net.hpp +++ b/include/caffe/net.hpp @@ -23,8 +23,9 @@ namespace caffe { template class Net { public: - explicit Net(const NetParameter& param); - explicit Net(const string& param_file, Phase phase); + explicit Net(const NetParameter& param, const Net* root_net = NULL); + explicit Net(const string& param_file, Phase phase, + const Net* root_net = NULL); virtual ~Net() {} /// @brief Initialize a network with a NetParameter. @@ -291,7 +292,8 @@ class Net { size_t memory_used_; /// Whether to compute and display debug info for the net. bool debug_info_; - + /// The root net that actually holds the shared layers in data parallelism + const Net* const root_net_; DISABLE_COPY_AND_ASSIGN(Net); }; diff --git a/include/caffe/python_layer.hpp b/include/caffe/python_layer.hpp index 2957e7426be..5d4fe61c793 100644 --- a/include/caffe/python_layer.hpp +++ b/include/caffe/python_layer.hpp @@ -27,6 +27,10 @@ class PythonLayer : public Layer { self_.attr("reshape")(bottom, top); } + virtual inline bool ShareInParallel() const { + return this->layer_param_.python_param().share_in_parallel(); + }; + virtual inline const char* type() const { return "Python"; } protected: diff --git a/include/caffe/solver.hpp b/include/caffe/solver.hpp index 7442a95df06..64d6538d535 100644 --- a/include/caffe/solver.hpp +++ b/include/caffe/solver.hpp @@ -17,8 +17,9 @@ namespace caffe { template class Solver { public: - explicit Solver(const SolverParameter& param); - explicit Solver(const string& param_file); + explicit Solver(const SolverParameter& param, + const Solver* root_solver = NULL); + explicit Solver(const string& param_file, const Solver* root_solver = NULL); void Init(const SolverParameter& param); void InitTrainNet(); void InitTestNets(); @@ -79,6 +80,10 @@ class Solver { vector > > test_nets_; vector callbacks_; + // The root solver that holds root nets (actually containing shared layers) + // in data parallelism + const Solver* const root_solver_; + DISABLE_COPY_AND_ASSIGN(Solver); }; @@ -89,8 +94,9 @@ class Solver { template class WorkerSolver : public Solver { public: - explicit WorkerSolver(const SolverParameter& param) - : Solver(param) {} + explicit WorkerSolver(const SolverParameter& param, + const Solver* root_solver = NULL) + : Solver(param, root_solver) {} protected: void ApplyUpdate() {} diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp index 5d0f4322d19..6f0e9b60a17 100644 --- a/src/caffe/net.cpp +++ b/src/caffe/net.cpp @@ -22,12 +22,14 @@ namespace caffe { template -Net::Net(const NetParameter& param) { +Net::Net(const NetParameter& param, const Net* root_net) + : root_net_(root_net) { Init(param); } template -Net::Net(const string& param_file, Phase phase) { +Net::Net(const string& param_file, Phase phase, const Net* root_net) + : root_net_(root_net) { NetParameter param; ReadNetParamsFromTextFileOrDie(param_file, ¶m); param.mutable_state()->set_phase(phase); @@ -36,6 +38,8 @@ Net::Net(const string& param_file, Phase phase) { template void Net::Init(const NetParameter& in_param) { + CHECK (Caffe::root_solver() || root_net_) + << "root_net_ needs to be set for all non-root solvers"; // Set phase from the state. phase_ = in_param.state().phase(); // Filter layers based on their include/exclude rules and @@ -79,6 +83,9 @@ void Net::Init(const NetParameter& in_param) { top_id_vecs_.resize(param.layer_size()); bottom_need_backward_.resize(param.layer_size()); for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) { + // For non-root solvers, whether this layer is shared from root_net_. + bool is_shared_layer = !Caffe::root_solver() + && root_net_->layers_[layer_id]->ShareInParallel(); // Inherit phase from net if unset. if (!param.layer(layer_id).has_phase()) { param.mutable_layer(layer_id)->set_phase(phase_); @@ -91,7 +98,11 @@ void Net::Init(const NetParameter& in_param) { << "propagate_down param must be specified " << "either 0 or bottom_size times "; } - layers_.push_back(LayerRegistry::CreateLayer(layer_param)); + if (is_shared_layer) { + layers_.push_back(root_net_->layers_[layer_id]); + } else { + layers_.push_back(LayerRegistry::CreateLayer(layer_param)); + } layer_names_.push_back(layer_param.name()); if (Caffe::root_solver()) { LOG(INFO) << "Creating Layer " << layer_param.name(); @@ -125,10 +136,19 @@ void Net::Init(const NetParameter& in_param) { } } // After this layer is connected, set it up. + if (is_shared_layer) { + // Set up size of top blobs using root_net_ + const vector*>& base_top = root_net_->top_vecs_[layer_id]; + const vector*>& this_top = this->top_vecs_[layer_id]; + for (int top_id = 0; top_id < base_top.size(); ++top_id) { + this_top[top_id]->ReshapeLike(*base_top[top_id]); + } + } else { + layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]); + } if (Caffe::root_solver()) { LOG(INFO) << "Setting up " << layer_names_[layer_id]; } - layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]); for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) { if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) { blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0)); diff --git a/src/caffe/parallel.cpp b/src/caffe/parallel.cpp index 5a08df6c1c8..6e7d802bb99 100644 --- a/src/caffe/parallel.cpp +++ b/src/caffe/parallel.cpp @@ -218,7 +218,7 @@ P2PSync::P2PSync(shared_ptr > root_solver, solver_ = root_solver; } else { Caffe::set_root_solver(false); - solver_.reset(new WorkerSolver(param)); + solver_.reset(new WorkerSolver(param, root_solver.get())); Caffe::set_root_solver(true); } this->configure(solver_.get()); @@ -436,4 +436,3 @@ INSTANTIATE_CLASS(GPUParams); INSTANTIATE_CLASS(P2PSync); } // namespace caffe - diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto index ac53bc4213b..61963293f77 100644 --- a/src/caffe/proto/caffe.proto +++ b/src/caffe/proto/caffe.proto @@ -719,6 +719,10 @@ message PythonParameter { // string, dictionary in Python dict format, JSON, etc. You may parse this // string in `setup` method and use it in `forward` and `backward`. optional string param_str = 3 [default = '']; + // Whether this PythonLayer is shared among worker solvers during data parallelism. + // If true, each worker solver sequentially run forward from this layer. + // This value should be set true if you are using it as a data layer. + optional bool share_in_parallel = 4 [default = false]; } // Message that stores parameters used by ReductionLayer diff --git a/src/caffe/solver.cpp b/src/caffe/solver.cpp index 18f8cc89eae..e8529312f38 100644 --- a/src/caffe/solver.cpp +++ b/src/caffe/solver.cpp @@ -18,14 +18,14 @@ namespace caffe { template -Solver::Solver(const SolverParameter& param) - : net_(), callbacks_() { +Solver::Solver(const SolverParameter& param, const Solver* root_solver) + : net_(), callbacks_(), root_solver_(root_solver) { Init(param); } template -Solver::Solver(const string& param_file) - : net_(), callbacks_() { +Solver::Solver(const string& param_file, const Solver* root_solver) + : net_(), callbacks_(), root_solver_(root_solver) { SolverParameter param; ReadProtoFromTextFileOrDie(param_file, ¶m); Init(param); @@ -33,6 +33,8 @@ Solver::Solver(const string& param_file) template void Solver::Init(const SolverParameter& param) { + CHECK (Caffe::root_solver() || root_solver_) + << "root_solver_ needs to be set for all non-root solvers"; LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: " << std::endl << param.DebugString(); param_ = param; @@ -88,7 +90,11 @@ void Solver::InitTrainNet() { net_state.MergeFrom(net_param.state()); net_state.MergeFrom(param_.train_state()); net_param.mutable_state()->CopyFrom(net_state); - net_.reset(new Net(net_param)); + if (Caffe::root_solver()) { + net_.reset(new Net(net_param)); + } else { + net_.reset(new Net(net_param, root_solver_->net_.get())); + } } template @@ -163,7 +169,12 @@ void Solver::InitTestNets() { net_params[i].mutable_state()->CopyFrom(net_state); LOG(INFO) << "Creating test net (#" << i << ") specified by " << sources[i]; - test_nets_[i].reset(new Net(net_params[i])); + if (Caffe::root_solver()) { + test_nets_[i].reset(new Net(net_params[i])); + } else { + test_nets_[i].reset(new Net(net_params[i], + root_solver_->test_nets_[i].get())); + } test_nets_[i]->set_debug_info(param_.debug_info()); } }