diff --git a/.gitignore b/.gitignore index 53c1fb056bb..eb3005447f4 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,7 @@ # PyCharm files .idea +*.iml # OSX dir files .DS_Store diff --git a/Makefile b/Makefile index 403e00a38a1..f7542102e4d 100644 --- a/Makefile +++ b/Makefile @@ -328,6 +328,12 @@ ifeq ($(USE_CUDNN), 1) COMMON_FLAGS += -DUSE_CUDNN endif +# NCCL acceleration configuration +ifeq ($(USE_NCCL), 1) + LIBRARIES += nccl + COMMON_FLAGS += -DUSE_NCCL +endif + # configure IO libraries ifeq ($(USE_OPENCV), 1) COMMON_FLAGS += -DUSE_OPENCV diff --git a/Makefile.config.example b/Makefile.config.example index 07bed63ae40..19a3b407af6 100644 --- a/Makefile.config.example +++ b/Makefile.config.example @@ -4,6 +4,10 @@ # cuDNN acceleration switch (uncomment to build with cuDNN). # USE_CUDNN := 1 +# NCCL acceleration switch (uncomment to build with NCCL) +# See https://github.com/NVIDIA/nccl +# USE_NCCL := 1 + # CPU-only switch (uncomment to build without GPU support). # CPU_ONLY := 1 diff --git a/include/caffe/blob.hpp b/include/caffe/blob.hpp index af360ac24bd..2f59471c29e 100644 --- a/include/caffe/blob.hpp +++ b/include/caffe/blob.hpp @@ -220,6 +220,7 @@ class Blob { void set_cpu_data(Dtype* data); const int* gpu_shape() const; const Dtype* gpu_data() const; + void set_gpu_data(Dtype* data); const Dtype* cpu_diff() const; const Dtype* gpu_diff() const; Dtype* mutable_cpu_data(); diff --git a/include/caffe/common.hpp b/include/caffe/common.hpp index 3c6a076ec2f..5436a79e8c5 100644 --- a/include/caffe/common.hpp +++ b/include/caffe/common.hpp @@ -161,6 +161,8 @@ class Caffe { // Parallel training info inline static int solver_count() { return Get().solver_count_; } inline static void set_solver_count(int val) { Get().solver_count_ = val; } + inline static int solver_rank() { return Get().solver_rank_; } + inline static void set_solver_rank(int val) { Get().solver_rank_ = val; } inline static bool root_solver() { return Get().root_solver_; } inline static void set_root_solver(bool val) { Get().root_solver_ = val; } @@ -173,6 +175,7 @@ class Caffe { Brew mode_; int solver_count_; + int solver_rank_; bool root_solver_; private: diff --git a/include/caffe/data_transformer.hpp b/include/caffe/data_transformer.hpp index 97b4ee6a8c4..87c2c17eb9e 100644 --- a/include/caffe/data_transformer.hpp +++ b/include/caffe/data_transformer.hpp @@ -23,7 +23,7 @@ class DataTransformer { * @brief Initialize the Random number generations if needed by the * transformation. 
*/ - void InitRand(); + void InitRand(unsigned int seed); /** * @brief Applies the transformation defined in the data layer's diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp index 6a8c5a02892..1a079fca695 100644 --- a/include/caffe/internal_thread.hpp +++ b/include/caffe/internal_thread.hpp @@ -42,8 +42,8 @@ class InternalThread { bool must_stop(); private: - void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count, - bool root_solver); + void entry(int device, Caffe::Brew mode, int rand_seed, + int solver_count, int solver_rank, bool root_solver); shared_ptr thread_; }; diff --git a/include/caffe/layers/python_layer.hpp b/include/caffe/layers/python_layer.hpp index 66dbbdf13b8..ad6ae903ef6 100644 --- a/include/caffe/layers/python_layer.hpp +++ b/include/caffe/layers/python_layer.hpp @@ -20,10 +20,10 @@ class PythonLayer : public Layer { const vector*>& top) { // Disallow PythonLayer in MultiGPU training stage, due to GIL issues // Details: https://github.com/BVLC/caffe/issues/2936 - if (this->phase_ == TRAIN && Caffe::solver_count() > 1 - && !ShareInParallel()) { - LOG(FATAL) << "PythonLayer is not implemented in Multi-GPU training"; - } +// if (this->phase_ == TRAIN && Caffe::solver_count() > 1 +// && !ShareInParallel()) { +// LOG(FATAL) << "PythonLayer is not implemented in Multi-GPU training"; +// } self_.attr("param_str") = bp::str( this->layer_param_.python_param().param_str()); self_.attr("phase") = static_cast(this->phase_); diff --git a/include/caffe/net.hpp b/include/caffe/net.hpp index 493bdf294e2..cc23692a11c 100644 --- a/include/caffe/net.hpp +++ b/include/caffe/net.hpp @@ -228,6 +228,31 @@ class Net { static bool StateMeetsRule(const NetState& state, const NetStateRule& rule, const string& layer_name); + // Invoked at specific points during an iteration + class Callback { + protected: + virtual void run(int layer) = 0; + + template + friend class Net; + }; + const vector& before_forward() const { return before_forward_; } + void add_before_forward(Callback* value) { + before_forward_.push_back(value); + } + const vector& after_forward() const { return after_forward_; } + void add_after_forward(Callback* value) { + after_forward_.push_back(value); + } + const vector& before_backward() const { return before_backward_; } + void add_before_backward(Callback* value) { + before_backward_.push_back(value); + } + const vector& after_backward() const { return after_backward_; } + void add_after_backward(Callback* value) { + after_backward_.push_back(value); + } + protected: // Helpers for Init. /// @brief Append a new top blob to the net. @@ -308,7 +333,13 @@ class Net { bool debug_info_; /// The root net that actually holds the shared layers in data parallelism const Net* const root_net_; - DISABLE_COPY_AND_ASSIGN(Net); + // Callbacks + vector before_forward_; + vector after_forward_; + vector before_backward_; + vector after_backward_; + +DISABLE_COPY_AND_ASSIGN(Net); }; diff --git a/include/caffe/parallel.hpp b/include/caffe/parallel.hpp index 6c496c884e3..8900cba8452 100644 --- a/include/caffe/parallel.hpp +++ b/include/caffe/parallel.hpp @@ -3,6 +3,7 @@ #include +#include #include #include "caffe/blob.hpp" @@ -14,6 +15,10 @@ #include "caffe/syncedmem.hpp" #include "caffe/util/blocking_queue.hpp" +#ifdef USE_NCCL +#include "caffe/util/nccl.hpp" +#endif + namespace caffe { // Represents a net parameters. 
Once a net is created, its parameter buffers can @@ -80,6 +85,50 @@ class DevicePair { int device_; }; +template<typename Dtype> +class NCCL : public GPUParams<Dtype>, + public Solver<Dtype>::Callback, + public Net<Dtype>::Callback { + public: + /** + * In multi-process settings, first create a NCCL id (new_uid), then + * pass it to each process to create connected instances. + */ + NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid = ""); + ~NCCL(); + + /** + * In single-process settings, create instances without uids and + * call this. + */ + static void init_single_process(vector<NCCL<Dtype>*>* nccls); + + static string new_uid(); + + /** + * Broadcast weights from rank 0 to other solvers. + */ + void bcast(); + + protected: + void on_start() {} + void on_gradients_ready(); + void run(int layer); + +#ifdef USE_NCCL + ncclComm_t comm_; + cudaStream_t stream_; + vector<cudaEvent_t> layer_events_; + cudaEvent_t solver_event_; +#endif + + shared_ptr<Solver<Dtype> > solver_; + bool layer_wise_; + using Params<Dtype>::size_; + using Params<Dtype>::data_; + using Params<Dtype>::diff_; +}; + // Synchronous data parallelism using map-reduce between local GPUs. template<typename Dtype> class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback, diff --git a/include/caffe/util/math_functions.hpp b/include/caffe/util/math_functions.hpp index 6f6d3feeae2..51068fe2b80 100644 --- a/include/caffe/util/math_functions.hpp +++ b/include/caffe/util/math_functions.hpp @@ -185,6 +185,11 @@ void caffe_gpu_add_scalar(const int N, const Dtype alpha, Dtype *X); template <typename Dtype> void caffe_gpu_scal(const int N, const Dtype alpha, Dtype *X); +#ifndef CPU_ONLY +template <typename Dtype> +void caffe_gpu_scal(const int N, const Dtype alpha, Dtype* X, cudaStream_t str); +#endif + template <typename Dtype> void caffe_gpu_add(const int N, const Dtype* a, const Dtype* b, Dtype* y); diff --git a/include/caffe/util/nccl.hpp b/include/caffe/util/nccl.hpp new file mode 100644 index 00000000000..e01fb7451e8 --- /dev/null +++ b/include/caffe/util/nccl.hpp @@ -0,0 +1,37 @@ +#ifndef CAFFE_UTIL_NCCL_H_ +#define CAFFE_UTIL_NCCL_H_ +#ifdef USE_NCCL + +#include <nccl.h> + +#include "caffe/common.hpp" + +#define NCCL_CHECK(condition) \ +{ \ + ncclResult_t result = condition; \ + CHECK_EQ(result, ncclSuccess) << " " \ + << ncclGetErrorString(result); \ +} + +namespace caffe { + +namespace nccl { + +template <typename Dtype> class dataType; + +template<> class dataType<float> { + public: + static const ncclDataType_t type = ncclFloat; +}; +template<> class dataType<double> { + public: + static const ncclDataType_t type = ncclDouble; +}; + +} // namespace nccl + +} // namespace caffe + +#endif // end USE_NCCL + +#endif // CAFFE_UTIL_NCCL_H_ diff --git a/python/caffe/__init__.py b/python/caffe/__init__.py index 35868a403a3..47d65c9c634 100644 --- a/python/caffe/__init__.py +++ b/python/caffe/__init__.py @@ -1,8 +1,9 @@ -from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver -from ._caffe import set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed +from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver, DataTransformer, Blob, NCCL, Timer +from ._caffe import init_log, log, set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed, get_random, solver_count, set_solver_count, solver_rank, set_solver_rank from ._caffe import __version__ from .proto.caffe_pb2 import TRAIN, TEST from .classifier import Classifier from .detector import Detector from .
import io from .net_spec import layers, params, NetSpec, to_proto +from .train import train \ No newline at end of file diff --git a/python/caffe/_caffe.cpp b/python/caffe/_caffe.cpp index bdee75acd6c..dc28ee99934 100644 --- a/python/caffe/_caffe.cpp +++ b/python/caffe/_caffe.cpp @@ -53,6 +53,16 @@ void set_mode_gpu() { Caffe::set_mode(Caffe::GPU); } void set_random_seed(unsigned int seed) { Caffe::set_random_seed(seed); } +void InitLog(int level) { + FLAGS_logtostderr = 1; + FLAGS_minloglevel = level; + ::google::InitGoogleLogging(""); + ::google::InstallFailureSignalHandler(); +} +void Log(const string& s) { + LOG(INFO) << s; +} + // For convenience, check that input files can be opened, and raise an // exception that boost will send to Python if not (caffe could still crash // later if the input files are disturbed before they are actually used, but @@ -254,12 +264,12 @@ bp::object BlobVec_add_blob(bp::tuple args, bp::dict kwargs) { } template -class PythonCallback: public Solver::Callback { +class SolverCallback: public Solver::Callback { protected: bp::object on_start_, on_gradients_ready_; public: - PythonCallback(bp::object on_start, bp::object on_gradients_ready) + SolverCallback(bp::object on_start, bp::object on_gradients_ready) : on_start_(on_start), on_gradients_ready_(on_gradients_ready) { } virtual void on_gradients_ready() { on_gradients_ready_(); @@ -271,7 +281,121 @@ class PythonCallback: public Solver::Callback { template void Solver_add_callback(Solver * solver, bp::object on_start, bp::object on_gradients_ready) { - solver->add_callback(new PythonCallback(on_start, on_gradients_ready)); + solver->add_callback(new SolverCallback(on_start, on_gradients_ready)); +} +// Seems boost cannot call the base method directly +void Solver_add_nccl(SGDSolver* solver, NCCL* nccl) { + solver->add_callback(nccl); +} +template +class NetCallback: public Net::Callback { + public: + explicit NetCallback(bp::object run) : run_(run) {} + + protected: + virtual void run(int layer) { + run_(layer); + } + bp::object run_; +}; +void Net_before_forward(Net* net, bp::object run) { + net->add_before_forward(new NetCallback(run)); +} +void Net_after_forward(Net* net, bp::object run) { + net->add_after_forward(new NetCallback(run)); +} +void Net_before_backward(Net* net, bp::object run) { + net->add_before_backward(new NetCallback(run)); +} +void Net_after_backward(Net* net, bp::object run) { + net->add_after_backward(new NetCallback(run)); +} +void Net_add_nccl(Net* net, NCCL* nccl) { + net->add_after_backward(nccl); +} + +// Transformer constructor for passing phase as int +shared_ptr > Transformer_Init( + const TransformationParameter& param, int phase) { + shared_ptr > t( + new DataTransformer(param, + static_cast(phase))); + return t; +} + +void Transform(DataTransformer* trans, + const string& str, + Blob* data, + Blob* label, + int index) { + Datum datum; + datum.ParseFromString(str); + vector shape(data->shape()); + shape[0] = 1; + Blob tmp(shape); + tmp.set_cpu_data(data->mutable_cpu_data() + data->offset(index)); + trans->Transform(datum, &tmp); + label->mutable_cpu_data()[label->offset(index)] = datum.label(); +} + +template +struct proto_pickle : bp::pickle_suite { + static bp::tuple getstate(const T& proto) { + return bp::make_tuple(proto.SerializeAsString()); + } + + static void setstate(T& proto, // NOLINT(runtime/references) + bp::tuple state) { + string s = bp::extract(state[0])(); + proto.ParseFromString(s); + } +}; + +struct blob_pickle : bp::pickle_suite { + // TODO also transfer 
cpu side through regular IPC + static bp::tuple getstate(const Blob& blob) { + string s1(sizeof(int) * blob.shape().size(), 0); + memcpy(&s1[0], &blob.shape()[0], s1.size()); // NOLINT(caffe/alt_fn) + + cudaPointerAttributes attributes; + CUDA_CHECK(cudaPointerGetAttributes(&attributes, blob.gpu_data())); + CUDA_CHECK(cudaSetDevice(attributes.device)); + + cudaIpcMemHandle_t handle; + CUDA_CHECK(cudaIpcGetMemHandle(&handle, + reinterpret_cast(const_cast(blob.gpu_data())))); + string s2(CUDA_IPC_HANDLE_SIZE, 0); + memcpy(&s2[0], &handle, CUDA_IPC_HANDLE_SIZE); // NOLINT(caffe/alt_fn) + + return bp::make_tuple(s1, s2); + } + + static void setstate(Blob& blob, // NOLINT(runtime/references) + bp::tuple state) { + string s1 = bp::extract(state[0])(); + string s2 = bp::extract(state[1])(); + + vector shape(s1.size() / sizeof(int)); + memcpy(&shape[0], &s1[0], s1.size()); // NOLINT(caffe/alt_fn) + blob.Reshape(shape); + + cudaIpcMemHandle_t handle; + memcpy(&handle, &s2[0], CUDA_IPC_HANDLE_SIZE); // NOLINT(caffe/alt_fn) + Dtype* data; + CUDA_CHECK(cudaIpcOpenMemHandle(reinterpret_cast(&data), handle, + cudaIpcMemLazyEnablePeerAccess)); + blob.set_gpu_data(data); + } +}; + +int phase_as_int(LayerParameter* param) { + return static_cast(param->phase()); +} +void prefetch_to_gpu(Blob* blob) { + blob->gpu_data(); +} +void set_gpu_data(Blob* blob, Blob* source) { + blob->set_gpu_data(source->mutable_gpu_data()); } BOOST_PYTHON_MEMBER_FUNCTION_OVERLOADS(SolveOverloads, Solve, 0, 1); @@ -283,10 +407,17 @@ BOOST_PYTHON_MODULE(_caffe) { bp::scope().attr("__version__") = AS_STRING(CAFFE_VERSION); // Caffe utility functions + bp::def("init_log", &InitLog); + bp::def("log", &Log); bp::def("set_mode_cpu", &set_mode_cpu); bp::def("set_mode_gpu", &set_mode_gpu); bp::def("set_random_seed", &set_random_seed); + bp::def("get_random", &caffe_rng_rand); bp::def("set_device", &Caffe::SetDevice); + bp::def("solver_count", &Caffe::solver_count); + bp::def("set_solver_count", &Caffe::set_solver_count); + bp::def("solver_rank", &Caffe::solver_rank); + bp::def("set_solver_rank", &Caffe::set_solver_rank); bp::def("layer_type_list", &LayerRegistry::LayerTypeList); @@ -317,6 +448,7 @@ BOOST_PYTHON_MODULE(_caffe) { bp::return_internal_reference<>())) .add_property("layers", bp::make_function(&Net::layers, bp::return_internal_reference<>())) + .def("layer", bp::make_function(&Net::layer_by_name)) .add_property("_blob_names", bp::make_function(&Net::blob_names, bp::return_value_policy())) .add_property("_layer_names", bp::make_function(&Net::layer_names, @@ -330,11 +462,16 @@ BOOST_PYTHON_MODULE(_caffe) { bp::with_custodian_and_ward<1, 2, bp::with_custodian_and_ward<1, 3> >()) .def("save", &Net_Save) .def("save_hdf5", &Net_SaveHDF5) - .def("load_hdf5", &Net_LoadHDF5); + .def("load_hdf5", &Net_LoadHDF5) + .def("before_forward", &Net_before_forward) + .def("after_forward", &Net_after_forward) + .def("before_backward", &Net_before_backward) + .def("after_backward", &Net_after_backward) + .def("after_backward", &Net_add_nccl); BP_REGISTER_SHARED_PTR_TO_PYTHON(Net); bp::class_, shared_ptr >, boost::noncopyable>( - "Blob", bp::no_init) + "Blob", bp::init<>()) .add_property("shape", bp::make_function( static_cast& (Blob::*)() const>( @@ -350,7 +487,10 @@ BOOST_PYTHON_MODULE(_caffe) { .add_property("data", bp::make_function(&Blob::mutable_cpu_data, NdarrayCallPolicies())) .add_property("diff", bp::make_function(&Blob::mutable_cpu_diff, - NdarrayCallPolicies())); + NdarrayCallPolicies())) + .def_pickle(blob_pickle()) + 
.def("prefetch_to_gpu", &prefetch_to_gpu) + .def("set_gpu_data", &set_gpu_data); BP_REGISTER_SHARED_PTR_TO_PYTHON(Blob); bp::class_, shared_ptr >, @@ -359,10 +499,43 @@ BOOST_PYTHON_MODULE(_caffe) { bp::return_internal_reference<>())) .def("setup", &Layer::LayerSetUp) .def("reshape", &Layer::Reshape) - .add_property("type", bp::make_function(&Layer::type)); + .add_property("type", bp::make_function(&Layer::type)) + .add_property("layer_param", bp::make_function(&Layer::layer_param, + bp::return_value_policy())); BP_REGISTER_SHARED_PTR_TO_PYTHON(Layer); - bp::class_("LayerParameter", bp::no_init); + bp::class_("SolverParameter", bp::init<>()) + .add_property("max_iter", &SolverParameter::max_iter) + .add_property("display", &SolverParameter::display) + .def_pickle(proto_pickle()); + bp::class_("TransformationParameter", bp::init<>()) + .add_property("crop_size", &TransformationParameter::crop_size); + bp::class_("DataParameter", bp::init<>()) + .add_property("batch_size", &DataParameter::batch_size) + .add_property("source", bp::make_function(&DataParameter::source, + bp::return_value_policy())) + .add_property("backend", &DataParameter::backend) + .def_pickle(proto_pickle()); + bp::class_("MemoryDataParameter", bp::init<>()) + .add_property("batch_size", &MemoryDataParameter::batch_size) + .add_property("channels", &MemoryDataParameter::channels) + .add_property("height", &MemoryDataParameter::height) + .add_property("width", &MemoryDataParameter::width) + .def_pickle(proto_pickle()); + bp::class_("LayerParameter", bp::init<>()) + .add_property("name", bp::make_function(&LayerParameter::name, + bp::return_value_policy())) + .add_property("phase", &phase_as_int) + .add_property("top_size", &LayerParameter::top_size) + .add_property("transform_param", + bp::make_function(&LayerParameter::transform_param, + bp::return_value_policy())) + .add_property("data_param", bp::make_function(&LayerParameter::data_param, + bp::return_value_policy())) + .add_property("memory_data_param", + bp::make_function(&LayerParameter::memory_data_param, + bp::return_value_policy())) + .def_pickle(proto_pickle()); bp::class_, shared_ptr >, boost::noncopyable>( "Solver", bp::no_init) @@ -371,11 +544,14 @@ BOOST_PYTHON_MODULE(_caffe) { bp::return_internal_reference<>())) .add_property("iter", &Solver::iter) .def("add_callback", &Solver_add_callback) + .def("add_callback", &Solver_add_nccl) .def("solve", static_cast::*)(const char*)>( &Solver::Solve), SolveOverloads()) .def("step", &Solver::Step) .def("restore", &Solver::Restore) - .def("snapshot", &Solver::Snapshot); + .def("snapshot", &Solver::Snapshot) + .add_property("param", bp::make_function(&Solver::param, + bp::return_value_policy())); BP_REGISTER_SHARED_PTR_TO_PYTHON(Solver); bp::class_, bp::bases >, @@ -419,6 +595,27 @@ BOOST_PYTHON_MODULE(_caffe) { bp::class_ >("BoolVec") .def(bp::vector_indexing_suite >()); + bp::class_, shared_ptr >, + boost::noncopyable>("DataTransformer", bp::no_init) + .def("__init__", bp::make_constructor(&Transformer_Init)) + .def("init_rand", &DataTransformer::InitRand) + .def("transform", &Transform); + BP_REGISTER_SHARED_PTR_TO_PYTHON(DataTransformer); + + bp::class_, shared_ptr >, + boost::noncopyable>("NCCL", + bp::init >, const string&>()) + .def("new_uid", &NCCL::new_uid).staticmethod("new_uid") + .def("bcast", &NCCL::bcast); + BP_REGISTER_SHARED_PTR_TO_PYTHON(NCCL); + + bp::class_, boost::noncopyable>( + "Timer", bp::init<>()) + .def("start", &Timer::Start) + .def("stop", &Timer::Stop) + .add_property("ms", 
&Timer::MilliSeconds); + BP_REGISTER_SHARED_PTR_TO_PYTHON(Timer); + // boost python expects a void (missing) return value, while import_array // returns NULL for python3. import_array1() forces a void return value. import_array1(); diff --git a/python/caffe/pycaffe.py b/python/caffe/pycaffe.py index 5bae18d9a4d..4c0c0a7765b 100644 --- a/python/caffe/pycaffe.py +++ b/python/caffe/pycaffe.py @@ -11,7 +11,8 @@ import numpy as np from ._caffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, \ - RMSPropSolver, AdaDeltaSolver, AdamSolver + RMSPropSolver, AdaDeltaSolver, AdamSolver, DataTransformer, \ + Blob, NCCL, Timer import caffe.io import six diff --git a/python/caffe/train.py b/python/caffe/train.py new file mode 100644 index 00000000000..04383de198e --- /dev/null +++ b/python/caffe/train.py @@ -0,0 +1,278 @@ +""" +Trains a model using one or more GPUs. Existing solver and model definitions can +be reused; the only modifications needed are changing the data layer type from +'Data' to 'Python' and adding a python_param section like this: + + python_param { + module: "caffe.train" + layer: "DataLayer" + } + +Other sections like transform_param and data_param will be interpreted by the +python layer and can be used as is. LMDB is currently the only supported backend. +""" +from Queue import Empty +from multiprocessing import Process, Queue + +import lmdb, time +import caffe + + +def train( + solver, # solver proto definition + snapshot, # network snapshot to restore + gpus, # set of GPUs to run on + layer_wise_reduce=False, # overlaps training and transfers + data_layer_count=1, # for nets with multiple data layers TODO get from net proto + prefetchers_per_gpu=2, # 0 to disable prefetching + prefetch_queue_size=4, # more can improve perf, but needs GPU memory + timing=False, # show timing info for compute and communications +): + _train(solver=solver, snapshot=snapshot, gpus=gpus, + layer_wise_reduce=layer_wise_reduce, + data_layer_count=data_layer_count, + prefetchers_per_gpu=prefetchers_per_gpu, + prefetch_queue_size=prefetch_queue_size, + timing=timing) + + +def _train(**kwargs): + # NCCL uses a uid to identify the session + uid = caffe.NCCL.new_uid() + + procs = [] + for rank in range(len(kwargs['gpus'])): + queues = [] + for _ in range(kwargs['data_layer_count']): + for _ in range(kwargs['prefetchers_per_gpu'] * 2): # train + test + queues.append(QueuePair()) + + p = Process(target=solve, args=(uid, rank, queues, kwargs)) + p.daemon = True + p.start() + procs.append(p) + + for i in range(len(queues)): + p = Process(target=fill, args=(queues[i], rank, i, kwargs)) + p.daemon = True + p.start() + + for p in procs: + p.join() + + +class DataLayer(caffe.Layer): + """ + Python version of Caffe's data layer. It reads transform_param to apply the + same transforms as the original. If prefetching is enabled, loading and + transformation are done in separate processes, and batches are retrieved + through zero-copy CUDA IPC.
+ """ + + def init(self, queues, kwargs): + self.queues = queues + self.queue = 0 + self.index = -1 + self.kwargs = kwargs + if kwargs['prefetchers_per_gpu'] == 0: + self.source = DataSource(caffe.solver_rank(), 0, + self.layer_param, + caffe.get_random(), kwargs) + for queue in queues: + assert len(queue.items) == 0 + for i in range(kwargs['prefetch_queue_size']): + queue.free.put(i) + blobs = [] + for _ in range(self.layer_param.top_size): + blobs.append(caffe.Blob()) + self.reshape(None, blobs) + # Make sure buffer is created before queueing + for blob in blobs: + blob.prefetch_to_gpu() + queue.items.append(blobs) + # Arguments for prefetch process + queue.init.put((queue.items, self.layer_param, caffe.get_random())) + + def setup(self, bottom, top): + pass + + def reshape(self, bottom, top): + batch = self.layer_param.data_param.batch_size + # If the data layer does not have a transform, you need to specify + # its shape manually, e.g. for mnist: + # top[0].reshape(batch, 1, 28, 28) + top[0].reshape(batch, + 3, + self.layer_param.transform_param.crop_size, + self.layer_param.transform_param.crop_size) + top[1].reshape(batch, 1) + + def forward(self, bottom, top): + if self.kwargs['prefetchers_per_gpu'] == 0: + self.source.batch(top) + else: + if self.index != -1: + self.queues[self.queue].free.put(self.index) + # Round robin for deterministic runs + self.queue += 1 + if self.queue == len(self.queues): + self.queue = 0 + + qp = self.queues[self.queue] + try: + self.index = qp.full.get(block=False) + except Empty: + caffe.log('Waiting on data') + self.index = qp.full.get() + data = qp.items[self.index] + for i in range(len(data)): + top[i].set_gpu_data(data[i]) + + def backward(self, top, propagate_down, bottom): + pass + + +class DataSource: + def __init__(self, rank, index, param, seed, kwargs): + self.batch_size = param.data_param.batch_size + self.db = self.open_db(param.data_param.source, rank, index, + param.phase, kwargs) + self.tr = caffe.DataTransformer(param.transform_param, param.phase) + self.tr.init_rand(seed) + + def open_db(self, path, rank, index, phase, kwargs): + """ + Reads items from lmdb, skipping keys that will be read by + other processes and threads. + """ + caffe.log('lmdb open %s' % path) + env = lmdb.open(path, map_size=1024 ^ 4, readonly=True, create=False) + txn = env.begin() + + per_gpu = max(1, kwargs['prefetchers_per_gpu']) + segment = len(kwargs['gpus']) * per_gpu + offset = 0 + while True: + for key, value in txn.cursor(): # TODO also test in parallel + if offset % segment == rank * per_gpu + index or phase == caffe.TEST: + yield value + offset += 1 + + def batch(self, blobs): + for i in range(self.batch_size): + self.tr.transform(self.db.next(), blobs[0], blobs[1], i) + + +class QueuePair: + """ + Exchange items between processes. Items are sent once initially + through the init queue, then only indexes are exchanged. 
+ """ + + def __init__(self): + self.free = Queue() + self.full = Queue() + + # Initial arguments and items cached on each side + self.init = Queue() + self.items = [] + + +def solve(uid, rank, queues, kwargs): + gpus = kwargs['gpus'] + + # glog levels: INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3 + caffe.init_log(0 if rank == 0 else 1) + caffe.set_mode_gpu() + caffe.set_device(gpus[rank]) + caffe.set_solver_count(len(gpus)) + caffe.set_solver_rank(rank) + + solver = caffe.SGDSolver(kwargs['solver']) + if rank == 0 and kwargs['snapshot'] is not None: + solver.restore(kwargs['snapshot']) + + index = 0 + batch = 0 + prefetchers_per_gpu = kwargs['prefetchers_per_gpu'] + for layer in solver.net.layers: + if isinstance(layer, DataLayer): + layer.init(queues[index: index + prefetchers_per_gpu], kwargs) + index += prefetchers_per_gpu + batch = layer.layer_param.data_param.batch_size + for layer in solver.test_nets[0].layers: + if isinstance(layer, DataLayer): + layer.init(queues[index: index + prefetchers_per_gpu], kwargs) + index += prefetchers_per_gpu + assert index == len(queues) + + nccl = caffe.NCCL(solver, uid) + nccl.bcast() + display = solver.param.display + + if kwargs['timing']: + fprop = [] + bprop = [] + total = caffe.Timer() + allrd = caffe.Timer() + for _ in range(len(solver.net.layers)): + fprop.append(caffe.Timer()) + bprop.append(caffe.Timer()) + + def show_time(): + if solver.iter % display == 0: + s = '\n' + for i in range(len(solver.net.layers)): + s += 'forw %3d %8s ' % (i, solver.net.layers[i].layer_param.name) + s += ': %.2f\n' % fprop[i].ms + for i in range(len(solver.net.layers) - 1, -1, -1): + s += 'back %3d %8s ' % (i, solver.net.layers[i].layer_param.name) + s += ': %.2f\n' % bprop[i].ms + s += 'solver total: %.2f\n' % total.ms + s += 'allreduce: %.2f\n' % allrd.ms + caffe.log(s) + + solver.net.before_forward(lambda layer: fprop[layer].start()) + solver.net.after_forward(lambda layer: fprop[layer].stop()) + solver.net.before_backward(lambda layer: bprop[layer].start()) + solver.net.after_backward(lambda layer: bprop[layer].stop()) + solver.add_callback(lambda: total.start(), lambda: (total.stop(), allrd.start())) + solver.add_callback(nccl) + solver.add_callback(lambda: '', lambda: (allrd.stop(), show_time())) + else: + solver.add_callback(nccl) + + class Rate: + def __init__(self): + self.start = time.time() + self.count = 0 + + def run(self): + if solver.iter % display == 0: + nbr = batch * solver.iter * len(gpus) + now = time.time() + caffe.log('%d examples/s' % ((nbr - self.count) / (now - self.start))) + self.start = now + self.count = nbr + + rate = Rate() + solver.add_callback(lambda: '', lambda: rate.run()) + + if kwargs['layer_wise_reduce']: + solver.net.after_backward(nccl) + solver.step(solver.param.max_iter) + + +def fill(qp, rank, index, kwargs): + caffe.init_log(0 if rank == 0 else 1) + caffe.set_device(kwargs['gpus'][rank]) + args = qp.init.get() + assert len(qp.items) == 0 + qp.items = args[0] + assert len(qp.items) == kwargs['prefetch_queue_size'] + source = DataSource(rank, index, args[1], args[2], kwargs) + while True: + index = qp.free.get() + source.batch(qp.items[index]) + for blob in qp.items[index]: + blob.prefetch_to_gpu() + qp.full.put(index) diff --git a/python/multi_gpu.py b/python/multi_gpu.py new file mode 100644 index 00000000000..23bb9872fe2 --- /dev/null +++ b/python/multi_gpu.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +import caffe + +# Example multi-GPU training +caffe.train( + solver='models/bvlc_reference_caffenet/solver.prototxt', + 
snapshot=None, + gpus=range(8), + layer_wise_reduce=True, + # timing=True, +) diff --git a/src/caffe/blob.cpp b/src/caffe/blob.cpp index 4a34e4c5856..863d940c190 100644 --- a/src/caffe/blob.cpp +++ b/src/caffe/blob.cpp @@ -98,6 +98,12 @@ const Dtype* Blob::gpu_data() const { return (const Dtype*)data_->gpu_data(); } +template +void Blob::set_gpu_data(Dtype* data) { + CHECK(data); + data_->set_gpu_data(data); +} + template const Dtype* Blob::cpu_diff() const { CHECK(diff_); diff --git a/src/caffe/common.cpp b/src/caffe/common.cpp index dee681654aa..79027ce2605 100644 --- a/src/caffe/common.cpp +++ b/src/caffe/common.cpp @@ -106,7 +106,7 @@ void* Caffe::RNG::generator() { Caffe::Caffe() : cublas_handle_(NULL), curand_generator_(NULL), random_generator_(), - mode_(Caffe::CPU), solver_count_(1), root_solver_(true) { + mode_(Caffe::CPU), solver_count_(1), solver_rank_(0), root_solver_(true) { // Try to create a cublas handler, and report an error if failed (but we will // keep the program running as one might just want to run CPU code). if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) { diff --git a/src/caffe/data_transformer.cpp b/src/caffe/data_transformer.cpp index 7189d67e289..21216aa9c9d 100644 --- a/src/caffe/data_transformer.cpp +++ b/src/caffe/data_transformer.cpp @@ -520,12 +520,11 @@ vector DataTransformer::InferBlobShape( #endif // USE_OPENCV template -void DataTransformer::InitRand() { +void DataTransformer::InitRand(unsigned int seed) { const bool needs_rand = param_.mirror() || (phase_ == TRAIN && param_.crop_size()); if (needs_rand) { - const unsigned int rng_seed = caffe_rng_rand(); - rng_.reset(new Caffe::RNG(rng_seed)); + rng_.reset(new Caffe::RNG(seed)); } else { rng_.reset(); } diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp index 104884e0295..95825bc4f27 100644 --- a/src/caffe/internal_thread.cpp +++ b/src/caffe/internal_thread.cpp @@ -28,24 +28,26 @@ void InternalThread::StartInternalThread() { Caffe::Brew mode = Caffe::mode(); int rand_seed = caffe_rng_rand(); int solver_count = Caffe::solver_count(); + int solver_rank = Caffe::solver_rank(); bool root_solver = Caffe::root_solver(); try { thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode, - rand_seed, solver_count, root_solver)); + rand_seed, solver_count, solver_rank, root_solver)); } catch (std::exception& e) { LOG(FATAL) << "Thread exception: " << e.what(); } } void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed, - int solver_count, bool root_solver) { + int solver_count, int solver_rank, bool root_solver) { #ifndef CPU_ONLY CUDA_CHECK(cudaSetDevice(device)); #endif Caffe::set_mode(mode); Caffe::set_random_seed(rand_seed); Caffe::set_solver_count(solver_count); + Caffe::set_solver_rank(solver_rank); Caffe::set_root_solver(root_solver); InternalThreadEntry(); diff --git a/src/caffe/layer_factory.cpp b/src/caffe/layer_factory.cpp index e967bd6181c..f14253a510e 100644 --- a/src/caffe/layer_factory.cpp +++ b/src/caffe/layer_factory.cpp @@ -67,6 +67,7 @@ shared_ptr > GetConvolutionLayer( #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -104,6 +105,7 @@ shared_ptr > GetPoolingLayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -141,6 +143,7 @@ shared_ptr > GetLRNLayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << 
param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -164,6 +167,7 @@ shared_ptr > GetReLULayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -187,6 +191,7 @@ shared_ptr > GetSigmoidLayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -210,6 +215,7 @@ shared_ptr > GetSoftmaxLayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } @@ -233,6 +239,7 @@ shared_ptr > GetTanHLayer(const LayerParameter& param) { #endif } else { LOG(FATAL) << "Layer " << param.name() << " has unknown engine."; + throw; // Avoids missing return warning } } diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp index 989319f1a07..6906e64bb0d 100644 --- a/src/caffe/layers/base_data_layer.cpp +++ b/src/caffe/layers/base_data_layer.cpp @@ -27,7 +27,7 @@ void BaseDataLayer::LayerSetUp(const vector*>& bottom, } data_transformer_.reset( new DataTransformer(transform_param_, this->phase_)); - data_transformer_->InitRand(); + data_transformer_->InitRand(caffe_rng_rand()); // The subclasses should setup the size of bottom and top DataLayerSetUp(bottom, top); } @@ -67,7 +67,7 @@ void BasePrefetchingDataLayer::LayerSetUp( } #endif DLOG(INFO) << "Initializing prefetch"; - this->data_transformer_->InitRand(); + this->data_transformer_->InitRand(caffe_rng_rand()); StartInternalThread(); DLOG(INFO) << "Prefetch initialized."; } diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp index 644cb7e97ee..1c81fe6ed0f 100644 --- a/src/caffe/net.cpp +++ b/src/caffe/net.cpp @@ -546,10 +546,15 @@ Dtype Net::ForwardFromTo(int start, int end) { CHECK_LT(end, layers_.size()); Dtype loss = 0; for (int i = start; i <= end; ++i) { - // LOG(ERROR) << "Forwarding " << layer_names_[i]; + for (int c = 0; c < before_forward_.size(); ++c) { + before_forward_[c]->run(i); + } Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]); loss += layer_loss; if (debug_info_) { ForwardDebugInfo(i); } + for (int c = 0; c < after_forward_.size(); ++c) { + after_forward_[c]->run(i); + } } return loss; } @@ -591,11 +596,17 @@ void Net::BackwardFromTo(int start, int end) { CHECK_GE(end, 0); CHECK_LT(start, layers_.size()); for (int i = start; i >= end; --i) { + for (int c = 0; c < before_backward_.size(); ++c) { + before_backward_[c]->run(i); + } if (layer_need_backward_[i]) { layers_[i]->Backward( top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]); if (debug_info_) { BackwardDebugInfo(i); } } + for (int c = 0; c < after_backward_.size(); ++c) { + after_backward_[c]->run(i); + } } } diff --git a/src/caffe/parallel.cpp b/src/caffe/parallel.cpp index 5bc41c6a6e5..219dff46020 100644 --- a/src/caffe/parallel.cpp +++ b/src/caffe/parallel.cpp @@ -8,6 +8,10 @@ #include #include +#ifdef USE_NCCL +#include +#endif + #include "boost/thread.hpp" #include "caffe/caffe.hpp" #include "caffe/parallel.hpp" @@ -436,8 +440,147 @@ void P2PSync::Run(const vector& gpus) { } } +static int getDevice() { + int device = 0; +#ifndef CPU_ONLY + CUDA_CHECK(cudaGetDevice(&device)); +#endif + return device; +} + +template +NCCL::NCCL(shared_ptr > solver, const string& uid) + : GPUParams(solver, getDevice()), + solver_(solver), layer_wise_() { + this->configure(solver.get()); + 
+#ifdef USE_NCCL + if (uid.size() > 0) { // Multi-process case + ncclUniqueId nccl_uid; + memcpy(&nccl_uid, &uid[0], NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn) + NCCL_CHECK(ncclCommInitRank(&comm_, + Caffe::solver_count(), + nccl_uid, + Caffe::solver_rank())); + } else { + memset(&comm_, 0, sizeof(comm_)); // NOLINT(caffe/alt_fn) + } + + CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking)); + layer_events_.resize(solver->net()->layers().size()); + for (int i = 0; i < layer_events_.size(); ++i) { + CUDA_CHECK(cudaEventCreateWithFlags(&layer_events_[i], + cudaEventDisableTiming)); + } + CUDA_CHECK(cudaEventCreateWithFlags(&solver_event_, cudaEventDisableTiming)); +#else + LOG(FATAL) << "Multi-GPU execution not available - rebuild with USE_NCCL"; +#endif +} + +template<typename Dtype> +NCCL<Dtype>::~NCCL() { +#ifdef USE_NCCL + CUDA_CHECK(cudaEventDestroy(solver_event_)); + for (int i = 0; i < layer_events_.size(); ++i) { + CUDA_CHECK(cudaEventDestroy(layer_events_[i])); + } + CUDA_CHECK(cudaStreamDestroy(stream_)); + if (comm_) + ncclCommDestroy(comm_); +#endif +} + +template<typename Dtype> +void NCCL<Dtype>::init_single_process(vector<NCCL<Dtype>*>* nccls) { +#ifdef USE_NCCL + ncclComm_t* comms = new ncclComm_t[nccls->size()]; + int* gpu_list = new int[nccls->size()]; + for (int i = 0; i < nccls->size(); ++i) { + gpu_list[i] = (*nccls)[i]->solver_->param().device_id(); + } + NCCL_CHECK(ncclCommInitAll(comms, static_cast<int>(nccls->size()), gpu_list)); + for (int i = 0; i < nccls->size(); ++i) { + (*nccls)[i]->comm_ = comms[i]; + } +#endif +} + +template<typename Dtype> +string NCCL<Dtype>::new_uid() { + string uid(NCCL_UNIQUE_ID_BYTES, 0); +#ifdef USE_NCCL + ncclUniqueId nccl_uid; + NCCL_CHECK(ncclGetUniqueId(&nccl_uid)); + memcpy(&uid[0], &nccl_uid, NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn) +#endif + return uid; +} + +template<typename Dtype> +void NCCL<Dtype>::bcast() { +#ifdef USE_NCCL + NCCL_CHECK(ncclBcast(data_, static_cast<int>(size_), + nccl::dataType<Dtype>::type, 0, + comm_, cudaStreamDefault)); +#endif +} + +template<typename Dtype> +void NCCL<Dtype>::on_gradients_ready() { +#ifdef USE_NCCL + if (layer_wise_) { + CHECK_EQ(solver_->net()->params().size(), + solver_->net()->learnable_params().size()) + << "Layer-wise reduce is not supported for nets with shared weights."; + // Make sure reduction is done before applying gradients + CUDA_CHECK(cudaEventRecord(solver_event_, stream_)); + CUDA_CHECK(cudaStreamWaitEvent(cudaStreamDefault, solver_event_, 0)); + } else { + NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_), + nccl::dataType<Dtype>::type, ncclSum, comm_, + cudaStreamDefault)); + caffe_gpu_scal(static_cast<int>(size_), (Dtype) 1.0 / comm_->nDev, diff_); + } +#endif +} + +template<typename Dtype> +void NCCL<Dtype>::run(int layer) { +#ifdef USE_NCCL + vector<shared_ptr<Blob<Dtype> > >& blobs = + solver_->net()->layers()[layer]->blobs(); +#ifdef DEBUG + // Assert blobs are contiguous to reduce in one step (e.g.
bias often small) + for (int i = 1; i < blobs.size(); ++i) { + CHECK_EQ(blobs[i - 1]->gpu_diff() + blobs[i - 1]->count(), + blobs[i + 0]->gpu_diff()); + } +#endif + if (blobs.size() > 0) { + // Make sure default stream is done with gradients + CUDA_CHECK(cudaEventRecord(layer_events_[layer], cudaStreamDefault)); + CUDA_CHECK(cudaStreamWaitEvent(stream_, layer_events_[layer], 0)); + // Reduce asynchronously, while default stream continues to next layer + int size = 0; + for (int i = 0; i < blobs.size(); ++i) { + size += blobs[i]->count(); + } + NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(), + blobs[0]->mutable_gpu_diff(), + size, + nccl::dataType<Dtype>::type, + ncclSum, comm_, stream_)); + caffe_gpu_scal(size, (Dtype) 1.0 / comm_->nDev, + blobs[0]->mutable_gpu_diff(), stream_); + layer_wise_ = true; + } +#endif +} + INSTANTIATE_CLASS(Params); INSTANTIATE_CLASS(GPUParams); +INSTANTIATE_CLASS(NCCL); INSTANTIATE_CLASS(P2PSync); } // namespace caffe diff --git a/src/caffe/solver.cpp b/src/caffe/solver.cpp index ece3913e88a..63fb464fe3e 100644 --- a/src/caffe/solver.cpp +++ b/src/caffe/solver.cpp @@ -50,8 +50,8 @@ void Solver<Dtype>::Init(const SolverParameter& param) { param_ = param; CHECK_GE(param_.average_loss(), 1) << "average_loss should be non-negative."; CheckSnapshotWritePermissions(); - if (Caffe::root_solver() && param_.random_seed() >= 0) { - Caffe::set_random_seed(param_.random_seed()); + if (param_.random_seed() >= 0) { + Caffe::set_random_seed(param_.random_seed() + Caffe::solver_rank()); } // Scaffolding code InitTrainNet(); @@ -203,7 +203,7 @@ void Solver<Dtype>::Step(int iters) { net_->ClearParamDiffs(); if (param_.test_interval() && iter_ % param_.test_interval() == 0 && (iter_ > 0 || param_.test_initialization()) - && Caffe::root_solver()) { + && Caffe::root_solver() && Caffe::solver_rank() == 0) { TestAll(); if (requested_early_exit_) { // Break out of the while loop because stop was requested while testing.
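
The pieces above (solver_rank, the NCCL wrapper, and the solver callback) compose into a short launch script: one process per GPU, a single NCCL uid shared by all of them, and the NCCL callback attached to each solver. The following is a minimal sketch condensed from python/caffe/train.py and python/multi_gpu.py in this patch; the solver path and GPU list are placeholders, and the net is assumed to define its own data layer, so the Python prefetching machinery is omitted.

```python
# Minimal multi-GPU launcher, condensed from python/caffe/train.py above.
from multiprocessing import Process

import caffe


def solve(uid, rank, gpus, solver_path):
    caffe.init_log(0 if rank == 0 else 1)   # only rank 0 logs at INFO
    caffe.set_mode_gpu()
    caffe.set_device(gpus[rank])
    caffe.set_solver_count(len(gpus))
    caffe.set_solver_rank(rank)             # rank 0 alone runs TestAll()

    solver = caffe.SGDSolver(solver_path)
    nccl = caffe.NCCL(solver, uid)          # joins the NCCL clique for this uid
    nccl.bcast()                            # sync initial weights from rank 0
    solver.add_callback(nccl)               # all-reduce gradients each iteration
    # optionally: solver.net.after_backward(nccl) for layer-wise reduce
    solver.step(solver.param.max_iter)


if __name__ == '__main__':
    gpus = [0, 1]                           # placeholder GPU list
    uid = caffe.NCCL.new_uid()              # one uid shared by every process
    procs = []
    for rank in range(len(gpus)):
        p = Process(target=solve,
                    args=(uid, rank, gpus, 'solver.prototxt'))
        p.daemon = True
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
```

Because Solver::Init now offsets the random seed by solver_rank() and only rank 0 runs TestAll(), every process can run this same function unchanged.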
diff --git a/src/caffe/test/test_data_transformer.cpp b/src/caffe/test/test_data_transformer.cpp index 31bf1c1fb14..9f07936eb0b 100644 --- a/src/caffe/test/test_data_transformer.cpp +++ b/src/caffe/test/test_data_transformer.cpp @@ -42,7 +42,7 @@ class DataTransformTest : public ::testing::Test { DataTransformer transformer(transform_param, phase); const int crop_size = transform_param.crop_size(); Caffe::set_random_seed(seed_); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); Blob blob(1, datum.channels(), datum.height(), datum.width()); if (transform_param.crop_size() > 0) { blob.Reshape(1, datum.channels(), crop_size, crop_size); @@ -87,7 +87,7 @@ TYPED_TEST(DataTransformTest, TestEmptyTransform) { FillDatum(label, channels, height, width, unique_pixels, &datum); Blob blob(1, channels, height, width); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); transformer.Transform(datum, &blob); EXPECT_EQ(blob.num(), 1); EXPECT_EQ(blob.channels(), datum.channels()); @@ -110,7 +110,7 @@ TYPED_TEST(DataTransformTest, TestEmptyTransformUniquePixels) { FillDatum(label, channels, height, width, unique_pixels, &datum); Blob blob(1, 3, 4, 5); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); transformer.Transform(datum, &blob); EXPECT_EQ(blob.num(), 1); EXPECT_EQ(blob.channels(), datum.channels()); @@ -134,7 +134,7 @@ TYPED_TEST(DataTransformTest, TestCropSize) { Datum datum; FillDatum(label, channels, height, width, unique_pixels, &datum); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); Blob blob(1, channels, crop_size, crop_size); for (int iter = 0; iter < this->num_iter_; ++iter) { transformer.Transform(datum, &blob); @@ -272,7 +272,7 @@ TYPED_TEST(DataTransformTest, TestMeanValue) { FillDatum(label, channels, height, width, unique_pixels, &datum); Blob blob(1, channels, height, width); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); transformer.Transform(datum, &blob); for (int j = 0; j < blob.count(); ++j) { EXPECT_EQ(blob.cpu_data()[j], label - mean_value); @@ -294,7 +294,7 @@ TYPED_TEST(DataTransformTest, TestMeanValues) { FillDatum(label, channels, height, width, unique_pixels, &datum); Blob blob(1, channels, height, width); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); transformer.Transform(datum, &blob); for (int c = 0; c < channels; ++c) { for (int j = 0; j < height * width; ++j) { @@ -333,7 +333,7 @@ TYPED_TEST(DataTransformTest, TestMeanFile) { FillDatum(label, channels, height, width, unique_pixels, &datum); Blob blob(1, channels, height, width); DataTransformer transformer(transform_param, TEST); - transformer.InitRand(); + transformer.InitRand(caffe_rng_rand()); transformer.Transform(datum, &blob); for (int j = 0; j < blob.count(); ++j) { EXPECT_EQ(blob.cpu_data()[j], 0); diff --git a/src/caffe/util/benchmark.cpp b/src/caffe/util/benchmark.cpp index 1d269c351c1..d994225f97b 100644 --- a/src/caffe/util/benchmark.cpp +++ b/src/caffe/util/benchmark.cpp @@ -44,7 +44,6 @@ void Timer::Stop() { if (Caffe::mode() == Caffe::GPU) { #ifndef CPU_ONLY CUDA_CHECK(cudaEventRecord(stop_gpu_, 0)); - CUDA_CHECK(cudaEventSynchronize(stop_gpu_)); #else NO_GPU; #endif @@ -66,6 +65,7 @@ float Timer::MicroSeconds() { } if (Caffe::mode() == 
Caffe::GPU) { #ifndef CPU_ONLY + CUDA_CHECK(cudaEventSynchronize(stop_gpu_)); CUDA_CHECK(cudaEventElapsedTime(&elapsed_milliseconds_, start_gpu_, stop_gpu_)); // Cuda only measure milliseconds @@ -89,6 +89,7 @@ float Timer::MilliSeconds() { } if (Caffe::mode() == Caffe::GPU) { #ifndef CPU_ONLY + CUDA_CHECK(cudaEventSynchronize(stop_gpu_)); CUDA_CHECK(cudaEventElapsedTime(&elapsed_milliseconds_, start_gpu_, stop_gpu_)); #else diff --git a/src/caffe/util/math_functions.cu b/src/caffe/util/math_functions.cu index 4c587537435..6d001026082 100644 --- a/src/caffe/util/math_functions.cu +++ b/src/caffe/util/math_functions.cu @@ -90,6 +90,26 @@ void caffe_gpu_scal(const int N, const double alpha, double *X) { CUBLAS_CHECK(cublasDscal(Caffe::cublas_handle(), N, &alpha, X, 1)); } +template <> +void caffe_gpu_scal(const int N, const float alpha, float* X, + cudaStream_t str) { + cudaStream_t initial_stream; + CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream)); + CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str)); + CUBLAS_CHECK(cublasSscal(Caffe::cublas_handle(), N, &alpha, X, 1)); + CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream)); +} + +template <> +void caffe_gpu_scal(const int N, const double alpha, double* X, + cudaStream_t str) { + cudaStream_t initial_stream; + CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream)); + CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str)); + CUBLAS_CHECK(cublasDscal(Caffe::cublas_handle(), N, &alpha, X, 1)); + CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream)); +} + template <> void caffe_gpu_axpby(const int N, const float alpha, const float* X, const float beta, float* Y) {
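
With Timer::Stop() now only recording the stop event, the synchronization cost moves into MilliSeconds()/MicroSeconds(); this is what keeps the per-layer timing callbacks above cheap enough to leave enabled during training. A minimal sketch of the same pattern through the new Python bindings, assuming a placeholder 'deploy.prototxt' model:

```python
# Hedged sketch: per-layer forward timing via the Timer and Net callbacks
# added in this patch. stop() records a CUDA event without syncing; reading
# .ms is what synchronizes, so timers do not stall the GPU pipeline.
import caffe

caffe.set_mode_gpu()
net = caffe.Net('deploy.prototxt', caffe.TEST)   # placeholder model

fwd = [caffe.Timer() for _ in net.layers]
net.before_forward(lambda i: fwd[i].start())
net.after_forward(lambda i: fwd[i].stop())       # records event, no sync

net.forward()
for i, t in enumerate(fwd):
    print('layer %d: %.2f ms' % (i, t.ms))       # .ms synchronizes lazily
```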