Commit 375e402: Python Multi-GPU

cypof committed Nov 23, 2016
1 parent 8fcb242
Showing 4 changed files with 193 additions and 8 deletions.
4 changes: 2 additions & 2 deletions python/caffe/__init__.py
@@ -1,5 +1,5 @@
from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver
from ._caffe import init_log, log, set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed
from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer
from ._caffe import init_log, log, set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed, solver_count, set_solver_count, solver_rank, set_solver_rank
from ._caffe import __version__
from .proto.caffe_pb2 import TRAIN, TEST
from .classifier import Classifier
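Condensed, the newly exported names compose per training process roughly as follows (a sketch, assuming an NCCL-enabled build and an existing solver.prototxt; python/train.py below is the full version):

    import caffe

    caffe.set_mode_gpu()
    caffe.set_device(0)                  # this process's GPU
    caffe.set_solver_count(2)            # total number of training processes
    caffe.set_solver_rank(0)             # this process's rank
    solver = caffe.SGDSolver('solver.prototxt')      # path is illustrative
    nccl = caffe.NCCL(solver, caffe.NCCL.new_uid())  # in real use the uid is
    nccl.bcast()                         # created once and shared by all ranks
    solver.add_callback(nccl)            # allreduce gradients every iteration
    solver.step(solver.param.max_iter)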
96 changes: 91 additions & 5 deletions python/caffe/_caffe.cpp
@@ -267,12 +267,12 @@ bp::object BlobVec_add_blob(bp::tuple args, bp::dict kwargs) {
}

template<typename Dtype>
class PythonCallback: public Solver<Dtype>::Callback {
class SolverCallback: public Solver<Dtype>::Callback {
protected:
bp::object on_start_, on_gradients_ready_;

public:
PythonCallback(bp::object on_start, bp::object on_gradients_ready)
SolverCallback(bp::object on_start, bp::object on_gradients_ready)
: on_start_(on_start), on_gradients_ready_(on_gradients_ready) { }
virtual void on_gradients_ready() {
on_gradients_ready_();
@@ -284,9 +284,61 @@ class PythonCallback: public Solver<Dtype>::Callback {
template<typename Dtype>
void Solver_add_callback(Solver<Dtype> * solver, bp::object on_start,
bp::object on_gradients_ready) {
solver->add_callback(new PythonCallback<Dtype>(on_start, on_gradients_ready));
solver->add_callback(new SolverCallback<Dtype>(on_start, on_gradients_ready));
}

// It seems Boost cannot call the base-class method directly
void Solver_add_nccl(SGDSolver<Dtype>* solver
#ifdef USE_NCCL
, NCCL<Dtype>* nccl
#endif
) {
#ifdef USE_NCCL
solver->add_callback(nccl);
#endif
}

template<typename Dtype>
class NetCallback: public Net<Dtype>::Callback {
public:
explicit NetCallback(bp::object run) : run_(run) {}

protected:
virtual void run(int layer) {
run_(layer);
}
bp::object run_;
};
void Net_before_forward(Net<Dtype>* net, bp::object run) {
net->add_before_forward(new NetCallback<Dtype>(run));
}
void Net_after_forward(Net<Dtype>* net, bp::object run) {
net->add_after_forward(new NetCallback<Dtype>(run));
}
void Net_before_backward(Net<Dtype>* net, bp::object run) {
net->add_before_backward(new NetCallback<Dtype>(run));
}
void Net_after_backward(Net<Dtype>* net, bp::object run) {
net->add_after_backward(new NetCallback<Dtype>(run));
}

void Net_add_nccl(Net<Dtype>* net
#ifdef USE_NCCL
, NCCL<Dtype>* nccl
#endif
) {
#ifdef USE_NCCL
net->add_after_backward(nccl);
#endif
}
#ifndef USE_NCCL
template<typename Dtype>
class NCCL {
public:
NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid) {}
};
#endif
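// Without USE_NCCL this stub keeps the NCCL class constructible from Python
// (so "import caffe" and the bindings below still work); new_uid and bcast
// are only bound when USE_NCCL is defined.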

BOOST_PYTHON_MEMBER_FUNCTION_OVERLOADS(SolveOverloads, Solve, 0, 1);

BOOST_PYTHON_MODULE(_caffe) {
@@ -303,6 +355,10 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::def("set_mode_gpu", &set_mode_gpu);
bp::def("set_random_seed", &set_random_seed);
bp::def("set_device", &Caffe::SetDevice);
bp::def("solver_count", &Caffe::solver_count);
bp::def("set_solver_count", &Caffe::set_solver_count);
bp::def("solver_rank", &Caffe::solver_rank);
bp::def("set_solver_rank", &Caffe::set_solver_rank);

bp::def("layer_type_list", &LayerRegistry<Dtype>::LayerTypeList);

@@ -346,7 +402,12 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::with_custodian_and_ward<1, 2, bp::with_custodian_and_ward<1, 3> >())
.def("save", &Net_Save)
.def("save_hdf5", &Net_SaveHDF5)
.def("load_hdf5", &Net_LoadHDF5);
.def("load_hdf5", &Net_LoadHDF5)
.def("before_forward", &Net_before_forward)
.def("after_forward", &Net_after_forward)
.def("before_backward", &Net_before_backward)
.def("after_backward", &Net_after_backward)
.def("after_backward", &Net_add_nccl);
BP_REGISTER_SHARED_PTR_TO_PYTHON(Net<Dtype>);

bp::class_<Blob<Dtype>, shared_ptr<Blob<Dtype> >, boost::noncopyable>(
@@ -378,6 +439,10 @@ BOOST_PYTHON_MODULE(_caffe) {
.add_property("type", bp::make_function(&Layer<Dtype>::type));
BP_REGISTER_SHARED_PTR_TO_PYTHON(Layer<Dtype>);

bp::class_<SolverParameter>("SolverParameter", bp::no_init)
.add_property("max_iter", &SolverParameter::max_iter)
.add_property("display", &SolverParameter::display)
.add_property("layer_wise_reduce", &SolverParameter::layer_wise_reduce);
bp::class_<LayerParameter>("LayerParameter", bp::no_init);

bp::class_<Solver<Dtype>, shared_ptr<Solver<Dtype> >, boost::noncopyable>(
@@ -387,11 +452,14 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::return_internal_reference<>()))
.add_property("iter", &Solver<Dtype>::iter)
.def("add_callback", &Solver_add_callback<Dtype>)
.def("add_callback", &Solver_add_nccl)
.def("solve", static_cast<void (Solver<Dtype>::*)(const char*)>(
&Solver<Dtype>::Solve), SolveOverloads())
.def("step", &Solver<Dtype>::Step)
.def("restore", &Solver<Dtype>::Restore)
.def("snapshot", &Solver<Dtype>::Snapshot);
.def("snapshot", &Solver<Dtype>::Snapshot)
.add_property("param", bp::make_function(&Solver<Dtype>::param,
bp::return_value_policy<bp::copy_const_reference>()));
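// From Python, solver.add_callback now accepts either two callables
// (on_start, on_gradients_ready) or a single NCCL instance.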
BP_REGISTER_SHARED_PTR_TO_PYTHON(Solver<Dtype>);

bp::class_<SGDSolver<Dtype>, bp::bases<Solver<Dtype> >,
@@ -435,6 +503,24 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::class_<vector<bool> >("BoolVec")
.def(bp::vector_indexing_suite<vector<bool> >());

bp::class_<NCCL<Dtype>, shared_ptr<NCCL<Dtype> >,
boost::noncopyable>("NCCL",
bp::init<shared_ptr<Solver<Dtype> >, const string&>())
#ifdef USE_NCCL
.def("new_uid", &NCCL<Dtype>::new_uid).staticmethod("new_uid")
.def("bcast", &NCCL<Dtype>::Broadcast)
#endif
/* NOLINT_NEXT_LINE(whitespace/semicolon) */
;
BP_REGISTER_SHARED_PTR_TO_PYTHON(NCCL<Dtype>);
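// Python sketch (NCCL builds only): nccl = caffe.NCCL(solver, caffe.NCCL.new_uid());
// nccl.bcast() then synchronizes the initial weights across ranks before training.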

bp::class_<Timer, shared_ptr<Timer>, boost::noncopyable>(
"Timer", bp::init<>())
.def("start", &Timer::Start)
.def("stop", &Timer::Stop)
.add_property("ms", &Timer::MilliSeconds);
BP_REGISTER_SHARED_PTR_TO_PYTHON(Timer);
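// Python sketch: t = caffe.Timer(); t.start(); ...; t.stop(); elapsed = t.ms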

// boost python expects a void (missing) return value, while import_array
// returns NULL for python3. import_array1() forces a void return value.
import_array1();
2 changes: 1 addition & 1 deletion python/caffe/pycaffe.py
@@ -11,7 +11,7 @@
import numpy as np

from ._caffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, \
RMSPropSolver, AdaDeltaSolver, AdamSolver
RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer
import caffe.io

import six
99 changes: 99 additions & 0 deletions python/train.py
@@ -0,0 +1,99 @@
#!/usr/bin/env python
"""
Trains a model using one or more GPUs.
"""
from multiprocessing import Process

import caffe


def train(
solver, # solver proto definition
snapshot, # solver snapshot to restore
gpus, # list of device ids
timing=False, # show timing info for compute and communications
):
# NCCL uses a uid to identify a session
uid = caffe.NCCL.new_uid()

caffe.init_log()
caffe.log('Using devices %s' % str(gpus))

procs = []
for rank in range(len(gpus)):
p = Process(target=solve,
args=(solver, snapshot, gpus, timing, uid, rank))
p.daemon = True
p.start()
procs.append(p)
for p in procs:
p.join()
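# Note: one process per GPU (rather than threads) gives each solver its own
# device and CUDA context and avoids contention on the Python GIL.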


def time(solver, nccl):
    """Attach per-layer timers and log forward/backward/allreduce times."""
fprop = []
bprop = []
total = caffe.Timer()
allrd = caffe.Timer()
for _ in range(len(solver.net.layers)):
fprop.append(caffe.Timer())
bprop.append(caffe.Timer())
display = solver.param.display

def show_time():
if solver.iter % display == 0:
s = '\n'
for i in range(len(solver.net.layers)):
s += 'forw %3d %8s ' % (i, solver.net.layers[i].layer_param.name)
s += ': %.2f\n' % fprop[i].ms
for i in range(len(solver.net.layers) - 1, -1, -1):
s += 'back %3d %8s ' % (i, solver.net.layers[i].layer_param.name)
s += ': %.2f\n' % bprop[i].ms
s += 'solver total: %.2f\n' % total.ms
s += 'allreduce: %.2f\n' % allrd.ms
caffe.log(s)

solver.net.before_forward(lambda layer: fprop[layer].start())
solver.net.after_forward(lambda layer: fprop[layer].stop())
solver.net.before_backward(lambda layer: bprop[layer].start())
solver.net.after_backward(lambda layer: bprop[layer].stop())
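    # Solver callbacks run in registration order, so on_gradients_ready fires:
    # (total.stop, allrd.start) -> NCCL allreduce -> (allrd.stop, show_time),
    # bracketing the allreduce with the allrd timer.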
solver.add_callback(lambda: total.start(), lambda: (total.stop(), allrd.start()))
solver.add_callback(nccl)
solver.add_callback(lambda: '', lambda: (allrd.stop(), show_time()))


def solve(proto, snapshot, gpus, timing, uid, rank):
    """Per-process worker: bind one GPU, set up NCCL, and run the solver."""
caffe.set_mode_gpu()
caffe.set_device(gpus[rank])
caffe.set_solver_count(len(gpus))
caffe.set_solver_rank(rank)

solver = caffe.SGDSolver(proto)
if snapshot and len(snapshot) != 0:
solver.restore(snapshot)

nccl = caffe.NCCL(solver, uid)
nccl.bcast()

if timing and rank == 0:
time(solver, nccl)
else:
solver.add_callback(nccl)
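    # With layer_wise_reduce, NCCL also hooks the net's after_backward so each
    # layer's gradients are reduced as backprop proceeds, overlapping
    # communication with computation.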

if solver.param.layer_wise_reduce:
solver.net.after_backward(nccl)
solver.step(solver.param.max_iter)


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()

parser.add_argument("--solver", required=True, help="Solver proto definition.")
parser.add_argument("--snapshot", help="Solver snapshot to restore.")
parser.add_argument("--gpus", type=int, nargs='+', default=[0],
help="List of device ids.")
parser.add_argument("--timing", action='store_true', help="Show timing info.")
args = parser.parse_args()

train(args.solver, args.snapshot, args.gpus, args.timing)
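A typical launch of the new script on two devices (solver path illustrative):

    python python/train.py --solver models/bvlc_googlenet/solver.prototxt --gpus 0 1 --timing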
