[WIP] C++ implementation of parallel executor #9035

Closed

Changes from all commits (29 commits)
74 changes: 74 additions & 0 deletions doc/design/parallel_executor.md
@@ -0,0 +1,74 @@
# ParallelExecutor Design Doc

## Introduction

We introduce `ParallelExecutor` to run multi-GPU training in PaddlePaddle Fluid. It supports
1. keeping a copy of the parameters on each GPU, and
1. running allreduce on a separate stream, so that computation and communication can overlap.

An example of switching single-GPU training to multi-GPU training:
```python
cost = your_neural_network()
opt = fluid.optimizer.SGDOptimizer(learning_rate=0.01)
opt.minimize(cost)

# change Executor -> ParallelExecutor
exe = fluid.ParallelExecutor(gpu_list=[0, 1])

for i in xrange(iter_num):
    exe.run()
```

## Design

The constructor is given the list of parameters whose gradients need to be allreduced.
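
A minimal sketch of this interface, mirroring the `MultiGPUExecutor` constructor added later in this PR (member names such as `param_grads_` and `nccl_ctx_` follow that implementation):

```c++
// Sketch: the executor receives the places (GPUs) and the parameter names,
// and derives the gradient variable names that need to be allreduced.
MultiGPUExecutor::MultiGPUExecutor(
    const std::vector<platform::Place>& places,
    const std::unordered_set<std::string>& params)
    : nccl_ctx_(places), params_(params) {
  for (auto& param : params) {
    // e.g. "fc_0.w_0" -> "fc_0.w_0@GRAD"
    param_grads_.insert(GradVarName(param));
  }
}
```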

At runtime, `ParallelExecutor` starts one thread per GPU, each running its own `Executor`. Around every
operator run on a GPU, it automatically synchronizes the computation and communication streams when necessary.

```c++
// if one of op's inputs is a parameter's gradient:
//   sync with the allreduce stream,
//   e.g. sgd should wait for the allreduce to finish
CallBack->BeforeOp(op);

op->Run(*local_scope, place_);

// if one of op's outputs is a parameter's gradient:
//   sync with the computation stream,
//   e.g. allreduce should wait for fc_grad to finish
CallBack->AfterOp(op);
```

The `CallBack` object can be implemented as follows:

```c++
struct AllReduceCallBack {
  void BeforeOp(framework::OperatorBase* op);
  void AfterOp(framework::OperatorBase* op);

  std::unordered_set<std::string> reduced_param_grad_names_;
  std::unordered_set<std::string> param_grad_names_;

  platform::DeviceContext* computation_dev_ctx_;    // computation device context
  platform::DeviceContext* communication_dev_ctx_;  // communication device context

  framework::Scope* scope_;
  platform::NCCL::Communicator* nccl_com_;
};

void AllReduceCallBack::BeforeOp(framework::OperatorBase* op) {
  for (auto& input : op->Inputs()) {
    for (auto& arg : input.second) {
      if (reduced_param_grad_names_.count(arg)) {
        // The gradient is still being allreduced; wait on the communication
        // stream so that e.g. sgd sees the reduced value.
        communication_dev_ctx_->Wait();
        reduced_param_grad_names_.erase(arg);
      }
    }
  }
}

void AllReduceCallBack::AfterOp(framework::OperatorBase* op) {
  for (auto& output : op->Outputs()) {
    for (auto& arg : output.second) {
      if (param_grad_names_.count(arg)) {
        // Wait until the gradient has been computed, then launch the
        // allreduce on the communication stream (sketch, not the real NCCL
        // signature).
        computation_dev_ctx_->Wait();
        reduced_param_grad_names_.insert(arg);
        ncclAllreduce(scope_, arg, communication_dev_ctx_);
      }
    }
  }
}
```
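
The blocking `Wait()` calls above are only for illustration. To actually overlap computation and communication, the implementation in `multi_gpu_executor.cc` below records a CUDA event when a gradient is produced and makes the other stream wait on that event instead of blocking the host. A minimal sketch of that pattern (the `run_op` / `launch_allreduce` helpers and the variable names are placeholders, not real Fluid APIs):

```c++
// Placeholders: run_op() enqueues an operator's kernels on a stream, and
// launch_allreduce() enqueues ncclAllReduce for one gradient tensor.
cudaEvent_t grad_ready, grad_reduced;
cudaEventCreateWithFlags(&grad_ready, cudaEventDisableTiming);
cudaEventCreateWithFlags(&grad_reduced, cudaEventDisableTiming);

// The backward op produces the gradient on the computation stream.
run_op(fc_grad_op, computation_stream);
cudaEventRecord(grad_ready, computation_stream);

// The allreduce stream waits only for this gradient, so later backward ops
// can keep running on the computation stream in the meantime.
cudaStreamWaitEvent(all_reduce_stream, grad_ready, 0);
launch_allreduce(fc_w_grad, all_reduce_stream);
cudaEventRecord(grad_reduced, all_reduce_stream);

// The optimizer op that consumes the gradient waits for the allreduce.
cudaStreamWaitEvent(computation_stream, grad_reduced, 0);
run_op(sgd_op, computation_stream);
```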
2 changes: 2 additions & 0 deletions paddle/fluid/framework/CMakeLists.txt
@@ -86,6 +86,8 @@ cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glo

cc_library(executor SRCS executor.cc DEPS op_registry device_context scope
framework_proto backward glog lod_rank_table feed_fetch_method)
cc_library(multi_gpu_executor SRCS multi_gpu_executor.cc DEPS op_registry device_context scope
framework_proto backward glog lod_rank_table feed_fetch_method executor reader)

cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
39 changes: 28 additions & 11 deletions paddle/fluid/framework/executor.cc
@@ -314,6 +314,32 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
} // if (create_local_scope)
} // if (create_vars)

RunOperators(ctx, local_scope);

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& dev_ctx = *pool.Get(place_);
dev_ctx.Wait();

if (FLAGS_benchmark) {
VLOG(2) << "-------------------------------------------------------";
VLOG(2) << "Memory used after operator run: "
<< memory::memory_usage(place_);
VLOG(2) << "-------------------------------------------------------";
}

if (create_vars && create_local_scope) {
scope->DeleteScope(local_scope);
}
if (FLAGS_benchmark) {
VLOG(2) << "-------------------------------------------------------";
VLOG(2) << "Memory used after deleting local scope: "
<< memory::memory_usage(place_);
VLOG(2) << "-------------------------------------------------------";
}
}

void Executor::RunOperators(const ExecutorPrepareContext* ctx,
const Scope* local_scope) const {
for (auto& op : ctx->ops_) {
VLOG(3) << place_ << " " << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
@@ -326,21 +352,12 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
for (auto& vname : op->OutputVars(true)) {
auto* var = local_scope->FindVar(vname);
if (var == nullptr) continue;
if (var->IsType<framework::LoDTensor>()) {
CheckTensorNANOrInf(vname, var->Get<framework::LoDTensor>());
if (var->IsType<LoDTensor>()) {
CheckTensorNANOrInf(vname, var->Get<LoDTensor>());
}
}
}
}
if (create_vars && create_local_scope) {
scope->DeleteScope(local_scope);
}
if (FLAGS_benchmark) {
VLOG(2) << "-------------------------------------------------------";
VLOG(2) << "Memory used after deleting local scope: "
<< memory::memory_usage(place_);
VLOG(2) << "-------------------------------------------------------";
}
}

} // namespace framework
6 changes: 5 additions & 1 deletion paddle/fluid/framework/executor.h
@@ -15,6 +15,7 @@ limitations under the License. */
#pragma once

#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
@@ -39,6 +40,7 @@ class Executor {
: Executor(device.GetPlace()) {}

explicit Executor(const platform::Place& place);
virtual ~Executor() {}

/* @Brief
* Runtime evaluation of the given ProgramDesc under certain Scope
@@ -64,7 +66,9 @@ bool create_local_scope = true,
bool create_local_scope = true,
bool create_vars = true);

private:
virtual void RunOperators(const ExecutorPrepareContext* ctx,
const Scope* local_scope) const;

const platform::Place place_;
};

197 changes: 197 additions & 0 deletions paddle/fluid/framework/multi_gpu_executor.cc
@@ -0,0 +1,197 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/multi_gpu_executor.h"

#include <thread>
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/reader.h"

namespace paddle {
namespace framework {

ExecutorWithAllReduce::ExecutorWithAllReduce(
const platform::Place& place, std::unordered_set<std::string>* param_grads,
NCCLContext* nccl_context)
: Executor(place), param_grads_(param_grads) {
int device_id = boost::get<platform::CUDAPlace>(place).device;
comm_ = &nccl_context->comms_[device_id];
io_ctx_ = nccl_context->ctxs_[device_id];
}

// TODO(yy): Move this function somewhere
ncclDataType_t ToNCCLDataType(std::type_index type) {
if (type == typeid(float)) { // NOLINT
return ncclFloat;
} else if (type == typeid(double)) { // NOLINT
return ncclDouble;
} else if (type == typeid(int)) { // NOLINT
return ncclInt;
} else {
PADDLE_THROW("Not supported");
}
}

void ExecutorWithAllReduce::RunOperators(const ExecutorPrepareContext* ctx,
const Scope* local_scope) const {
cudaSetDevice(boost::get<platform::CUDAPlace>(place_).device);

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto* dev_ctx = pool.Get(place_);
cudaStream_t computation_stream =
reinterpret_cast<const platform::CUDADeviceContext*>(dev_ctx)->stream();
cudaStream_t all_reduce_stream = io_ctx_->stream();

std::unordered_map<std::string, cudaEvent_t> computation_event;
std::unordered_map<std::string, cudaEvent_t> all_reduce_event;
for (auto& argu : *param_grads_) {
PADDLE_ENFORCE(cudaEventCreateWithFlags(&computation_event[argu],
cudaEventDisableTiming));
PADDLE_ENFORCE(cudaEventCreateWithFlags(&all_reduce_event[argu],
cudaEventDisableTiming));
}

for (auto& op : ctx->ops_) {
// sgd should wait for allreduce finished
for (auto& param2argu : op->Inputs()) {
[Review comment (Contributor)] Instead of looping over every input every time, perhaps the op can cache the param inputs and only wait for them.

for (auto& argu : param2argu.second) {
if (param_grads_->count(argu) != 0) {
VLOG(5) << place_ << " " << op->Type() << param2argu.first << " "
<< argu;
PADDLE_ENFORCE(cudaStreamWaitEvent(computation_stream,
all_reduce_event[argu], 0));
}
}
}

VLOG(4) << place_ << " " << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
VLOG(3) << place_ << " " << op->DebugStringEx(local_scope);

for (auto& param2argu : op->Outputs()) {
for (auto& argu : param2argu.second) {
if (param_grads_->count(argu) != 0) {
VLOG(5) << place_ << " " << op->Type() << " Launch allreduce on "
<< argu;

PADDLE_ENFORCE(
cudaEventRecord(computation_event[argu], computation_stream));
PADDLE_ENFORCE(cudaStreamWaitEvent(all_reduce_stream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to block the next computation op. We should profile and see how much it hurts

computation_event[argu], 0));

auto& tensor = local_scope->FindVar(argu)->Get<LoDTensor>();
void* data = const_cast<void*>(tensor.data<void>());
PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
data, data, tensor.numel(), ToNCCLDataType(tensor.type()),
ncclSum, *comm_, all_reduce_stream));

PADDLE_ENFORCE(
cudaEventRecord(all_reduce_event[argu], all_reduce_stream));
}
}
}
}

cudaStreamSynchronize(computation_stream);
cudaStreamSynchronize(all_reduce_stream);
for (auto& argu : *param_grads_) {
PADDLE_ENFORCE(cudaEventDestroy(computation_event[argu]));
PADDLE_ENFORCE(cudaEventDestroy(all_reduce_event[argu]));
}
}

MultiGPUExecutor::MultiGPUExecutor(
const std::vector<platform::Place>& places,
const std::unordered_set<std::string>& params)
: nccl_ctx_(places), params_(params) {
for (auto& param : params) {
param_grads_.insert(GradVarName(param));
}
for (size_t i = 0; i < places.size(); ++i) {
auto& place = places[i];
exes_.push_back(
framework::ExecutorWithAllReduce(place, &param_grads_, &nccl_ctx_));
}
}

void MultiGPUExecutor::Init(const ProgramDesc& prog, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
// init parameters on one device
auto replicas = scope->replicas(exes_.size() - 1);
exes_[0].Run(prog, scope, block_id, create_local_scope, create_vars);

for (auto* var_desc : prog.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
auto& main_tensor = scope->FindVar(var_desc->Name())->Get<LoDTensor>();
ncclDataType_t data_type = ToNCCLDataType(main_tensor.type());
auto& dims = main_tensor.dims();
size_t numel = main_tensor.numel();

platform::dynload::ncclGroupStart();
for (size_t i = 0; i < exes_.size(); ++i) {
void* buffer;
if (i == 0) {
buffer = const_cast<void*>(main_tensor.data<void>());
} else {
auto local_scope = replicas[i - 1];
auto* t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
t->Resize(dims);
buffer = t->mutable_data(exes_[i].place_, main_tensor.type());
}

platform::dynload::ncclBcast(buffer, numel, data_type, 0,
nccl_ctx_.comms_[i],
nccl_ctx_.ctxs_[i]->stream());
}
platform::dynload::ncclGroupEnd();
} else if (var_desc->GetType() == proto::VarType::READER) {
VLOG(4) << "Copy reader " << var_desc->Name();
auto& reader =
scope->FindVar(var_desc->Name())->Get<framework::ReaderHolder>();
for (size_t i = 0; i < replicas.size(); ++i) {
auto* reader_dup = replicas[i]
->Var(var_desc->Name())
->GetMutable<framework::ReaderHolder>();
*reader_dup = reader;
}
}
}
}

void MultiGPUExecutor::Run(const ProgramDesc& prog, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
// prepare prog in a single thread to avoid race
auto context = exes_[0].Prepare(prog, block_id);
auto replicas = scope->replicas(exes_.size() - 1);
std::vector<std::thread> threads;
for (size_t i = 0; i < exes_.size(); ++i) {
threads.push_back(std::thread([&, i] {
Scope* cur_scope;
if (i == 0) {
cur_scope = scope;
} else {
cur_scope = replicas[i - 1].get();
}
exes_[i].RunPreparedContext(context.get(), cur_scope, create_local_scope,
create_vars);
}));
}

for (auto& t : threads) {
t.join();
}
}

} // namespace framework
} // namespace paddle