From 0621c327f1d0dd272ab7248c50e9afa8ae0fc0c0 Mon Sep 17 00:00:00 2001
From: Yang Yang
Date: Tue, 13 Mar 2018 23:52:35 +0000
Subject: [PATCH 01/25] init commit

---
 doc/design/parallel_executor.md             | 52 ++++++++++++++++++
 paddle/fluid/framework/CMakeLists.txt       |  2 +
 paddle/fluid/framework/executor.cc          | 13 +++++
 paddle/fluid/framework/executor.h           |  1 +
 paddle/fluid/framework/parallel_executor.cc | 19 +++++++
 paddle/fluid/framework/parallel_executor.h  | 61 +++++++++++++++++++++
 6 files changed, 148 insertions(+)
 create mode 100644 doc/design/parallel_executor.md
 create mode 100644 paddle/fluid/framework/parallel_executor.cc
 create mode 100644 paddle/fluid/framework/parallel_executor.h

diff --git a/doc/design/parallel_executor.md b/doc/design/parallel_executor.md
new file mode 100644
index 0000000000000..567eede1bd59b
--- /dev/null
+++ b/doc/design/parallel_executor.md
@@ -0,0 +1,52 @@
+# ParallelExecutor Design Doc
+
+## Introduction
+
+We introduce `ParallelExecutor` to run multi-GPU training in PaddlePaddle Fluid. It supports:
+1. keeping a copy of the parameters on each GPU
+1. allreduce on a separate stream, allowing computation and communication to overlap
+
+An example of switching single-GPU training to multiple GPUs:
+```python
+avg_cost = your_neural_network()
+opt = fluid.optimizer.SGDOptimizer(learning_rate=0.001)
+opt.minimize(avg_cost)
+
+# change Executor -> ParallelExecutor
+exe = fluid.ParallelExecutor(gpu_list=[0, 1])
+
+for iter in xrange(iter_num):
+    exe.run()
+```
+
+## Design
+
+In the constructor, a list of parameters whose gradients need to be allreduced is given.
+
+At runtime, `ParallelExecutor` starts one thread per GPU, each running its own `Executor`. For every
+operator run on each GPU, it will automatically sync with different streams when necessary.
+
+```c++
+// if op's input is params' grad:
+    // sync with allreduce stream
+    // e.g. sgd should wait for allreduce to be finished
+SyncMultipleStreams(op);
+
+op->Run(*local_scope, place_);
+
+// if op's output is params' grad:
+// sync with computation stream
+// e.g. allreduce should wait for fc_grad to be finished.
+SyncMultipleStreams(op);
+```
+
+
+## API
+
+`ParallelExecutor.run` has a similar interface to `Executor.run`, with a few differences:
+1. Scope: we don't expose `scope` in `ParallelExecutor.run` since `ParallelExecutor` has its
+own scopes to maintain NCCL state.
+1. Feed: we don't expose `feed` in the API either, because the whole point of implementing
+`parallel_executor` is speed. The input for the network should come from a reader OP.
+1. Fetch: we return the fetched values on all GPUs as a list. (e.g. 
`exe.run(..., fetch=loss)` +with return `[loss_on_gpu0, loss_on_gpu1]`) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 15e5574ecfd40..934bb43ffea45 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -86,6 +86,8 @@ cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glo cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto backward glog lod_rank_table feed_fetch_method) +cc_library(parallel_executor SRCS parallel_executor.cc DEPS op_registry device_context scope + framework_proto backward glog lod_rank_table feed_fetch_method executor) cc_library(prune SRCS prune.cc DEPS framework_proto) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 5cae38b2a857b..6ee3f18dd42ef 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -305,10 +305,23 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, } // if (create_vars) for (auto& op : ctx->ops_) { + // TODO(ty): + // e.g. sgd should wait for allreduce to be finished + // if op's input is params' grad: + // sync with allreduce stream + // SyncMultipleStreams(op); + VLOG(4) << place_ << " " << op->DebugStringEx(local_scope); op->Run(*local_scope, place_); VLOG(3) << place_ << " " << op->DebugStringEx(local_scope); + // TODO(ty): + // e.g. allreduce shoudl wait for fc_grad to be finished. + // if op's output is params' grad: + // sync with computation stream + // apply allreduce on allreduce stream + // SyncMultipleStreams(op); + if (FLAGS_benchmark) { VLOG(2) << "Memory used after operator " + op->Type() + " running: " << memory::memory_usage(place_); diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 28ce3315154ce..8d8a7cf4db690 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -47,6 +47,7 @@ class Executor { const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); + private: static ExecutorPrepareContext* Prepare(const ProgramDesc& program, int block_id); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc new file mode 100644 index 0000000000000..e9f213ae2cff6 --- /dev/null +++ b/paddle/fluid/framework/parallel_executor.cc @@ -0,0 +1,19 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "paddle/fluid/framework/parallel_executor.h" + +namespace paddle { +namespace framework {} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h new file mode 100644 index 0000000000000..47e0005e58d91 --- /dev/null +++ b/paddle/fluid/framework/parallel_executor.h @@ -0,0 +1,61 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include + +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/op_info.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/tensor.h" + +#include "paddle/fluid/operators/nccl/nccl_gpu_common.h" +#include "paddle/fluid/platform/device_context.h" + +namespace paddle { +namespace framework { + +struct AllReduceCallBack { + void operator()(framework::OperatorBase* op); + + std::unordered_set param_grad_names_; + platform::DeviceContext dev_ctx; +}; + +class ParallelExecutor { + explicit ParallelExecutor(const std::vector& places, + const std::unordered_set& params); + + /* @Brief + * Runtime evaluation of the given ProgramDesc under certain Scope + * + * @param + * ProgramDesc + * Scope + */ + void Run(const ProgramDesc& prog, Scope* scope, int block_id, + bool create_local_scope = true, bool create_vars = true); + + private: + std::vector exes_; + std::vector scopes_; + AllReduceCallBack all_reduce_callbacks_; + std::unordered_set params_; // where to initilize it? + platform::Communicator nccl_com_; +}; + +} // namespace framework +} // namespace paddle From e67325cdaf8ce85342dab45b06dbc286c77a5555 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 14 Mar 2018 00:11:32 +0000 Subject: [PATCH 02/25] update readme --- doc/design/parallel_executor.md | 42 +++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/doc/design/parallel_executor.md b/doc/design/parallel_executor.md index 567eede1bd59b..78ef74f159d47 100644 --- a/doc/design/parallel_executor.md +++ b/doc/design/parallel_executor.md @@ -30,23 +30,45 @@ operator run on each GPU, it will automatically sync with different streams when // if op's input is params' grad: // sync with allreduce stream // e.g. sgd should wait for allreduce to be finished -SyncMultipleStreams(op); +CallBack->BeforeOp(op); op->Run(*local_scope, place_); // if op's output is params' grad: // sync with computation stream // e.g. allreduce shoudl wait for fc_grad to be finished. 
-SyncMultipleStreams(op); +CallBack->AfterOp(op); ``` +And the `Callback` object can be implemented as the following -## API +```c++ +struct AllReduceCallBack { + void BeforeOp(framework::OperatorBase* op); + void AfterOp(framework::OperatorBase* op); + + std::unordered_set reduced_param_grad_names; + std::unordered_set param_grad_names_; + + platform::DeviceContext* computation_dev_ctx; // computation device context + platform::DeviceContext* communication_dev_ctx; // communication device context -The `ParallelExecutor.run` has similar interface as `Executor.run`. Besides -1. Scope: we don't expose `scope` in `ParallelExecutor.run` since `ParallelExecutor` has its -own scope to maintain NCCL. -1. Feed: we don't expose `feed` in the API either, because the whole point of implementing -parallel_executor is the speed. The input for NN should be implemented in an reader OP. -1. Fetch: we return the fetched value on all GPUs as a list. (e.g. `exe.run(..., fetch=loss)` -with return `[loss_on_gpu0, loss_on_gpu1]`) + framework::Scope* scope; + platform::NCCL::Communicator* nccl_com; +}; + +AllReduceCallBack::BeforeOp(framework::OperatorBase* op) { + if (op->Input() in reduced_param_grad_names) { + communication_dev_ctx->Wait(); + reduced_param_grad_names.erase(op->Input()) + } +} + +AllReduceCallBack::AfterOp(framework::OperatorBase* op) { + if (op->Output() in param_grad_names) { + computation_dev_ctx->Wait(); + reduced_param_grad_names.insert(op->Output()); + ncclAllreduce(scope, op->Output(), communication_dev_ctx); + } +} +``` From 8f061e43b71b398d37aebc3576e2c2f21d5fae73 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 14 Mar 2018 00:16:11 +0000 Subject: [PATCH 03/25] delete param name --- paddle/fluid/framework/parallel_executor.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 47e0005e58d91..f67b9266949de 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -52,8 +52,7 @@ class ParallelExecutor { private: std::vector exes_; std::vector scopes_; - AllReduceCallBack all_reduce_callbacks_; - std::unordered_set params_; // where to initilize it? 
+ std::vector all_reduce_callbacks_; platform::Communicator nccl_com_; }; From a62d1423b64bf823d548ea742a5e07a0820f29d5 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 15 Mar 2018 17:56:23 +0000 Subject: [PATCH 04/25] better name --- .../framework/{parallel_executor.cc => multi_gpu_executor.cc} | 0 .../fluid/framework/{parallel_executor.h => multi_gpu_executor.h} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename paddle/fluid/framework/{parallel_executor.cc => multi_gpu_executor.cc} (100%) rename paddle/fluid/framework/{parallel_executor.h => multi_gpu_executor.h} (100%) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc similarity index 100% rename from paddle/fluid/framework/parallel_executor.cc rename to paddle/fluid/framework/multi_gpu_executor.cc diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/multi_gpu_executor.h similarity index 100% rename from paddle/fluid/framework/parallel_executor.h rename to paddle/fluid/framework/multi_gpu_executor.h From 65c7ed5e15fc0d66e87b5ab571c61b8695aca8d1 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 16 Mar 2018 22:51:05 +0000 Subject: [PATCH 05/25] pass run --- paddle/fluid/framework/CMakeLists.txt | 2 +- paddle/fluid/framework/executor.cc | 37 +++--- paddle/fluid/framework/executor.h | 17 ++- paddle/fluid/framework/multi_gpu_executor.cc | 124 +++++++++++++++++- paddle/fluid/framework/multi_gpu_executor.h | 48 +++++-- paddle/fluid/platform/place.h | 11 ++ paddle/fluid/pybind/CMakeLists.txt | 2 +- paddle/fluid/pybind/pybind.cc | 10 ++ .../unittests/test_multi_gpu_executor.py | 100 ++++++++++++++ 9 files changed, 313 insertions(+), 38 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 934bb43ffea45..31a1d15b6f900 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -86,7 +86,7 @@ cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glo cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto backward glog lod_rank_table feed_fetch_method) -cc_library(parallel_executor SRCS parallel_executor.cc DEPS op_registry device_context scope +cc_library(multi_gpu_executor SRCS multi_gpu_executor.cc DEPS op_registry device_context scope framework_proto backward glog lod_rank_table feed_fetch_method executor) cc_library(prune SRCS prune.cc DEPS framework_proto) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 6ee3f18dd42ef..a65b2fcea92eb 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -34,15 +34,6 @@ DEFINE_bool(check_nan_inf, false, namespace paddle { namespace framework { -struct ExecutorPrepareContext { - ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id) - : prog_(prog), block_id_(block_id) {} - - framework::ProgramDesc prog_; - size_t block_id_; - std::vector> ops_; -}; - Executor::Executor(const platform::Place& place) : place_(place) {} static void CreateTensor(Variable* var, proto::VarType::Type var_type) { @@ -304,6 +295,21 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, } // if (create_local_scope) } // if (create_vars) + RunOperators(ctx, local_scope); + + if (create_vars && create_local_scope) { + scope->DeleteScope(local_scope); + } + if (FLAGS_benchmark) { + VLOG(2) << 
"-------------------------------------------------------"; + VLOG(2) << "Memory used after deleting local scope: " + << memory::memory_usage(place_); + VLOG(2) << "-------------------------------------------------------"; + } +} + +void Executor::RunOperators(const ExecutorPrepareContext* ctx, + const Scope* local_scope) const { for (auto& op : ctx->ops_) { // TODO(ty): // e.g. sgd should wait for allreduce to be finished @@ -330,21 +336,12 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, for (auto& vname : op->OutputVars(true)) { auto* var = local_scope->FindVar(vname); if (var == nullptr) continue; - if (var->IsType()) { - CheckTensorNANOrInf(vname, var->Get()); + if (var->IsType()) { + CheckTensorNANOrInf(vname, var->Get()); } } } } - if (create_vars && create_local_scope) { - scope->DeleteScope(local_scope); - } - if (FLAGS_benchmark) { - VLOG(2) << "-------------------------------------------------------"; - VLOG(2) << "Memory used after deleting local scope: " - << memory::memory_usage(place_); - VLOG(2) << "-------------------------------------------------------"; - } } } // namespace framework diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 8d8a7cf4db690..701d96e6337fe 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -22,7 +22,16 @@ limitations under the License. */ namespace paddle { namespace framework { -struct ExecutorPrepareContext; + +struct ExecutorPrepareContext { + ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id) + : prog_(prog), block_id_(block_id) {} + + framework::ProgramDesc prog_; + size_t block_id_; + std::vector> ops_; +}; + class Executor { public: // TODO(dzhwinter) : Do not rely on this function, it will be removed @@ -30,6 +39,7 @@ class Executor { : Executor(device.GetPlace()) {} explicit Executor(const platform::Place& place); + virtual ~Executor() {} /* @Brief * Runtime evaluation of the given ProgramDesc under certain Scope @@ -47,7 +57,6 @@ class Executor { const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); - private: static ExecutorPrepareContext* Prepare(const ProgramDesc& program, int block_id); @@ -55,7 +64,9 @@ class Executor { bool create_local_scope = true, bool create_vars = true); - private: + virtual void RunOperators(const ExecutorPrepareContext* ctx, + const Scope* local_scope) const; + const platform::Place place_; }; diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index e9f213ae2cff6..9e11c3a45c346 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -12,8 +12,128 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -#include "paddle/fluid/framework/parallel_executor.h" +#include "paddle/fluid/framework/multi_gpu_executor.h" + +#include +#include "paddle/fluid/framework/operator.h" namespace paddle { -namespace framework {} // namespace framework +namespace framework { + +ExecutorWithAllReduce::ExecutorWithAllReduce( + const platform::Place& place, std::unordered_set* param_grads, + NCCLContext* nccl_context) + : Executor(place), param_grads_(param_grads) { + int device_id = boost::get(place).device; + comm_ = &nccl_context->comms_[device_id]; + io_ctx_ = nccl_context->ctxs_[device_id]; +} + +// TODO(yy): Move this function somewhere +ncclDataType_t ToNCCLDataType(std::type_index type) { + if (type == typeid(float)) { // NOLINT + return ncclFloat; + } else if (type == typeid(double)) { // NOLINT + return ncclDouble; + } else if (type == typeid(int)) { // NOLINT + return ncclInt; + } else { + PADDLE_THROW("Not supported"); + } +} + +void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, + const Scope* local_scope) const { + cudaSetDevice(boost::get(place_).device); + + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto* dev_ctx = pool.Get(place_); + cudaStream_t computation_stream = + reinterpret_cast(dev_ctx)->stream(); + cudaStream_t all_reduce_stream = io_ctx_->stream(); + + std::map computation_event; + std::map all_reduce_event; + for (auto& argu : *param_grads_) { + PADDLE_ENFORCE(cudaEventCreateWithFlags(&computation_event[argu], + cudaEventDisableTiming)); + PADDLE_ENFORCE(cudaEventCreateWithFlags(&all_reduce_event[argu], + cudaEventDisableTiming)); + } + + for (auto& op : ctx->ops_) { + for (auto& param2argu : op->Inputs()) { + for (auto& argu : param2argu.second) { + if (param_grads_->count(argu) != 0) { + cudaStreamWaitEvent(computation_stream, all_reduce_event[argu], 0); + } + } + } + + VLOG(4) << place_ << " " << op->DebugStringEx(local_scope); + op->Run(*local_scope, place_); + VLOG(3) << place_ << " " << op->DebugStringEx(local_scope); + + for (auto& param2argu : op->Outputs()) { + for (auto& argu : param2argu.second) { + if (param_grads_->count(argu) != 0) { + LOG(INFO) << place_ << "Launch allreduce on " << argu; + + PADDLE_ENFORCE( + cudaEventRecord(computation_event[argu], computation_stream)); + PADDLE_ENFORCE(cudaStreamWaitEvent(all_reduce_stream, + computation_event[argu], 0)); + + auto& tensor = local_scope->FindVar(argu)->Get(); + void* data = const_cast(tensor.data()); + PADDLE_ENFORCE(platform::dynload::ncclAllReduce( + data, data, tensor.numel(), ToNCCLDataType(tensor.type()), + ncclSum, *comm_, all_reduce_stream)); + + PADDLE_ENFORCE( + cudaEventRecord(all_reduce_event[argu], all_reduce_stream)); + } + } + } + } + + cudaStreamSynchronize(computation_stream); + cudaStreamSynchronize(all_reduce_stream); +} + +MultiGPUExecutor::MultiGPUExecutor( + const std::vector& places, + const std::unordered_set& params) + : nccl_ctx_(places) { + for (auto& param : params) { + param_grads_.insert(GradVarName(param)); + } + for (auto& place : places) { + exes_.push_back( + framework::ExecutorWithAllReduce(place, ¶m_grads_, &nccl_ctx_)); + scopes_.push_back(new framework::Scope()); + } +} + +void MultiGPUExecutor::Run(const ProgramDesc& prog, int block_id, + bool create_local_scope, bool create_vars) { + // prepare prog in a single thread to avoid race + auto* context = exes_[0].Prepare(prog, block_id); + + std::vector threads; + threads.push_back(std::thread([&] { + exes_[0].RunPreparedContext(context, scopes_[0], create_local_scope, 
+ create_vars); + })); + threads.push_back(std::thread([&] { + exes_[1].RunPreparedContext(context, scopes_[1], create_local_scope, + create_vars); + })); + + for (auto& t : threads) { + t.join(); + } +} + +} // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/multi_gpu_executor.h b/paddle/fluid/framework/multi_gpu_executor.h index f67b9266949de..7a735f4c1e3fa 100644 --- a/paddle/fluid/framework/multi_gpu_executor.h +++ b/paddle/fluid/framework/multi_gpu_executor.h @@ -28,16 +28,42 @@ limitations under the License. */ namespace paddle { namespace framework { -struct AllReduceCallBack { - void operator()(framework::OperatorBase* op); +struct NCCLContext { + std::vector ctxs_; + std::vector comms_; - std::unordered_set param_grad_names_; - platform::DeviceContext dev_ctx; + explicit NCCLContext(const std::vector& places) { + std::vector devs; + devs.reserve(places.size()); + for (auto& p : places) { + devs.push_back(boost::get(p).device); + ctxs_.push_back( + new platform::CUDADeviceContext(boost::get(p))); + } + comms_.reserve(places.size()); + platform::dynload::ncclCommInitAll( + &comms_[0], static_cast(places.size()), &devs[0]); + } }; -class ParallelExecutor { - explicit ParallelExecutor(const std::vector& places, - const std::unordered_set& params); +class ExecutorWithAllReduce : public Executor { + public: + explicit ExecutorWithAllReduce(const platform::Place& p, + std::unordered_set* param_grads, + NCCLContext* nccl_context); + + private: + void RunOperators(const ExecutorPrepareContext* ctx, + const Scope* local_scope) const override; + platform::CUDADeviceContext* io_ctx_; + ncclComm_t* comm_; + std::unordered_set* param_grads_; +}; + +class MultiGPUExecutor { + public: + explicit MultiGPUExecutor(const std::vector& places, + const std::unordered_set& params); /* @Brief * Runtime evaluation of the given ProgramDesc under certain Scope @@ -46,14 +72,14 @@ class ParallelExecutor { * ProgramDesc * Scope */ - void Run(const ProgramDesc& prog, Scope* scope, int block_id, + void Run(const ProgramDesc& prog, int block_id, bool create_local_scope = true, bool create_vars = true); private: - std::vector exes_; + std::vector exes_; std::vector scopes_; - std::vector all_reduce_callbacks_; - platform::Communicator nccl_com_; + NCCLContext nccl_ctx_; + std::unordered_set param_grads_; }; } // namespace framework diff --git a/paddle/fluid/platform/place.h b/paddle/fluid/platform/place.h index 501bddfc6ec8b..633251eb47427 100644 --- a/paddle/fluid/platform/place.h +++ b/paddle/fluid/platform/place.h @@ -65,6 +65,17 @@ bool is_cpu_place(const Place &); bool places_are_same_class(const Place &, const Place &); bool is_same_place(const Place &, const Place &); +struct PlaceHash { + std::size_t operator()(const Place &p) const { + std::hash ihash; + size_t dev_id = 0; + if (is_gpu_place(p)) { + dev_id = boost::get(p).device; + } + return ihash(dev_id << 2 | p.which()); + } +}; + std::ostream &operator<<(std::ostream &, const Place &); template diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8942b5c9430ff..bb74e01fd11e3 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -1,7 +1,7 @@ if(WITH_PYTHON) cc_library(paddle_pybind SHARED SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc - DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method + DEPS pybind python backward proto_desc paddle_memory executor prune init profiler 
feed_fetch_method multi_gpu_executor ${GLOB_OP_LIB}) if(NOT APPLE AND NOT ANDROID) target_link_libraries(paddle_pybind rt) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d2e883caccdd3..be9e28d0ab3d4 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -42,6 +42,7 @@ limitations under the License. */ #include "paddle/fluid/string/to_string.h" #ifdef PADDLE_WITH_CUDA +#include "paddle/fluid/framework/multi_gpu_executor.h" #include "paddle/fluid/operators/nccl/nccl_gpu_common.h" #include "paddle/fluid/platform/cuda_profiler.h" #include "paddle/fluid/platform/gpu_info.h" @@ -466,6 +467,15 @@ All parameter, weight, gradient are variables in Paddle. m.def("nvprof_init", platform::CudaProfilerInit); m.def("nvprof_start", platform::CudaProfilerStart); m.def("nvprof_stop", platform::CudaProfilerStop); + + py::class_(m, "ParallelExecutor") + .def( + "__init__", + [](MultiGPUExecutor &self, const std::vector &places, + const std::unordered_set ¶ms) { + new (&self) MultiGPUExecutor(places, params); + }) + .def("run", &MultiGPUExecutor::Run); #endif py::enum_(m, "ProfilerState", py::arithmetic()) diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py new file mode 100644 index 0000000000000..042b249b17c1d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -0,0 +1,100 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
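+#
+# This test drives the fluid.core.ParallelExecutor binding directly: setUp
+# converts the MNIST training set to a recordio file, and test_main builds a
+# small fc network on constant inputs, gathers the parameter names of the main
+# program, constructs a ParallelExecutor over CUDAPlace(0) and CUDAPlace(1),
+# and runs the startup and main program descs on both devices.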
+ +import unittest +import paddle.fluid as fluid +import paddle.v2 as paddle +import paddle.v2.dataset.mnist as mnist + + +def vgg16_bn_drop(input): + def conv_block(input, num_filter, groups, dropouts): + return fluid.nets.img_conv_group( + input=input, + pool_size=2, + pool_stride=2, + conv_num_filter=[num_filter] * groups, + conv_filter_size=3, + conv_act='relu', + conv_with_batchnorm=False, + conv_batchnorm_drop_rate=dropouts, + pool_type='max') + + conv1 = conv_block(input, 64, 2, [0.3, 0]) + conv2 = conv_block(conv1, 128, 2, [0.4, 0]) + conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0]) + conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0]) + conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0]) + + fc1 = fluid.layers.fc(input=conv5, size=4096, act=None) + fc2 = fluid.layers.fc(input=fc1, size=4096, act=None) + return fc2 + + +class ParallelExecutor(unittest.TestCase): + def setUp(self): + # Convert mnist to recordio file + with fluid.program_guard(fluid.Program(), fluid.Program()): + reader = paddle.batch(mnist.train(), batch_size=32) + feeder = fluid.DataFeeder( + feed_list=[ # order is image and label + fluid.layers.data( + name='image', shape=[784]), + fluid.layers.data( + name='label', shape=[1], dtype='int64'), + ], + place=fluid.CPUPlace()) + fluid.recordio_writer.convert_reader_to_recordio_file( + './mnist.recordio', reader, feeder) + + def test_main(self): + main = fluid.Program() + startup = fluid.Program() + + with fluid.program_guard(main, startup): + # reader = fluid.layers.open_recordio_file( + # filename='./mnist.recordio', + # shapes=[[-1, 784], [-1, 1]], + # lod_levels=[0, 0], + # dtypes=['float32', 'int64']) + # img, label = fluid.layers.read_file(reader) + img = fluid.layers.fill_constant( + shape=[32, 784], dtype='float32', value=1.0) + label = fluid.layers.fill_constant( + shape=[32, 1], dtype='int64', value=1) + hidden = fluid.layers.fc(img, size=2000, act='tanh') + hidden = fluid.layers.fc(hidden, size=2000, act='tanh') + hidden = fluid.layers.fc(hidden, size=2000, act='tanh') + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.mean(prediction) + # loss = fluid.layers.cross_entropy(input=prediction, label=label) + # loss = fluid.layers.mean(loss) + adam = fluid.optimizer.Adam() + adam.minimize(loss) + act_places = [] + for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: + p = fluid.core.Place() + p.set_place(each) + act_places.append(p) + + exe = fluid.core.ParallelExecutor( + act_places, + set([p.name for p in main.global_block().iter_parameters()])) + + exe.run(startup.desc, 0, True, True) + exe.run(main.desc, 0, True, True) + + +if __name__ == '__main__': + unittest.main() From 7fa64ee5f329b22ca0ef6a29edf1c6cf3fe214b4 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 16 Mar 2018 23:18:41 +0000 Subject: [PATCH 06/25] switch to larger network --- paddle/fluid/framework/multi_gpu_executor.cc | 4 + .../unittests/test_multi_gpu_executor.py | 146 +++++++++++------- 2 files changed, 93 insertions(+), 57 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index 9e11c3a45c346..9feb6e80132c2 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -99,6 +99,10 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, cudaStreamSynchronize(computation_stream); cudaStreamSynchronize(all_reduce_stream); + for (auto& argu : *param_grads_) { + PADDLE_ENFORCE(cudaEventDestroy(computation_event[argu])); + 
PADDLE_ENFORCE(cudaEventDestroy(all_reduce_event[argu])); + } } MultiGPUExecutor::MultiGPUExecutor( diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 042b249b17c1d..e05b5a3ad7711 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,10 +12,54 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import argparse +import time + import paddle.fluid as fluid -import paddle.v2 as paddle -import paddle.v2.dataset.mnist as mnist + +SEED = 1 +DTYPE = "float32" + +# random seed must set before configuring the network. +# fluid.default_startup_program().random_seed = SEED + + +def parse_args(): + parser = argparse.ArgumentParser("mnist model benchmark.") + parser.add_argument( + '--label_size', type=int, default=10, help='The label size.') + parser.add_argument( + '--batch_size', type=int, default=10, help='The minibatch size.') + parser.add_argument( + '--iterations', type=int, default=5, help='The number of minibatches.') + parser.add_argument( + '--use_nccl', + default=False, + action='store_true', + help='If set, use nccl') + args = parser.parse_args() + return args + + +def print_arguments(args): + print('----------- Configuration Arguments -----------') + for arg, value in sorted(vars(args).iteritems()): + print('%s: %s' % (arg, value)) + print('------------------------------------------------') + + +def program_summary(program): + print("--------------------") + for block in program.blocks: + for op in block.ops: + outputs = [[x + ":"] + op.output(x) for x in op.output_names] + inputs = [[x + ":"] + op.input(x) for x in op.input_names] + print(block.idx, op.type, inputs, "|", outputs) def vgg16_bn_drop(input): @@ -42,59 +86,47 @@ def conv_block(input, num_filter, groups, dropouts): return fc2 -class ParallelExecutor(unittest.TestCase): - def setUp(self): - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=32) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist.recordio', reader, feeder) - - def test_main(self): - main = fluid.Program() - startup = fluid.Program() - - with fluid.program_guard(main, startup): - # reader = fluid.layers.open_recordio_file( - # filename='./mnist.recordio', - # shapes=[[-1, 784], [-1, 1]], - # lod_levels=[0, 0], - # dtypes=['float32', 'int64']) - # img, label = fluid.layers.read_file(reader) - img = fluid.layers.fill_constant( - shape=[32, 784], dtype='float32', value=1.0) - label = fluid.layers.fill_constant( - shape=[32, 1], dtype='int64', value=1) - hidden = fluid.layers.fc(img, size=2000, act='tanh') - hidden = fluid.layers.fc(hidden, size=2000, act='tanh') - hidden = 
fluid.layers.fc(hidden, size=2000, act='tanh') - prediction = fluid.layers.fc(hidden, size=10, act='softmax') - loss = fluid.layers.mean(prediction) - # loss = fluid.layers.cross_entropy(input=prediction, label=label) - # loss = fluid.layers.mean(loss) - adam = fluid.optimizer.Adam() - adam.minimize(loss) - act_places = [] - for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: - p = fluid.core.Place() - p.set_place(each) - act_places.append(p) - - exe = fluid.core.ParallelExecutor( - act_places, - set([p.name for p in main.global_block().iter_parameters()])) - - exe.run(startup.desc, 0, True, True) - exe.run(main.desc, 0, True, True) +def run_benchmark(args): + # Train program + images = fluid.layers.fill_constant( + shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01) + predict = vgg16_bn_drop(images) + label = fluid.layers.fill_constant( + shape=(args.batch_size, 1), dtype='int64', value=0) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Optimization + # Note the flag append_all_reduce=True + opt = fluid.optimizer.SGDOptimizer(learning_rate=0.001) + opt.minimize(avg_cost) + + # program_summary(fluid.default_main_program()) + act_places = [] + for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: + p = fluid.core.Place() + p.set_place(each) + act_places.append(p) + + exe = fluid.core.ParallelExecutor(act_places, + set([ + p.name + for p in fluid.default_main_program() + .global_block().iter_parameters() + ])) + + # Parameter initialization + exe.run(fluid.default_startup_program().desc, 0, True, True) + + for iter_id in range(0, args.iterations): + start = time.time() + exe.run(fluid.default_main_program().desc, 0, True, True) + end = time.time() + print("iter=%d, elapse=%f" % (iter_id, (end - start))) + time.sleep(1) if __name__ == '__main__': - unittest.main() + args = parse_args() + print_arguments(args) + run_benchmark(args) From 003b165ee93de0ebdb0504280d98b66890351537 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Sat, 17 Mar 2018 00:03:52 +0000 Subject: [PATCH 07/25] use all device --- paddle/fluid/framework/multi_gpu_executor.cc | 14 ++++++-------- .../tests/unittests/test_multi_gpu_executor.py | 5 ++++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index 9feb6e80132c2..32471933db3f7 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -125,14 +125,12 @@ void MultiGPUExecutor::Run(const ProgramDesc& prog, int block_id, auto* context = exes_[0].Prepare(prog, block_id); std::vector threads; - threads.push_back(std::thread([&] { - exes_[0].RunPreparedContext(context, scopes_[0], create_local_scope, - create_vars); - })); - threads.push_back(std::thread([&] { - exes_[1].RunPreparedContext(context, scopes_[1], create_local_scope, - create_vars); - })); + for (size_t i = 0; i < exes_.size(); ++i) { + threads.push_back(std::thread([&, i] { + exes_[i].RunPreparedContext(context, scopes_[i], create_local_scope, + create_vars); + })); + } for (auto& t : threads) { t.join(); diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index e05b5a3ad7711..0d585a1d04134 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -103,7 +103,10 @@ def run_benchmark(args): # 
program_summary(fluid.default_main_program()) act_places = [] - for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: + for each in [ + fluid.CUDAPlace(i) + for i in range(fluid.core.get_cuda_device_count()) + ]: p = fluid.core.Place() p.set_place(each) act_places.append(p) From b2f4c5a191f3321d1d3b2076e8e5f62d2b1e8f76 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Mon, 19 Mar 2018 20:01:50 +0000 Subject: [PATCH 08/25] fix compile --- paddle/fluid/framework/executor.h | 1 + .../fluid/tests/unittests/test_multi_gpu_executor.py | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 701d96e6337fe..4c55fc7e1567e 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -15,6 +15,7 @@ limitations under the License. */ #pragma once #include "paddle/fluid/framework/op_info.h" +#include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 0d585a1d04134..60b2db3ee7a56 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -91,10 +91,11 @@ def run_benchmark(args): images = fluid.layers.fill_constant( shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01) predict = vgg16_bn_drop(images) - label = fluid.layers.fill_constant( - shape=(args.batch_size, 1), dtype='int64', value=0) - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) + + avg_cost = fluid.layers.mean(x=predict) + + fluid.layers.Print(predict, summarize=3) + fluid.layers.Print(avg_cost) # Optimization # Note the flag append_all_reduce=True From 07ee125c1e12cc039abbabdeafd4d576a5afbbfe Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Mon, 19 Mar 2018 20:52:10 +0000 Subject: [PATCH 09/25] add broadcast --- paddle/fluid/framework/multi_gpu_executor.cc | 36 ++++++++++++++++++- paddle/fluid/framework/multi_gpu_executor.h | 11 ++++-- paddle/fluid/pybind/pybind.cc | 1 + .../unittests/test_multi_gpu_executor.py | 8 ++--- 4 files changed, 48 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index 32471933db3f7..484b0fe490c7c 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -108,7 +108,7 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, MultiGPUExecutor::MultiGPUExecutor( const std::vector& places, const std::unordered_set& params) - : nccl_ctx_(places) { + : nccl_ctx_(places), params_(params) { for (auto& param : params) { param_grads_.insert(GradVarName(param)); } @@ -119,6 +119,40 @@ MultiGPUExecutor::MultiGPUExecutor( } } +void MultiGPUExecutor::Init(const ProgramDesc& prog, int block_id, + bool create_local_scope, bool create_vars) { + // init parameters on one device + exes_[0].Run(prog, scopes_[0], block_id, create_local_scope, create_vars); + + for (auto* var_desc : prog.Block(0).AllVars()) { + if (var_desc->GetType() == proto::VarType::LOD_TENSOR) { + auto& main_tensor = + scopes_[0]->FindVar(var_desc->Name())->Get(); + ncclDataType_t data_type = ToNCCLDataType(main_tensor.type()); + auto& dims = main_tensor.dims(); + size_t numel = 
main_tensor.numel(); + + platform::dynload::ncclGroupStart(); + for (size_t i = 0; i < exes_.size(); ++i) { + void* buffer; + if (i == 0) { + buffer = const_cast(main_tensor.data()); + } else { + auto local_scope = scopes_[i]; + auto* t = local_scope->Var(var_desc->Name())->GetMutable(); + t->Resize(dims); + buffer = t->mutable_data(exes_[i].place_, main_tensor.type()); + } + + platform::dynload::ncclBcast(buffer, numel, data_type, 0, + nccl_ctx_.comms_[i], + nccl_ctx_.ctxs_[i]->stream()); + } + platform::dynload::ncclGroupEnd(); + } + } +} + void MultiGPUExecutor::Run(const ProgramDesc& prog, int block_id, bool create_local_scope, bool create_vars) { // prepare prog in a single thread to avoid race diff --git a/paddle/fluid/framework/multi_gpu_executor.h b/paddle/fluid/framework/multi_gpu_executor.h index 7a735f4c1e3fa..6acde1e0a490e 100644 --- a/paddle/fluid/framework/multi_gpu_executor.h +++ b/paddle/fluid/framework/multi_gpu_executor.h @@ -65,6 +65,10 @@ class MultiGPUExecutor { explicit MultiGPUExecutor(const std::vector& places, const std::unordered_set& params); + // Init parameters on one device and broadcast them to other devices + void Init(const ProgramDesc& prog, int block_id, + bool create_local_scope = true, bool create_vars = true); + /* @Brief * Runtime evaluation of the given ProgramDesc under certain Scope * @@ -76,10 +80,13 @@ class MultiGPUExecutor { bool create_local_scope = true, bool create_vars = true); private: - std::vector exes_; - std::vector scopes_; NCCLContext nccl_ctx_; + + std::unordered_set params_; std::unordered_set param_grads_; + + std::vector exes_; + std::vector scopes_; }; } // namespace framework diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index be9e28d0ab3d4..f979403a77a16 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -475,6 +475,7 @@ All parameter, weight, gradient are variables in Paddle. 
const std::unordered_set ¶ms) { new (&self) MultiGPUExecutor(places, params); }) + .def("init", &MultiGPUExecutor::Init) .def("run", &MultiGPUExecutor::Run); #endif diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 60b2db3ee7a56..c95c297da5496 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -89,12 +89,12 @@ def conv_block(input, num_filter, groups, dropouts): def run_benchmark(args): # Train program images = fluid.layers.fill_constant( - shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01) + shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.1) predict = vgg16_bn_drop(images) avg_cost = fluid.layers.mean(x=predict) - fluid.layers.Print(predict, summarize=3) + # fluid.layers.Print(predict, summarize=3) fluid.layers.Print(avg_cost) # Optimization @@ -120,14 +120,12 @@ def run_benchmark(args): ])) # Parameter initialization - exe.run(fluid.default_startup_program().desc, 0, True, True) - + exe.init(fluid.default_startup_program().desc, 0, True, True) for iter_id in range(0, args.iterations): start = time.time() exe.run(fluid.default_main_program().desc, 0, True, True) end = time.time() print("iter=%d, elapse=%f" % (iter_id, (end - start))) - time.sleep(1) if __name__ == '__main__': From c3d6b86e4800d79630f5acc99c59e807df7a79ad Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 21 Mar 2018 00:36:46 +0000 Subject: [PATCH 10/25] add resnet --- paddle/fluid/framework/multi_gpu_executor.cc | 11 ++- .../unittests/test_multi_gpu_executor.py | 75 ++++++++++++++++++- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index 484b0fe490c7c..feebe7377ae83 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -52,8 +52,8 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, reinterpret_cast(dev_ctx)->stream(); cudaStream_t all_reduce_stream = io_ctx_->stream(); - std::map computation_event; - std::map all_reduce_event; + std::unordered_map computation_event; + std::unordered_map all_reduce_event; for (auto& argu : *param_grads_) { PADDLE_ENFORCE(cudaEventCreateWithFlags(&computation_event[argu], cudaEventDisableTiming)); @@ -62,10 +62,13 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, } for (auto& op : ctx->ops_) { + // sgd should wait for allreduce finished for (auto& param2argu : op->Inputs()) { for (auto& argu : param2argu.second) { if (param_grads_->count(argu) != 0) { - cudaStreamWaitEvent(computation_stream, all_reduce_event[argu], 0); + LOG(INFO) << place_ << " " << op->Type() << " " << argu; + PADDLE_ENFORCE(cudaStreamWaitEvent(computation_stream, + all_reduce_event[argu], 0)); } } } @@ -77,7 +80,7 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, for (auto& param2argu : op->Outputs()) { for (auto& argu : param2argu.second) { if (param_grads_->count(argu) != 0) { - LOG(INFO) << place_ << "Launch allreduce on " << argu; + // LOG(INFO) << place_ << " Launch allreduce on " << argu; PADDLE_ENFORCE( cudaEventRecord(computation_event[argu], computation_stream)); diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index c95c297da5496..4fe95508f09f4 100644 
--- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -86,16 +86,87 @@ def conv_block(input, num_filter, groups, dropouts): return fc2 +def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): + conv1 = fluid.layers.conv2d( + input=input, + filter_size=filter_size, + num_filters=ch_out, + stride=stride, + padding=padding, + act=None, + bias_attr=False) + # return conv1 + return fluid.layers.batch_norm(input=conv1, act=act) + + +def shortcut(input, ch_out, stride): + ch_in = input.shape[1] + if ch_in != ch_out: + return conv_bn_layer(input, ch_out, 1, stride, 0, None) + else: + return input + + +def basicblock(input, ch_out, stride): + short = shortcut(input, ch_out, stride) + conv1 = conv_bn_layer(input, ch_out, 3, stride, 1) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None) + return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') + + +def bottleneck(input, ch_out, stride): + short = shortcut(input, ch_out * 4, stride) + conv1 = conv_bn_layer(input, ch_out, 1, stride, 0) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1) + conv3 = conv_bn_layer(conv2, ch_out * 4, 1, 1, 0, act=None) + return fluid.layers.elementwise_add(x=short, y=conv3, act='relu') + + +def layer_warp(block_func, input, ch_out, count, stride): + res_out = block_func(input, ch_out, stride) + for i in range(1, count): + res_out = block_func(res_out, ch_out, 1) + return res_out + + +def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): + + cfg = { + 18: ([2, 2, 2, 1], basicblock), + 34: ([3, 4, 6, 3], basicblock), + 50: ([3, 4, 6, 3], bottleneck), + 101: ([3, 4, 23, 3], bottleneck), + 152: ([3, 8, 36, 3], bottleneck) + } + stages, block_func = cfg[depth] + conv1 = conv_bn_layer(input, ch_out=64, filter_size=7, stride=2, padding=3) + pool1 = fluid.layers.pool2d( + input=conv1, pool_type='avg', pool_size=3, pool_stride=2) + res1 = layer_warp(block_func, pool1, 64, stages[0], 1) + res2 = layer_warp(block_func, res1, 128, stages[1], 2) + res3 = layer_warp(block_func, res2, 256, stages[2], 2) + res4 = layer_warp(block_func, res3, 512, stages[3], 2) + pool2 = fluid.layers.pool2d( + input=res4, + pool_size=7, + pool_type='avg', + pool_stride=1, + global_pooling=True) + out = fluid.layers.fc(input=pool2, size=class_dim, act='softmax') + return out + + def run_benchmark(args): # Train program images = fluid.layers.fill_constant( shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.1) - predict = vgg16_bn_drop(images) + # predict = vgg16_bn_drop(images) + predict = resnet_imagenet(images, class_dim=1000) avg_cost = fluid.layers.mean(x=predict) # fluid.layers.Print(predict, summarize=3) - fluid.layers.Print(avg_cost) + # fluid.layers.Print(avg_cost) # Optimization # Note the flag append_all_reduce=True From 0760aaf4401b2e87684a9ae8e7931cf9e51a74b8 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Wed, 21 Mar 2018 19:20:49 +0800 Subject: [PATCH 11/25] Shrink batch_norm_grad's inputs --- paddle/fluid/operators/batch_norm_op.cc | 31 +++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/batch_norm_op.cc b/paddle/fluid/operators/batch_norm_op.cc index 5d27f5b60c711..36049ee6a4a0d 100644 --- a/paddle/fluid/operators/batch_norm_op.cc +++ b/paddle/fluid/operators/batch_norm_op.cc @@ -457,12 +457,39 @@ class BatchNormGradKernel } }; +class BatchNormGradMaker : public framework::SingleGradOpDescMaker { + public: + using 
framework::SingleGradOpDescMaker::SingleGradOpDescMaker; + + protected: + std::unique_ptr Apply() const override { + auto *op = new framework::OpDesc(); + op->SetType("batch_norm_grad"); + op->SetInput("X", Input("X")); + op->SetInput(framework::GradVarName("Y"), OutputGrad("Y")); + + op->SetInput("Scale", Input("Scale")); + op->SetInput("SavedMean", Output("SavedMean")); + op->SetInput("SavedVariance", Output("SavedVariance")); + + op->SetAttrMap(Attrs()); + + op->SetOutput(framework::GradVarName("X"), InputGrad("X")); + op->SetOutput(framework::GradVarName("Scale"), InputGrad("Scale")); + op->SetOutput(framework::GradVarName("Bias"), InputGrad("Bias")); + + return std::unique_ptr(op); + } +}; + } // namespace operators } // namespace paddle namespace ops = paddle::operators; -REGISTER_OP(batch_norm, ops::BatchNormOp, ops::BatchNormOpMaker, - batch_norm_grad, ops::BatchNormGradOp); +REGISTER_OPERATOR(batch_norm, ops::BatchNormOp, ops::BatchNormOpMaker, + ops::BatchNormGradMaker); +REGISTER_OPERATOR(batch_norm_grad, ops::BatchNormGradOp); + REGISTER_OP_CPU_KERNEL( batch_norm, ops::BatchNormKernel); From 8699e4ee2f74c6028c4beac0438eaac171b04fa6 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 21 Mar 2018 17:41:05 +0000 Subject: [PATCH 12/25] add vgg_bn_drop --- paddle/fluid/framework/multi_gpu_executor.cc | 6 ++-- .../unittests/test_multi_gpu_executor.py | 31 +++++++++++++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index feebe7377ae83..f7d1595a0563e 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -66,7 +66,8 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, for (auto& param2argu : op->Inputs()) { for (auto& argu : param2argu.second) { if (param_grads_->count(argu) != 0) { - LOG(INFO) << place_ << " " << op->Type() << " " << argu; + LOG(INFO) << place_ << " " << op->Type() << param2argu.first << " " + << argu; PADDLE_ENFORCE(cudaStreamWaitEvent(computation_stream, all_reduce_event[argu], 0)); } @@ -80,7 +81,8 @@ void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx, for (auto& param2argu : op->Outputs()) { for (auto& argu : param2argu.second) { if (param_grads_->count(argu) != 0) { - // LOG(INFO) << place_ << " Launch allreduce on " << argu; + LOG(INFO) << place_ << " " << op->Type() << " Launch allreduce on " + << argu; PADDLE_ENFORCE( cudaEventRecord(computation_event[argu], computation_stream)); diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 4fe95508f09f4..41b230545a08e 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -63,6 +63,33 @@ def program_summary(program): def vgg16_bn_drop(input): + def conv_block(input, num_filter, groups, dropouts): + return fluid.nets.img_conv_group( + input=input, + pool_size=2, + pool_stride=2, + conv_num_filter=[num_filter] * groups, + conv_filter_size=3, + conv_act='relu', + conv_with_batchnorm=True, + conv_batchnorm_drop_rate=dropouts, + pool_type='max') + + conv1 = conv_block(input, 64, 2, [0.3, 0]) + conv2 = conv_block(conv1, 128, 2, [0.4, 0]) + conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0]) + conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0]) + conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0]) + + drop = 
fluid.layers.dropout(x=conv5, dropout_prob=0.5) + fc1 = fluid.layers.fc(input=drop, size=4096, act=None) + bn = fluid.layers.batch_norm(input=fc1, act='relu') + drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5) + fc2 = fluid.layers.fc(input=drop2, size=4096, act=None) + return fc2 + + +def vgg16_bn(input): def conv_block(input, num_filter, groups, dropouts): return fluid.nets.img_conv_group( input=input, @@ -160,8 +187,8 @@ def run_benchmark(args): # Train program images = fluid.layers.fill_constant( shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.1) - # predict = vgg16_bn_drop(images) - predict = resnet_imagenet(images, class_dim=1000) + predict = vgg16_bn_drop(images) + # predict = resnet_imagenet(images, class_dim=1000) avg_cost = fluid.layers.mean(x=predict) From f07c25e1e5224cd1937a7aa92e1426aebad00645 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 22 Mar 2018 16:21:02 -0700 Subject: [PATCH 13/25] hook up data reader for multi-gpu executor example --- paddle/fluid/operators/read_op.cc | 5 +++- .../reader/create_recordio_file_reader_op.cc | 23 ++++++++++++---- .../unittests/test_multi_gpu_executor.py | 27 ++++++++++++++++--- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 2a5605e0d378a..2925b8a85da1b 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -14,6 +14,7 @@ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/reader.h" +#include "paddle/fluid/operators/detail/safe_ref.h" namespace paddle { namespace operators { @@ -59,7 +60,9 @@ class ReadOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& dev_place) const override { framework::ReaderHolder* reader = - scope.FindVar(Input("Reader"))->GetMutable(); + detail::Ref(scope.FindVar(Input("Reader")), + "Cannot find reader variable %s", Input("Reader")) + .GetMutable(); std::vector out_arg_names = Outputs("Out"); std::vector ins; reader->ReadNext(&ins); diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index c4aa29c7206db..0e00f218f9a83 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -18,6 +18,7 @@ namespace paddle { namespace operators { namespace reader { +template class RecordIOFileReader : public framework::FileReader { public: explicit RecordIOFileReader(const std::string& filename, @@ -25,7 +26,12 @@ class RecordIOFileReader : public framework::FileReader { : FileReader(dims), scanner_(filename), dev_ctx_(*platform::DeviceContextPool::Instance().Get( - platform::CPUPlace())) {} + platform::CPUPlace())) { + if (ThreadSafe) { + mutex_.reset(new std::mutex()); + } + LOG(INFO) << "Creating file reader" << filename; + } bool HasNext() const override { return scanner_.HasNext(); } @@ -33,10 +39,16 @@ class RecordIOFileReader : public framework::FileReader { protected: void ReadNextImpl(std::vector* out) override { - *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + if (ThreadSafe) { + std::lock_guard guard(*mutex_); + *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + } else { + *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + } } private: + std::unique_ptr mutex_; recordio::Scanner scanner_; const platform::DeviceContext& dev_ctx_; }; @@ -59,8 +71,9 @@ class CreateRecordIOReaderOp : public 
framework::OperatorBase { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset( - new RecordIOFileReader(filename, RestoreShapes(shape_concat, ranks))); + + out->Reset(new RecordIOFileReader( + filename, RestoreShapes(shape_concat, ranks))); } }; @@ -87,4 +100,4 @@ REGISTER_FILE_READER_OPERATOR(create_recordio_file_reader, reader::CreateRecordIOReaderOp, reader::CreateRecordIOReaderOpMaker); -REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); +REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 41b230545a08e..5d43e5ab3ad7c 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -21,6 +21,7 @@ import time import paddle.fluid as fluid +import paddle.v2 as paddle SEED = 1 DTYPE = "float32" @@ -29,6 +30,20 @@ # fluid.default_startup_program().random_seed = SEED +def convert_reader_to_record_io(): + reader = paddle.batch(paddle.dataset.flowers.train(), batch_size=32) + feeder = fluid.DataFeeder( + feed_list=[ # order is image and label + fluid.layers.data( + name='image', shape=[3, 32, 32]), + fluid.layers.data( + name='label', shape=[1], dtype='int64'), + ], + place=fluid.CPUPlace()) + fluid.recordio_writer.convert_reader_to_recordio_file('./flowers.recordio', + reader, feeder) + + def parse_args(): parser = argparse.ArgumentParser("mnist model benchmark.") parser.add_argument( @@ -185,9 +200,13 @@ def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): def run_benchmark(args): # Train program - images = fluid.layers.fill_constant( - shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.1) - predict = vgg16_bn_drop(images) + reader = fluid.layers.open_recordio_file( + filename='./flowers.recordio', + shapes=[[-1, 3, 32, 32], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) + predict = vgg16_bn_drop(img) # predict = resnet_imagenet(images, class_dim=1000) avg_cost = fluid.layers.mean(x=predict) @@ -218,6 +237,7 @@ def run_benchmark(args): ])) # Parameter initialization + exe.run(fluid.default_startup_program().desc, 0, True, True) exe.init(fluid.default_startup_program().desc, 0, True, True) for iter_id in range(0, args.iterations): start = time.time() @@ -230,3 +250,4 @@ def run_benchmark(args): args = parse_args() print_arguments(args) run_benchmark(args) + # convert_reader_to_record_io() From b343ce3eaf3952a8550081f11fe132a37917ac36 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 22 Mar 2018 23:22:55 +0000 Subject: [PATCH 14/25] add embedding --- python/paddle/fluid/executor.py | 22 +++++++++++++------ .../unittests/test_multi_gpu_executor.py | 10 ++++++--- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 2612fb1ae4198..1953a235d1ef8 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -333,17 +333,19 @@ def run(self, :param use_program_cache: set use_program_cache to true if program not changed compare to the last step. :return: result according to fetch_list. 
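        A minimal illustrative call (exe, img_np, label_np and avg_cost are
        placeholder names, not variables defined in this file):

            loss_val, = exe.run(fluid.default_main_program(),
                                feed={'image': img_np, 'label': label_np},
                                fetch_list=[avg_cost.name])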
""" + if program is None: + program = default_main_program() + if not isinstance(program, Program): + raise TypeError() + if feed is None: feed = {} if not isinstance(feed, dict): raise TypeError("feed should be a map") if fetch_list is None: fetch_list = [] - if program is None: - program = default_main_program() - - if not isinstance(program, Program): - raise TypeError() + if not isinstance(fetch_list, dict): + raise TypeError("fetch should be a list") if scope is None: scope = global_scope() @@ -369,9 +371,15 @@ def run(self, feed_var_name=feed_var_name, fetch_var_name=fetch_var_name) + outs = self._run(feed, feed_var_name, fetch_list, fetch_var_name, + program, scope) + if return_numpy: + outs = as_numpy(outs) + return outs + + def _run(self, feed, feed_var_name, fetch_list, fetch_var_name, program, + scope): self._feed_data(program, feed, feed_var_name, scope) self.executor.run(program.desc, scope, 0, True, True) outs = self._fetch_data(fetch_list, fetch_var_name, scope) - if return_numpy: - outs = as_numpy(outs) return outs diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 41b230545a08e..93ad4b1e55f8b 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -187,12 +187,16 @@ def run_benchmark(args): # Train program images = fluid.layers.fill_constant( shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.1) - predict = vgg16_bn_drop(images) + idx = fluid.layers.fill_constant(shape=[4, 1], dtype='int64', value=1) + emb = fluid.layers.embedding(input=idx, size=[2, 3]) + avg_cost = fluid.layers.mean(emb) + # predict = vgg16_bn_drop(images) # predict = resnet_imagenet(images, class_dim=1000) - avg_cost = fluid.layers.mean(x=predict) + # avg_cost = fluid.layers.mean(x=predict) - # fluid.layers.Print(predict, summarize=3) + # fluid.layers.Print(idx, summarize=7) + # fluid.layers.Print(emb, summarize=7) # fluid.layers.Print(avg_cost) # Optimization From 5f1127c03e90bc47d4acb2bc2149ce7cd522b1d6 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 22 Mar 2018 17:53:56 -0700 Subject: [PATCH 15/25] take the device context improvement from reyoung; turn on thread safe for record io reader; use double buffer reader --- .../reader/create_recordio_file_reader_op.cc | 2 +- paddle/fluid/platform/device_context.cc | 35 ++++++++++--------- paddle/fluid/platform/device_context.h | 18 +++------- .../unittests/test_multi_gpu_executor.py | 17 +++++---- 4 files changed, 32 insertions(+), 40 deletions(-) mode change 100644 => 100755 paddle/fluid/platform/device_context.h diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 0e00f218f9a83..8703f7bcbecb9 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -100,4 +100,4 @@ REGISTER_FILE_READER_OPERATOR(create_recordio_file_reader, reader::CreateRecordIOReaderOp, reader::CreateRecordIOReaderOpMaker); -REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); +REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc index 98b4178177b0a..59b76a1edb5ec 100644 --- a/paddle/fluid/platform/device_context.cc +++ b/paddle/fluid/platform/device_context.cc @@ -10,43 +10,45 @@ See the 
License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/platform/device_context.h" +#include #include "paddle/fluid/memory/memory.h" - namespace paddle { namespace platform { DeviceContextPool* DeviceContextPool::pool = nullptr; -const platform::DeviceContext* DeviceContextPool::Get( - const platform::Place& place) { +platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) { auto it = device_contexts_.find(place); if (it == device_contexts_.end()) { PADDLE_THROW( "'Place' is not supported, Please re-compile with WITH_GPU " "option"); } - return it->second; + return it->second.get(); } DeviceContextPool::DeviceContextPool( const std::vector& places) { PADDLE_ENFORCE_GT(places.size(), 0); - for (size_t i = 0; i < places.size(); i++) { - if (platform::is_cpu_place(places[i])) { + using PtrType = std::unique_ptr; + std::unordered_set set; + for (auto& p : places) { + set.insert(p); + } + + for (auto& p : set) { + if (platform::is_cpu_place(p)) { #ifdef PADDLE_WITH_MKLDNN - device_contexts_.emplace(places[i], - new platform::MKLDNNDeviceContext( - boost::get(places[i]))); + device_contexts_.emplace( + p, PtrType(new MKLDNNDeviceContext(boost::get(p)))); #else - device_contexts_.emplace(places[i], - new platform::CPUDeviceContext( - boost::get(places[i]))); + device_contexts_.emplace( + p, PtrType(new CPUDeviceContext(boost::get(p)))); #endif - } else if (platform::is_gpu_place(places[i])) { + } else if (platform::is_gpu_place(p)) { #ifdef PADDLE_WITH_CUDA - device_contexts_.emplace(places[i], - new platform::CUDADeviceContext( - boost::get(places[i]))); + device_contexts_.emplace( + p, PtrType(new CUDADeviceContext(boost::get(p)))); #else PADDLE_THROW( "'CUDAPlace' is not supported, Please re-compile with WITH_GPU " @@ -159,6 +161,7 @@ CUDADeviceContext::~CUDADeviceContext() { Place CUDADeviceContext::GetPlace() const { return place_; } void CUDADeviceContext::Wait() const { + std::lock_guard guard(mutex_); PADDLE_ENFORCE(cudaStreamSynchronize(stream_)); PADDLE_ENFORCE(cudaGetLastError()); } diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h old mode 100644 new mode 100755 index 603b890af13b5..202394c7be7e1 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -103,6 +103,7 @@ class CUDADeviceContext : public DeviceContext { std::unique_ptr eigen_device_; std::unique_ptr eigen_stream_; + mutable std::mutex mutex_; cudaStream_t stream_; cudnnHandle_t cudnn_handle_; cublasHandle_t cublas_handle_; @@ -159,7 +160,7 @@ class DeviceContextPool { } /*! \brief Return handle of single device context. 
*/ - const platform::DeviceContext* Get(const platform::Place& place); + platform::DeviceContext* Get(const platform::Place& place); template const typename DefaultDeviceContextType::TYPE* GetByPlace( @@ -172,19 +173,8 @@ class DeviceContextPool { private: static DeviceContextPool* pool; - constexpr static int LEFT_SHIFT = 8; - struct Hash { - std::hash hash_; - size_t operator()(const platform::Place& place) const { - int pre_hash = place.which() << LEFT_SHIFT; - if (platform::is_gpu_place(place)) { - pre_hash += boost::get(place).GetDeviceId(); - } - return hash_(pre_hash); - } - }; - std::unordered_map + std::unordered_map, PlaceHash> device_contexts_; DISABLE_COPY_AND_ASSIGN(DeviceContextPool); }; diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 5d43e5ab3ad7c..bfb7af29ade85 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -31,11 +31,11 @@ def convert_reader_to_record_io(): - reader = paddle.batch(paddle.dataset.flowers.train(), batch_size=32) + reader = paddle.batch(paddle.dataset.flowers.train(), batch_size=4) feeder = fluid.DataFeeder( feed_list=[ # order is image and label fluid.layers.data( - name='image', shape=[3, 32, 32]), + name='image', shape=[3, 224, 224]), fluid.layers.data( name='label', shape=[1], dtype='int64'), ], @@ -48,8 +48,6 @@ def parse_args(): parser = argparse.ArgumentParser("mnist model benchmark.") parser.add_argument( '--label_size', type=int, default=10, help='The label size.') - parser.add_argument( - '--batch_size', type=int, default=10, help='The minibatch size.') parser.add_argument( '--iterations', type=int, default=5, help='The number of minibatches.') parser.add_argument( @@ -200,11 +198,12 @@ def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): def run_benchmark(args): # Train program - reader = fluid.layers.open_recordio_file( - filename='./flowers.recordio', - shapes=[[-1, 3, 32, 32], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) + reader = fluid.layers.create_double_buffer_reader( + fluid.layers.open_recordio_file( + filename='./flowers.recordio', + shapes=[[-1, 3, 224, 224], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64'])) img, label = fluid.layers.read_file(reader) predict = vgg16_bn_drop(img) # predict = resnet_imagenet(images, class_dim=1000) From bb07417bce0ade298469e408eab41682a77f821f Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 23 Mar 2018 23:32:26 +0000 Subject: [PATCH 16/25] add share comment --- paddle/fluid/framework/reader.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 065943fa22d9e..2a50e4e6d524d 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -88,6 +88,8 @@ class ReaderHolder { bool HasNext() const { return reader_->HasNext(); } private: + // TODO(tonyyang): if we want to share this + // make sure it is thread-safe std::shared_ptr reader_; }; From 9e5d957ed33aaf47e73c6df5e9d40cac3c18fb20 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Mon, 26 Mar 2018 23:51:21 +0000 Subject: [PATCH 17/25] change name --- paddle/fluid/pybind/pybind.cc | 2 +- .../unittests/test_multi_gpu_executor.py | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d595b1109d5f7..53fc3211c4c00 100644 --- 
a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -476,7 +476,7 @@ All parameter, weight, gradient are variables in Paddle. m.def("nvprof_start", platform::CudaProfilerStart); m.def("nvprof_stop", platform::CudaProfilerStop); - py::class_(m, "ParallelExecutor") + py::class_(m, "MultiGPUExecutor") .def( "__init__", [](MultiGPUExecutor &self, const std::vector &places, diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 7f9c03716ec03..2d7c3fd159440 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -102,7 +102,7 @@ def conv_block(input, num_filter, groups, dropouts): return fc2 -def vgg16_bn(input): +def vgg16(input): def conv_block(input, num_filter, groups, dropouts): return fluid.nets.img_conv_group( input=input, @@ -121,8 +121,8 @@ def conv_block(input, num_filter, groups, dropouts): conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0]) conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0]) - fc1 = fluid.layers.fc(input=conv5, size=4096, act=None) - fc2 = fluid.layers.fc(input=fc1, size=4096, act=None) + fc1 = fluid.layers.fc(input=conv5, size=4096, act='softmax') + fc2 = fluid.layers.fc(input=fc1, size=4096, act='softmax') return fc2 @@ -204,12 +204,17 @@ def run_benchmark(args): lod_levels=[0, 0], dtypes=['float32', 'int64']) img, label = fluid.layers.read_file(reader) - predict = vgg16_bn_drop(img) - avg_cost = fluid.layers.mean(predict) - # fluid.layers.Print(avg_cost) + predict = vgg16(img) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(cost) opt = fluid.optimizer.SGDOptimizer(learning_rate=0.001) opt.minimize(avg_cost) + # fluid.layers.Print(predict, summarize=10) + # fluid.layers.Print(label, summarize=10) + # fluid.layers.Print(cost, summarize=10) + fluid.layers.Print(avg_cost) + # fluid.memory_optimize(fluid.default_main_program(), print_log=True) # program_summary(fluid.default_main_program()) @@ -222,7 +227,7 @@ def run_benchmark(args): p.set_place(each) act_places.append(p) - exe = fluid.core.ParallelExecutor(act_places, + exe = fluid.core.MultiGPUExecutor(act_places, set([ p.name for p in fluid.default_main_program() From 33ada99e5cddc06c3c0b44a14a58efb3e70bb8a2 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Mon, 26 Mar 2018 17:20:34 -0700 Subject: [PATCH 18/25] scope: add replicas, used for multi gpu executor --- paddle/fluid/framework/multi_gpu_executor.cc | 29 +++++++++++-------- paddle/fluid/framework/multi_gpu_executor.h | 5 ++-- paddle/fluid/framework/scope.cc | 15 ++++++++++ paddle/fluid/framework/scope.h | 3 ++ .../unittests/test_multi_gpu_executor.py | 6 ++-- 5 files changed, 41 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/multi_gpu_executor.cc b/paddle/fluid/framework/multi_gpu_executor.cc index 6cfe911ad439c..9e6213e3b9844 100644 --- a/paddle/fluid/framework/multi_gpu_executor.cc +++ b/paddle/fluid/framework/multi_gpu_executor.cc @@ -122,19 +122,18 @@ MultiGPUExecutor::MultiGPUExecutor( auto& place = places[i]; exes_.push_back( framework::ExecutorWithAllReduce(place, ¶m_grads_, &nccl_ctx_)); - scopes_.push_back(new framework::Scope()); } } -void MultiGPUExecutor::Init(const ProgramDesc& prog, int block_id, +void MultiGPUExecutor::Init(const ProgramDesc& prog, Scope* scope, int block_id, bool create_local_scope, bool create_vars) { // init parameters on one device - exes_[0].Run(prog, scopes_[0], 
block_id, create_local_scope, create_vars); + auto replicas = scope->replicas(exes_.size() - 1); + exes_[0].Run(prog, scope, block_id, create_local_scope, create_vars); for (auto* var_desc : prog.Block(0).AllVars()) { if (var_desc->GetType() == proto::VarType::LOD_TENSOR) { - auto& main_tensor = - scopes_[0]->FindVar(var_desc->Name())->Get(); + auto& main_tensor = scope->FindVar(var_desc->Name())->Get(); ncclDataType_t data_type = ToNCCLDataType(main_tensor.type()); auto& dims = main_tensor.dims(); size_t numel = main_tensor.numel(); @@ -145,7 +144,7 @@ void MultiGPUExecutor::Init(const ProgramDesc& prog, int block_id, if (i == 0) { buffer = const_cast(main_tensor.data()); } else { - auto local_scope = scopes_[i]; + auto local_scope = replicas[i - 1]; auto* t = local_scope->Var(var_desc->Name())->GetMutable(); t->Resize(dims); buffer = t->mutable_data(exes_[i].place_, main_tensor.type()); @@ -159,9 +158,9 @@ void MultiGPUExecutor::Init(const ProgramDesc& prog, int block_id, } else if (var_desc->GetType() == proto::VarType::READER) { VLOG(4) << "Copy reader " << var_desc->Name(); auto& reader = - scopes_[0]->FindVar(var_desc->Name())->Get(); - for (size_t i = 1; i < scopes_.size(); ++i) { - auto* reader_dup = scopes_[i] + scope->FindVar(var_desc->Name())->Get(); + for (size_t i = 0; i < replicas.size(); ++i) { + auto* reader_dup = replicas[i] ->Var(var_desc->Name()) ->GetMutable(); *reader_dup = reader; @@ -170,15 +169,21 @@ void MultiGPUExecutor::Init(const ProgramDesc& prog, int block_id, } } -void MultiGPUExecutor::Run(const ProgramDesc& prog, int block_id, +void MultiGPUExecutor::Run(const ProgramDesc& prog, Scope* scope, int block_id, bool create_local_scope, bool create_vars) { // prepare prog in a single thread to avoid race auto context = exes_[0].Prepare(prog, block_id); - + auto replicas = scope->replicas(exes_.size() - 1); std::vector threads; for (size_t i = 0; i < exes_.size(); ++i) { threads.push_back(std::thread([&, i] { - exes_[i].RunPreparedContext(context.get(), scopes_[i], create_local_scope, + Scope* cur_scope; + if (i == 0) { + cur_scope = scope; + } else { + cur_scope = replicas[i - 1].get(); + } + exes_[i].RunPreparedContext(context.get(), cur_scope, create_local_scope, create_vars); })); } diff --git a/paddle/fluid/framework/multi_gpu_executor.h b/paddle/fluid/framework/multi_gpu_executor.h index 6acde1e0a490e..5673dbb7b94fb 100644 --- a/paddle/fluid/framework/multi_gpu_executor.h +++ b/paddle/fluid/framework/multi_gpu_executor.h @@ -66,7 +66,7 @@ class MultiGPUExecutor { const std::unordered_set& params); // Init parameters on one device and broadcast them to other devices - void Init(const ProgramDesc& prog, int block_id, + void Init(const ProgramDesc& prog, Scope* scope, int block_id, bool create_local_scope = true, bool create_vars = true); /* @Brief @@ -76,7 +76,7 @@ class MultiGPUExecutor { * ProgramDesc * Scope */ - void Run(const ProgramDesc& prog, int block_id, + void Run(const ProgramDesc& prog, Scope* scope, int block_id, bool create_local_scope = true, bool create_vars = true); private: @@ -86,7 +86,6 @@ class MultiGPUExecutor { std::unordered_set param_grads_; std::vector exes_; - std::vector scopes_; }; } // namespace framework diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 17e38b1cf0426..f5b304db9de3c 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -25,6 +25,7 @@ DEFINE_bool(benchmark, false, "Doing memory benchmark. 
It will make deleting scope synchronized, " "and add some memory usage logs." "Default cuda is asynchronous device, set to True will" + "force op run in synchronous mode."); namespace paddle { @@ -38,6 +39,20 @@ Scope::~Scope() { } } +std::vector>& Scope::replicas(size_t replica_count) { + if (replicas_.size() >= replica_count) { + return replicas_; + } + + auto more = replica_count - replicas_.size(); + replicas_.reserve(more); + for (size_t i = 0; i < more; i++) { + replicas_.push_back(std::make_shared()); + } + + return replicas_; +} + Scope& Scope::NewScope() const { kids_.push_back(new Scope(this)); return *kids_.back(); diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index c1e1f49caaa5a..9c5556974ea9b 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -59,6 +59,8 @@ class Scope { const Scope& parent() const { return *parent_; } + std::vector>& replicas(size_t replica_count); + /// Find the scope or an ancestor scope that contains the given variable. const Scope* FindScope(const Variable* var) const; @@ -86,6 +88,7 @@ class Scope { mutable std::unordered_map vars_; mutable std::list kids_; Scope const* parent_{nullptr}; + std::vector> replicas_; DISABLE_COPY_AND_ASSIGN(Scope); }; diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 2d7c3fd159440..6627837476fdf 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -235,10 +235,12 @@ def run_benchmark(args): ])) # Parameter initialization - exe.init(fluid.default_startup_program().desc, 0, True, True) + exe.init(fluid.default_startup_program().desc, + fluid.global_scope(), 0, True, True) for iter_id in range(0, args.iterations): start = time.time() - exe.run(fluid.default_main_program().desc, 0, True, True) + exe.run(fluid.default_main_program().desc, + fluid.global_scope(), 0, True, True) end = time.time() print("iter=%d, elapse=%f" % (iter_id, (end - start))) From 27d17e0d8892b6b97fb37a970e4693af5bcc7cac Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 21:51:37 +0000 Subject: [PATCH 19/25] add wait on executor --- paddle/fluid/framework/executor.cc | 11 +++++++++++ .../fluid/tests/unittests/test_multi_gpu_executor.py | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 88a9bc64fb2d5..87e4e5b600eab 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -316,6 +316,17 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, RunOperators(ctx, local_scope); + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& dev_ctx = *pool.Get(place_); + dev_ctx.Wait(); + + if (FLAGS_benchmark) { + VLOG(2) << "-------------------------------------------------------"; + VLOG(2) << "Memory used after operator run: " + << memory::memory_usage(place_); + VLOG(2) << "-------------------------------------------------------"; + } + if (create_vars && create_local_scope) { scope->DeleteScope(local_scope); } diff --git a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py index 2d7c3fd159440..f2c952c6023db 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py +++ 
b/python/paddle/fluid/tests/unittests/test_multi_gpu_executor.py @@ -204,7 +204,7 @@ def run_benchmark(args): lod_levels=[0, 0], dtypes=['float32', 'int64']) img, label = fluid.layers.read_file(reader) - predict = vgg16(img) + predict = resnet_imagenet(img, class_dim=1000) cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) opt = fluid.optimizer.SGDOptimizer(learning_rate=0.001) @@ -213,7 +213,7 @@ def run_benchmark(args): # fluid.layers.Print(predict, summarize=10) # fluid.layers.Print(label, summarize=10) # fluid.layers.Print(cost, summarize=10) - fluid.layers.Print(avg_cost) + # fluid.layers.Print(avg_cost) # fluid.memory_optimize(fluid.default_main_program(), print_log=True) From 7aad0216d4e207c2e8f237ff07e0c90cfa40cdd6 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 28 Mar 2018 13:31:38 -0700 Subject: [PATCH 20/25] Add test parallel executor and transformer model from reyoung's PR --- .../tests/unittests/test_parallel_executor.py | 429 +++++++++++++++ .../tests/unittests/transformer_model.py | 487 ++++++++++++++++++ 2 files changed, 916 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor.py create mode 100644 python/paddle/fluid/tests/unittests/transformer_model.py diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py new file mode 100644 index 0000000000000..bbfd03c638dac --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -0,0 +1,429 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
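# The tests in this file share one data-feeding pattern: a paddle reader is
# converted once into a RecordIO file (see TestMNIST.setUpClass and the wmt16
# conversion in TestTransformer.setUpClass below), and each network then opens
# that file from inside the program. For './mnist.recordio' that looks like:
#
#     reader = fluid.layers.open_recordio_file(
#         filename='./mnist.recordio',
#         shapes=[[-1, 784], [-1, 1]],
#         lod_levels=[0, 0],
#         dtypes=['float32', 'int64'])
#     img, label = fluid.layers.read_file(reader)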
+ +import numpy +import unittest + +import paddle.fluid as fluid +import paddle.v2 as paddle +import paddle.v2.dataset.mnist as mnist +import paddle.v2.dataset.wmt16 as wmt16 + + +def simple_fc_net(): + reader = fluid.layers.open_recordio_file( + filename='./mnist.recordio', + shapes=[[-1, 784], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) + hidden = img + for _ in xrange(4): + hidden = fluid.layers.fc( + hidden, + size=200, + act='tanh', + bias_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=1.0))) + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + return loss + + +def fc_with_batchnorm(): + reader = fluid.layers.open_recordio_file( + filename='./mnist.recordio', + shapes=[[-1, 784], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) + hidden = img + for _ in xrange(1): + hidden = fluid.layers.fc( + hidden, + size=200, + act='tanh', + bias_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=1.0))) + + hidden = fluid.layers.batch_norm(input=hidden) + + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + return loss + + +def squeeze_excitation(input, num_channels, reduction_ratio): + # pool = fluid.layers.pool2d( + # input=input, pool_size=0, pool_type='avg', global_pooling=True) + conv = input + shape = conv.shape + reshape = fluid.layers.reshape( + x=conv, shape=[-1, shape[1], shape[2] * shape[3]]) + pool = fluid.layers.reduce_mean(input=reshape, dim=2) + + squeeze = fluid.layers.fc(input=pool, + size=num_channels / reduction_ratio, + act='relu') + excitation = fluid.layers.fc(input=squeeze, + size=num_channels, + act='sigmoid') + scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0) + return scale + + +def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1, + act=None): + conv = fluid.layers.conv2d( + input=input, + num_filters=num_filters, + filter_size=filter_size, + stride=stride, + padding=(filter_size - 1) / 2, + groups=groups, + act=None, + bias_attr=False) + return fluid.layers.batch_norm(input=conv, act=act, momentum=0.1) + + +def shortcut(input, ch_out, stride): + ch_in = input.shape[1] + if ch_in != ch_out: + if stride == 1: + filter_size = 1 + else: + filter_size = 3 + return conv_bn_layer(input, ch_out, filter_size, stride) + else: + return input + + +def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio): + # The number of first 1x1 convolutional channels for each bottleneck build block + # was halved to reduce the compution cost. 
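    # A concrete trace with the values passed in from SE_ResNeXt152 below
    # (num_filters=128, cardinality=64, reduction_ratio=16): conv0 is a 1x1
    # conv to 128 channels, conv1 a grouped 3x3 conv (64 groups) to 256
    # channels, conv2 a 1x1 conv to 256 channels, the SE branch squeezes
    # 256 -> 16 -> 256, and the shortcut projects the input to 256 channels
    # only when its channel count differs.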
+ conv0 = conv_bn_layer( + input=input, num_filters=num_filters, filter_size=1, act='relu') + conv1 = conv_bn_layer( + input=conv0, + num_filters=num_filters * 2, + filter_size=3, + stride=stride, + groups=cardinality, + act='relu') + conv2 = conv_bn_layer( + input=conv1, num_filters=num_filters * 2, filter_size=1, act=None) + scale = squeeze_excitation( + input=conv2, + num_channels=num_filters * 2, + reduction_ratio=reduction_ratio) + + short = shortcut(input, num_filters * 2, stride) + + return fluid.layers.elementwise_add(x=short, y=scale, act='relu') + + +def SE_ResNeXt152(batch_size=4): + img = fluid.layers.fill_constant( + shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0) + label = fluid.layers.fill_constant( + shape=[batch_size, 1], dtype='int64', value=0.0) + + conv = conv_bn_layer( + input=img, num_filters=64, filter_size=3, stride=2, act='relu') + conv = conv_bn_layer( + input=conv, num_filters=64, filter_size=3, stride=1, act='relu') + conv = conv_bn_layer( + input=conv, num_filters=128, filter_size=3, stride=1, act='relu') + conv = fluid.layers.pool2d( + input=conv, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max') + + cardinality = 64 + reduction_ratio = 16 + depth = [3, 8, 36, 3] + num_filters = [128, 256, 512, 1024] + + for block in range(len(depth)): + for i in range(depth[block]): + conv = bottleneck_block( + input=conv, + num_filters=num_filters[block], + stride=2 if i == 0 and block != 0 else 1, + cardinality=cardinality, + reduction_ratio=reduction_ratio) + + shape = conv.shape + reshape = fluid.layers.reshape( + x=conv, shape=[-1, shape[1], shape[2] * shape[3]]) + pool = fluid.layers.reduce_mean(input=reshape, dim=2) + dropout = fluid.layers.dropout(x=pool, dropout_prob=0.2) + # Classifier layer: + prediction = fluid.layers.fc(input=dropout, size=1000, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + return loss + + +import time + + +class TestParallelExecutorBase(unittest.TestCase): + def check_network_convergence(self, + method, + memory_opt=True, + iter=10, + batch_size=None): + main = fluid.Program() + startup = fluid.Program() + with fluid.program_guard(main, startup): + loss = method() + adam = fluid.optimizer.Adam() + adam.minimize(loss) + if memory_opt: + fluid.memory_optimize(main) + + exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True) + if batch_size is not None: + batch_size *= fluid.core.get_cuda_device_count() + begin = time.time() + first_loss, = exe.run([loss.name]) + first_loss = numpy.array(first_loss) + + for i in xrange(iter): + exe.run([]) + + last_loss, = exe.run([loss.name]) + end = time.time() + + if batch_size is not None: + print "%.4f Instance per second" % ( + (batch_size * iter + 2) / (end - begin)) + + last_loss = numpy.array(last_loss) + + print first_loss, last_loss + # self.assertGreater(first_loss[0], last_loss[0]) + + +class TestMNIST(TestParallelExecutorBase): + @classmethod + def setUpClass(cls): + # Convert mnist to recordio file + with fluid.program_guard(fluid.Program(), fluid.Program()): + reader = paddle.batch(mnist.train(), batch_size=32) + feeder = fluid.DataFeeder( + feed_list=[ # order is image and label + fluid.layers.data( + name='image', shape=[784]), + fluid.layers.data( + name='label', shape=[1], dtype='int64'), + ], + place=fluid.CPUPlace()) + fluid.recordio_writer.convert_reader_to_recordio_file( + './mnist.recordio', reader, feeder) + + def test_simple_fc(self): + self.check_network_convergence(simple_fc_net) + + 
def test_batchnorm_fc(self): + self.check_network_convergence(fc_with_batchnorm) + + +class TestResnet(TestParallelExecutorBase): + # @classmethod + # def setUpClass(cls): + # # import os + # # if os.path.exists('./flowers.recordio'): + # # return + # with fluid.program_guard(fluid.Program(), fluid.Program()): + # reader = paddle.batch(flowers.train(), batch_size=4) + # feeder = fluid.DataFeeder( + # feed_list=[ + # fluid.layers.data( + # name='image', shape=[3, 224, 224]), + # fluid.layers.data( + # name='label', shape=[1], dtype='int64'), + # ], + # place=fluid.CPUPlace()) + # fluid.recordio_writer.convert_reader_to_recordio_file( + # "./flowers.recordio", reader, feeder, compressor=fluid.core.RecordIOWriter.Compressor.NoCompress) + + def test_resnet(self): + import functools + batch_size = 4 + self.check_network_convergence( + functools.partial( + SE_ResNeXt152, batch_size=batch_size), + iter=20, + batch_size=batch_size) + + +class ModelHyperParams(object): + # Dictionary size for source and target language. This model directly uses + # paddle.dataset.wmt16 in which , and token has + # alreay been added, but the token is not added. Transformer requires + # sequences in a mini-batch are padded to have the same length. A token is + # added into the original dictionary in paddle.dateset.wmt16. + + # size of source word dictionary. + src_vocab_size = 10000 + # index for token in source language. + src_pad_idx = src_vocab_size + + # size of target word dictionay + trg_vocab_size = 10000 + # index for token in target language. + trg_pad_idx = trg_vocab_size + + # position value corresponding to the token. + pos_pad_idx = 0 + + # max length of sequences. It should plus 1 to include position + # padding token for position encoding. + max_length = 50 + + # the dimension for word embeddings, which is also the last dimension of + # the input and output of multi-head attention, position-wise feed-forward + # networks, encoder and decoder. + + d_model = 512 + # size of the hidden layer in position-wise feed-forward networks. + d_inner_hid = 1024 + # the dimension that keys are projected to for dot-product attention. + d_key = 64 + # the dimension that values are projected to for dot-product attention. + d_value = 64 + # number of head used in multi-head attention. + n_head = 8 + # number of sub-layers to be stacked in the encoder and decoder. + n_layer = 6 + # dropout rate used by all dropout layers. + dropout = 0.1 + + +import numpy as np + + +def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): + """ + Pad the instances to the max sequence length in batch, and generate the + corresponding position data and attention bias. Then, convert the numpy + data to tensors and return a dict mapping names to tensors. + """ + + def __pad_batch_data(insts, + pad_idx, + is_target=False, + return_pos=True, + return_attn_bias=True, + return_max_len=True): + """ + Pad the instances to the max sequence length in batch, and generate the + corresponding position data and attention bias. 
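        For example (illustrative values), with pad_idx = 0 and
        insts = [[3, 5, 7], [9, 2]], the padded word data becomes
        [[3, 5, 7], [9, 2, 0]] and the position data becomes
        [[1, 2, 3], [1, 2, 0]], where position 0 marks padding; both are
        then reshaped to [-1, 1].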
+ """ + return_list = [] + max_len = max(len(inst) for inst in insts) + inst_data = np.array( + [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) + return_list += [inst_data.astype("int64").reshape([-1, 1])] + if return_pos: + inst_pos = np.array([[ + pos_i + 1 if w_i != pad_idx else 0 + for pos_i, w_i in enumerate(inst) + ] for inst in inst_data]) + + return_list += [inst_pos.astype("int64").reshape([-1, 1])] + if return_attn_bias: + if is_target: + # This is used to avoid attention on paddings and subsequent + # words. + slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, + max_len)) + slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape( + [-1, 1, max_len, max_len]) + slf_attn_bias_data = np.tile(slf_attn_bias_data, + [1, n_head, 1, 1]) * [-1e9] + else: + # This is used to avoid attention on paddings. + slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * + (max_len - len(inst)) + for inst in insts]) + slf_attn_bias_data = np.tile( + slf_attn_bias_data.reshape([-1, 1, 1, max_len]), + [1, n_head, max_len, 1]) + return_list += [slf_attn_bias_data.astype("float32")] + if return_max_len: + return_list += [max_len] + return return_list if len(return_list) > 1 else return_list[0] + + def data_to_tensor(data_list, name_list, input_dict, place): + assert len(data_list) == len(name_list) + for i in range(len(name_list)): + tensor = fluid.LoDTensor() + tensor.set(data_list[i], place) + input_dict[name_list[i]] = tensor + + src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data( + [inst[0] for inst in insts], src_pad_idx, is_target=False) + trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data( + [inst[1] for inst in insts], trg_pad_idx, is_target=True) + trg_src_attn_bias = np.tile(src_slf_attn_bias[:, :, ::src_max_len, :], + [1, 1, trg_max_len, 1]).astype("float32") + lbl_word = __pad_batch_data([inst[2] for inst in insts], trg_pad_idx, False, + False, False, False) + lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1]) + + return [ + src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, + trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight + ] + + +import transformer_model + + +def transformer(): + return transformer_model.transformer( + ModelHyperParams.src_vocab_size + 1, + ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, + ModelHyperParams.n_layer, ModelHyperParams.n_head, + ModelHyperParams.d_key, ModelHyperParams.d_value, + ModelHyperParams.d_model, ModelHyperParams.d_inner_hid, + ModelHyperParams.dropout, ModelHyperParams.src_pad_idx, + ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx) + + +class TestTransformer(TestParallelExecutorBase): + @classmethod + def setUpClass(cls): + reader = paddle.batch( + wmt16.train(ModelHyperParams.src_vocab_size, + ModelHyperParams.trg_vocab_size), + batch_size=transformer_model.batch_size) + + with fluid.recordio_writer.create_recordio_writer( + "./wmt16.recordio") as writer: + for batch in reader(): + for tensor in prepare_batch_input( + batch, ModelHyperParams.src_pad_idx, + ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): + t = fluid.LoDTensor() + t.set(tensor, fluid.CPUPlace()) + writer.append_tensor(t) + writer.complete_append_tensor() + + @unittest.skip("transformer is buggy in multi gpu") + def test_main(self): + self.check_network_convergence(transformer) diff --git a/python/paddle/fluid/tests/unittests/transformer_model.py b/python/paddle/fluid/tests/unittests/transformer_model.py new file mode 100644 index 
0000000000000..c62792face3c3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/transformer_model.py @@ -0,0 +1,487 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import partial +import numpy as np + +import paddle.fluid as fluid +import paddle.fluid.layers as layers + +pos_enc_param_names = ( + "src_pos_enc_table", + "trg_pos_enc_table", ) + +batch_size = 64 + + +def position_encoding_init(n_position, d_pos_vec): + """ + Generate the initial values for the sinusoid position encoding table. + """ + position_enc = np.array([[ + pos / np.power(10000, 2 * (j // 2) / d_pos_vec) + for j in range(d_pos_vec) + ] if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)]) + position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i + position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1 + return position_enc.astype("float32") + + +def multi_head_attention(queries, + keys, + values, + attn_bias, + d_key, + d_value, + d_model, + n_head=1, + dropout_rate=0.): + """ + Multi-Head Attention. Note that attn_bias is added to the logit before + computing softmax activiation to mask certain selected positions so that + they will not considered in attention weights. + """ + if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3): + raise ValueError( + "Inputs: quries, keys and values should all be 3-D tensors.") + + def __compute_qkv(queries, keys, values, n_head, d_key, d_value): + """ + Add linear projection to queries, keys, and values. + """ + q = layers.fc(input=queries, + size=d_key * n_head, + param_attr=fluid.initializer.Xavier( + uniform=False, + fan_in=d_model * d_key, + fan_out=n_head * d_key), + bias_attr=False, + num_flatten_dims=2) + k = layers.fc(input=keys, + size=d_key * n_head, + param_attr=fluid.initializer.Xavier( + uniform=False, + fan_in=d_model * d_key, + fan_out=n_head * d_key), + bias_attr=False, + num_flatten_dims=2) + v = layers.fc(input=values, + size=d_value * n_head, + param_attr=fluid.initializer.Xavier( + uniform=False, + fan_in=d_model * d_value, + fan_out=n_head * d_value), + bias_attr=False, + num_flatten_dims=2) + return q, k, v + + def __split_heads(x, n_head): + """ + Reshape the last dimension of inpunt tensor x so that it becomes two + dimensions and then transpose. Specifically, input a tensor with shape + [bs, max_sequence_length, n_head * hidden_dim] then output a tensor + with shape [bs, n_head, max_sequence_length, hidden_dim]. + """ + if n_head == 1: + return x + + hidden_size = x.shape[-1] + # FIXME(guosheng): Decouple the program desc with batch_size. 
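        # With the sizes used by the accompanying test (batch_size=64,
        # n_head=8, hidden_size=512), the reshape below produces
        # [64, -1, 8, 64] and the transpose then yields
        # [batch_size, n_head, max_sequence_len, 64].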
+ reshaped = layers.reshape( + x=x, shape=[batch_size, -1, n_head, hidden_size // n_head]) + + # permuate the dimensions into: + # [batch_size, n_head, max_sequence_len, hidden_size_per_head] + return layers.transpose(x=reshaped, perm=[0, 2, 1, 3]) + + def __combine_heads(x): + """ + Transpose and then reshape the last two dimensions of inpunt tensor x + so that it becomes one dimension, which is reverse to __split_heads. + """ + if len(x.shape) == 3: return x + if len(x.shape) != 4: + raise ValueError("Input(x) should be a 4-D Tensor.") + + trans_x = layers.transpose(x, perm=[0, 2, 1, 3]) + # FIXME(guosheng): Decouple the program desc with batch_size. + return layers.reshape( + x=trans_x, + shape=map(int, + [batch_size, -1, trans_x.shape[2] * trans_x.shape[3]])) + + def scaled_dot_product_attention(q, k, v, attn_bias, d_model, dropout_rate): + """ + Scaled Dot-Product Attention + """ + + # FIXME(guosheng): Optimize the shape in reshape_op or softmax_op. + + # The current implementation of softmax_op only supports 2D tensor, + # consequently it cannot be directly used here. + # If to use the reshape_op, Besides, the shape of product inferred in + # compile-time is not the actual shape in run-time. It cann't be used + # to set the attribute of reshape_op. + # So, here define the softmax for temporary solution. + + def __softmax(x, eps=1e-9): + exp_out = layers.exp(x=x) + sum_out = layers.reduce_sum(exp_out, dim=-1, keep_dim=False) + return layers.elementwise_div(x=exp_out, y=sum_out, axis=0) + + scaled_q = layers.scale(x=q, scale=d_model**-0.5) + product = layers.matmul(x=scaled_q, y=k, transpose_y=True) + weights = __softmax(layers.elementwise_add(x=product, y=attn_bias)) + if dropout_rate: + weights = layers.dropout( + weights, dropout_prob=dropout_rate, is_test=False) + out = layers.matmul(weights, v) + return out + + q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value) + + q = __split_heads(q, n_head) + k = __split_heads(k, n_head) + v = __split_heads(v, n_head) + + ctx_multiheads = scaled_dot_product_attention(q, k, v, attn_bias, d_model, + dropout_rate) + + out = __combine_heads(ctx_multiheads) + + # Project back to the model size. + proj_out = layers.fc(input=out, + size=d_model, + param_attr=fluid.initializer.Xavier(uniform=False), + bias_attr=False, + num_flatten_dims=2) + return proj_out + + +def positionwise_feed_forward(x, d_inner_hid, d_hid): + """ + Position-wise Feed-Forward Networks. + This module consists of two linear transformations with a ReLU activation + in between, which is applied to each position separately and identically. + """ + hidden = layers.fc(input=x, + size=d_inner_hid, + num_flatten_dims=2, + param_attr=fluid.initializer.Uniform( + low=-(d_hid**-0.5), high=(d_hid**-0.5)), + act="relu") + out = layers.fc(input=hidden, + size=d_hid, + num_flatten_dims=2, + param_attr=fluid.initializer.Uniform( + low=-(d_inner_hid**-0.5), high=(d_inner_hid**-0.5))) + return out + + +def pre_post_process_layer(prev_out, out, process_cmd, dropout=0.): + """ + Add residual connection, layer normalization and droput to the out tensor + optionally according to the value of process_cmd. + + This will be used before or after multi-head attention and position-wise + feed-forward networks. 
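    For example, process_cmd="dan", as used after every attention and
    feed-forward sub-layer in this file, applies dropout first, then adds the
    residual connection, then applies layer normalization, following the order
    of the characters in the command string.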
+ """ + for cmd in process_cmd: + if cmd == "a": # add residual connection + out = out + prev_out if prev_out else out + elif cmd == "n": # add layer normalization + out = layers.layer_norm( + out, + begin_norm_axis=len(out.shape) - 1, + param_attr=fluid.initializer.Constant(1.), + bias_attr=fluid.initializer.Constant(0.)) + elif cmd == "d": # add dropout + if dropout: + out = layers.dropout(out, dropout_prob=dropout, is_test=False) + return out + + +pre_process_layer = partial(pre_post_process_layer, None) +post_process_layer = pre_post_process_layer + + +def prepare_encoder(src_word, + src_pos, + src_vocab_size, + src_emb_dim, + src_pad_idx, + src_max_len, + dropout=0., + pos_pad_idx=0, + pos_enc_param_name=None): + """Add word embeddings and position encodings. + The output tensor has a shape of: + [batch_size, max_src_length_in_batch, d_model]. + + This module is used at the bottom of the encoder stacks. + """ + src_word_emb = layers.embedding( + src_word, + size=[src_vocab_size, src_emb_dim], + padding_idx=src_pad_idx, + param_attr=fluid.initializer.Normal(0., 1.)) + src_pos_enc = layers.embedding( + src_pos, + size=[src_max_len, src_emb_dim], + padding_idx=pos_pad_idx, + param_attr=fluid.ParamAttr( + name=pos_enc_param_name, trainable=False)) + enc_input = src_word_emb + src_pos_enc + + # FIXME(guosheng): Decouple the program desc with batch_size. + enc_input = layers.reshape(x=enc_input, shape=[batch_size, -1, src_emb_dim]) + return layers.dropout( + enc_input, dropout_prob=dropout, + is_test=False) if dropout else enc_input + + +prepare_encoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[0]) +prepare_decoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[1]) + + +def encoder_layer(enc_input, + attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """The encoder layers that can be stacked to form a deep encoder. + + This module consits of a multi-head (self) attention followed by + position-wise feed-forward networks and both the two components companied + with the post_process_layer to add residual connection, layer normalization + and droput. + """ + attn_output = multi_head_attention(enc_input, enc_input, enc_input, + attn_bias, d_key, d_value, d_model, + n_head, dropout_rate) + attn_output = post_process_layer(enc_input, attn_output, "dan", + dropout_rate) + ffd_output = positionwise_feed_forward(attn_output, d_inner_hid, d_model) + return post_process_layer(attn_output, ffd_output, "dan", dropout_rate) + + +def encoder(enc_input, + attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """ + The encoder is composed of a stack of identical layers returned by calling + encoder_layer. + """ + for i in range(n_layer): + enc_output = encoder_layer(enc_input, attn_bias, n_head, d_key, d_value, + d_model, d_inner_hid, dropout_rate) + enc_input = enc_output + return enc_output + + +def decoder_layer(dec_input, + enc_output, + slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """ The layer to be stacked in decoder part. + + The structure of this module is similar to that in the encoder part except + a multi-head attention is added to implement encoder-decoder attention. 
+ """ + slf_attn_output = multi_head_attention( + dec_input, + dec_input, + dec_input, + slf_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, ) + slf_attn_output = post_process_layer( + dec_input, + slf_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + enc_attn_output = multi_head_attention( + slf_attn_output, + enc_output, + enc_output, + dec_enc_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, ) + enc_attn_output = post_process_layer( + slf_attn_output, + enc_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + ffd_output = positionwise_feed_forward( + enc_attn_output, + d_inner_hid, + d_model, ) + dec_output = post_process_layer( + enc_attn_output, + ffd_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + return dec_output + + +def decoder(dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """ + The decoder is composed of a stack of identical decoder_layer layers. + """ + for i in range(n_layer): + dec_output = decoder_layer( + dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, ) + dec_input = dec_output + return dec_output + + +def transformer( + src_vocab_size, + trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + src_pad_idx, + trg_pad_idx, + pos_pad_idx, ): + file_obj = fluid.layers.open_recordio_file( + filename='./wmt16.recordio', + shapes=[ + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + ], + dtypes=[ + 'int64', + 'int64', + 'int64', + 'int64', + 'float32', + 'float32', + 'float32', + 'int64', + 'float32', + ], + lod_levels=[0] * 9) + + src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = fluid.layers.read_file( + file_obj) + + enc_input = prepare_encoder( + src_word, + src_pos, + src_vocab_size, + d_model, + src_pad_idx, + max_length, + dropout_rate, ) + enc_output = encoder( + enc_input, + src_slf_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, ) + + dec_input = prepare_decoder( + trg_word, + trg_pos, + trg_vocab_size, + d_model, + trg_pad_idx, + max_length, + dropout_rate, ) + dec_output = decoder( + dec_input, + enc_output, + trg_slf_attn_bias, + trg_src_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, ) + + # TODO(guosheng): Share the weight matrix between the embedding layers and + # the pre-softmax linear transformation. 
+ predict = layers.reshape( + x=layers.fc(input=dec_output, + size=trg_vocab_size, + param_attr=fluid.initializer.Xavier(uniform=False), + bias_attr=False, + num_flatten_dims=2), + shape=[-1, trg_vocab_size], + act="softmax") + + cost = layers.cross_entropy(input=predict, label=gold) + weighted_cost = cost * weights + return layers.reduce_sum(weighted_cost) From 069b7266c5f142fae84dec361f0d80d6bbc4fe36 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 28 Mar 2018 14:53:13 -0700 Subject: [PATCH 21/25] test multi gpu executor in test_parallel_executor.py command: CUDA_VISIBLE_DEVICES=3 python -m unittest test_parallel_executor.TestResnet --- .../tests/unittests/test_parallel_executor.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index bbfd03c638dac..7a1a60dcb88d9 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -191,29 +191,48 @@ def check_network_convergence(self, loss = method() adam = fluid.optimizer.Adam() adam.minimize(loss) + avg_loss = fluid.layers.mean(loss) + fluid.layers.Print(avg_loss) if memory_opt: fluid.memory_optimize(main) - exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True) + act_places = [] + for each in [ + fluid.CUDAPlace(i) + for i in range(fluid.core.get_cuda_device_count()) + ]: + p = fluid.core.Place() + p.set_place(each) + act_places.append(p) + + exe = fluid.core.MultiGPUExecutor( + act_places, + set([ + p.name + for p in fluid.default_main_program() + .global_block().iter_parameters() + ])) + exe.init(startup.desc, fluid.global_scope(), 0, True, True) + if batch_size is not None: - batch_size *= fluid.core.get_cuda_device_count() + batch_size *= len(act_places) begin = time.time() - first_loss, = exe.run([loss.name]) - first_loss = numpy.array(first_loss) + # first_loss, = exe.run([loss.name]) + # first_loss = numpy.array(first_loss) for i in xrange(iter): - exe.run([]) + exe.run(main.desc, fluid.global_scope(), 0, True, True) - last_loss, = exe.run([loss.name]) + # last_loss, = exe.run([loss.name]) end = time.time() if batch_size is not None: print "%.4f Instance per second" % ( (batch_size * iter + 2) / (end - begin)) - last_loss = numpy.array(last_loss) + # last_loss = numpy.array(last_loss) - print first_loss, last_loss + # print first_loss, last_loss # self.assertGreater(first_loss[0], last_loss[0]) @@ -262,7 +281,7 @@ class TestResnet(TestParallelExecutorBase): def test_resnet(self): import functools - batch_size = 4 + batch_size = 8 self.check_network_convergence( functools.partial( SE_ResNeXt152, batch_size=batch_size), From 924cada2c3babb94113b2b184a1f24181478063c Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 22:10:32 +0000 Subject: [PATCH 22/25] make bn inplace in img_conv_group by default --- python/paddle/fluid/nets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/nets.py b/python/paddle/fluid/nets.py index 3b2e1a3073251..bbedf6fde0872 100644 --- a/python/paddle/fluid/nets.py +++ b/python/paddle/fluid/nets.py @@ -98,7 +98,7 @@ def __extend_list__(obj): use_mkldnn=use_mkldnn) if conv_with_batchnorm[i]: - tmp = layers.batch_norm(input=tmp, act=conv_act) + tmp = layers.batch_norm(input=tmp, act=conv_act, in_place=True) drop_rate = conv_batchnorm_drop_rate[i] if abs(drop_rate) > 1e-5: tmp = layers.dropout(x=tmp, 
dropout_prob=drop_rate) From 799446be8ed480d9b30f43d9cd5945bf91c9bff1 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 22:09:43 +0000 Subject: [PATCH 23/25] add inplace attr to bn --- python/paddle/fluid/layers/nn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 2ce68f95057f7..bb247859f7f22 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -1478,6 +1478,7 @@ def batch_norm(input, param_attr=None, bias_attr=None, data_layout='NCHW', + in_place=False, name=None, moving_mean_name=None, moving_variance_name=None): @@ -1533,7 +1534,7 @@ def batch_norm(input, saved_mean = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) saved_variance = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) - batch_norm_out = helper.create_tmp_variable(dtype) + batch_norm_out = input if in_place else helper.create_tmp_variable(dtype) helper.append_op( type="batch_norm", From e67070bb05598907c62e8d4b288bc91c97aec92a Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 21:54:07 +0000 Subject: [PATCH 24/25] merge append act inplace --- python/paddle/fluid/layer_helper.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/layer_helper.py b/python/paddle/fluid/layer_helper.py index da7e74c901e1f..bc1d3ac718ea1 100644 --- a/python/paddle/fluid/layer_helper.py +++ b/python/paddle/fluid/layer_helper.py @@ -398,14 +398,16 @@ def append_activation(self, input_var): return input_var if isinstance(act, basestring): act = {'type': act} - tmp = self.create_tmp_variable(dtype=input_var.dtype) + + if 'use_mkldnn' in self.kwargs: + act['use_mkldnn'] = self.kwargs.get('use_mkldnn') act_type = act.pop('type') self.append_op( type=act_type, inputs={"X": [input_var]}, - outputs={"Out": [tmp]}, + outputs={"Out": [input_var]}, attrs=act) - return tmp + return input_var def _get_default_initializer(self, dtype): if dtype is None or dtype_is_floating(dtype) is True: From 41f1a87eef83c104f72ee7021209f77358b47bc2 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 28 Mar 2018 22:42:32 +0000 Subject: [PATCH 25/25] add in place --- .../paddle/fluid/tests/unittests/test_parallel_executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 7a1a60dcb88d9..ad8c8b4d7039d 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -58,7 +58,7 @@ def fc_with_batchnorm(): bias_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=1.0))) - hidden = fluid.layers.batch_norm(input=hidden) + hidden = fluid.layers.batch_norm(input=hidden, in_place=True) prediction = fluid.layers.fc(hidden, size=10, act='softmax') loss = fluid.layers.cross_entropy(input=prediction, label=label) @@ -96,7 +96,7 @@ def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1, groups=groups, act=None, bias_attr=False) - return fluid.layers.batch_norm(input=conv, act=act, momentum=0.1) + return fluid.layers.batch_norm(input=conv, act=act, momentum=0.1, in_place=True) def shortcut(input, ch_out, stride): @@ -281,7 +281,7 @@ class TestResnet(TestParallelExecutorBase): def test_resnet(self): import functools - batch_size = 8 + batch_size = 16 self.check_network_convergence( functools.partial( SE_ResNeXt152, 
batch_size=batch_size),