fuse reduce op
chengduoZH committed May 15, 2018
1 parent 9923be5 commit d885304
Showing 5 changed files with 405 additions and 9 deletions.
5 changes: 3 additions & 2 deletions paddle/fluid/framework/details/CMakeLists.txt
@@ -16,17 +16,18 @@ if(WITH_GPU)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)

nv_library(reduce_blockop_handle SRCS reduce_blockop_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)
else()
set(multi_devices_graph_builder_deps)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
cc_library(reduce_blockop_handle SRCS reduce_blockop_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
endif()

cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)

cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
- scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle)
+ scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle reduce_blockop_handle)

cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
164 changes: 158 additions & 6 deletions paddle/fluid/framework/details/multi_devices_graph_builder.cc
@@ -15,6 +15,7 @@
#include <utility>
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/reduce_blockop_handle.h"
#include "paddle/fluid/framework/details/reduce_op_handle.h"
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
#include "paddle/fluid/framework/details/send_op_handle.h"
@@ -113,9 +114,9 @@ bool MultiDevSSAGraphBuilder::IsDistTrainOp(const OpDesc &op,

std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
const ProgramDesc &program) const {
- std::unordered_map<std::string, proto::VarType::Type> var_types;
+ std::unordered_map<std::string, VarDesc *> all_vars;
for (auto *var : program.Block(0).AllVars()) {
- var_types[var->Name()] = var->GetType();
+ all_vars[var->Name()] = var;
}

auto graph = new SSAGraph();
@@ -172,7 +173,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
og.substr(0, og.size() - strlen(kGradVarSuffix)));
cur_device_id = (cur_device_id + 1) % places_.size();
} else {
- if (IsSparseGradient(var_types, og)) {
+ if (IsSparseGradient(all_vars, og)) {
CreateReduceOp(&result, og, 0);
CreateBroadcastOp(&result, og, 0);
} else {
@@ -192,6 +193,10 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
CreateBroadcastOp(&result, bcast_name, dev_id);
}
}

// Fuse reduce operation
FuseReduceOpHandles(all_vars, &result);

/*
Dependency graph has been constructed. However, there are still data
hazards that need to be handled.
@@ -212,11 +217,158 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
return std::unique_ptr<SSAGraph>(graph);
}

// Fuse reduce operation
void MultiDevSSAGraphBuilder::FuseReduceOpHandles(
const std::unordered_map<std::string, VarDesc *> &all_vars,
SSAGraph *result) const {
std::vector<std::unordered_set<OpHandleBase *>> reduce_op_handles;
std::vector<std::unordered_set<VarHandle *>> input_set_of_reduce_op;
std::vector<std::unordered_set<VarHandle *>> output_set_of_reduce_op;
reduce_op_handles.resize(this->places_.size());
input_set_of_reduce_op.resize(this->places_.size());
output_set_of_reduce_op.resize(this->places_.size());

bool has_reduce_op = false;
// First, collect all the reduce_ops.
// Only fuse the reduce_ops whose output type is proto::VarType::LOD_TENSOR
// and whose output is placed on a GPU.
for (size_t i = 0; i < result->ops_.size(); ++i) {
ReduceOpHandle *reduce_op =
dynamic_cast<ReduceOpHandle *>(result->ops_[i].get());
if (reduce_op) {
auto out_var_handle = dynamic_cast<VarHandle *>(reduce_op->Outputs()[0]);
PADDLE_ENFORCE_NOT_NULL(out_var_handle);
if (all_vars.at(out_var_handle->name_)->GetType() !=
proto::VarType::LOD_TENSOR ||
platform::is_cpu_place(out_var_handle->place_)) {
continue;
}
size_t dev_id = out_var_handle->scope_idx_;
reduce_op_handles.at(dev_id).insert(result->ops_[i].get());
for (auto in : reduce_op->Inputs()) {
input_set_of_reduce_op.at(dev_id).insert(dynamic_cast<VarHandle *>(in));
}
output_set_of_reduce_op.at(dev_id).insert(out_var_handle);
has_reduce_op = true;
}
}
if (!has_reduce_op) return;

// Second, allocate a contiguous buffer and create the new reduce_op.
for (size_t dev_id = 0; dev_id < reduce_op_handles.size(); ++dev_id) {
// AllocateContinuousAddress
auto out_var_handles = output_set_of_reduce_op[dev_id];
if (out_var_handles.size() == 0) continue;

platform::Place run_place = (*out_var_handles.begin())->place_;
proto::VarType::Type out0_var_data_type =
all_vars.at((*out_var_handles.begin())->name_)->GetDataType();

int64_t total_numel = 0;
for (auto out_var_handle : out_var_handles) {
auto var_desc = all_vars.at(out_var_handle->name_);

PADDLE_ENFORCE_EQ(out0_var_data_type, var_desc->GetDataType());
PADDLE_ENFORCE_EQ(run_place, out_var_handle->place_);

auto dim = framework::make_ddim(var_desc->GetShape());
int64_t numel = framework::product(dim);
PADDLE_ENFORCE_GT(numel, 0);
total_numel += numel;
}

auto reduce_var_name = string::Sprintf("REDUCEBLOCK_DATA_%d", dev_id);
for (size_t k = 0; k < this->places_.size(); ++k) {
// Allocate gradients space
auto reduce_t = this->local_scopes_.at(k)
->Var(reduce_var_name)
->GetMutable<LoDTensor>();
PADDLE_ENFORCE(platform::is_gpu_place(this->places_[k]));

reduce_t->Resize(make_ddim({total_numel}))
.mutable_data(this->places_[k],
framework::ToTypeIndex(out0_var_data_type));

VLOG(8) << this->places_[k] << " " << reduce_var_name
<< " total_numel: " << total_numel;

int64_t s = 0;
for (auto out : out_var_handles) {
LoDTensor *tensor =
this->local_scopes_.at(k)->Var(out->name_)->GetMutable<LoDTensor>();

int64_t mem_size = framework::product(
framework::make_ddim(all_vars.at(out->name_)->GetShape()));
tensor->ShareDataWith(reduce_t->Slice(s, s + mem_size));

VLOG(8) << out->name_ << " mem_size:" << mem_size << " s:" << s
<< " e:" << s + mem_size;

s += mem_size;
}
}
CreateReduceBlockOp(result, dev_id, reduce_var_name,
input_set_of_reduce_op.at(dev_id),
output_set_of_reduce_op.at(dev_id));
}
RemoveOps(reduce_op_handles, result);
}

void MultiDevSSAGraphBuilder::RemoveOps(
const std::vector<std::unordered_set<OpHandleBase *>> &reduce_op_handles,
SSAGraph *result) const {
for (auto iter = result->ops_.begin(); iter != result->ops_.end();) {
bool remove_op = false;
for (size_t j = 0; j < reduce_op_handles.size(); ++j) {
if (reduce_op_handles[j].count((*iter).get())) {
for (auto in_var : (*iter)->Inputs()) {
in_var->pending_ops_.erase(in_var->pending_ops_.find((*iter).get()));
}
iter = result->ops_.erase(iter);
remove_op = true;
break;
}
}
if (!remove_op) {
iter++;
}
}
}

void MultiDevSSAGraphBuilder::CreateReduceBlockOp(
SSAGraph *result, const int root_id, const std::string &reduce_var_name,
const std::unordered_set<VarHandle *> &inputs,
const std::unordered_set<VarHandle *> &outputs) const {
#ifdef PADDLE_WITH_CUDA
result->ops_.emplace_back(new ReduceBlockOpHandle(
local_scopes_, places_, reduce_var_name, root_id, nccl_ctxs_));
#else
result->ops_.emplace_back(new ReduceBlockOpHandle(local_scopes_, places_,
reduce_var_name, root_id));
#endif
auto *op_handle = result->ops_.back().get();

#ifndef PADDLE_WITH_CUDA
for (size_t i = 0; i < places_.size(); ++i) {
auto &p = places_[i];
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
}
#endif

for (auto input : inputs) {
op_handle->AddInput(input);
}
for (auto output : outputs) {
op_handle->AddOutput(output);
}
}

bool MultiDevSSAGraphBuilder::IsSparseGradient(
- const std::unordered_map<std::string, proto::VarType::Type> &var_types,
+ const std::unordered_map<std::string, VarDesc *> &all_vars,
const std::string &og) const {
- PADDLE_ENFORCE(var_types.count(og) != 0);
- if (var_types.at(og) == proto::VarType::SELECTED_ROWS) {
+ PADDLE_ENFORCE(all_vars.count(og) != 0);
+ if (all_vars.at(og)->GetType() == proto::VarType::SELECTED_ROWS) {
return true;
}
return false;
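A minimal, self-contained C++ sketch of the packing idea used by FuseReduceOpHandles above: every LOD_TENSOR gradient that is reduced to the same device is laid out back-to-back in one contiguous buffer (REDUCEBLOCK_DATA_<dev_id> in the diff), and each original gradient becomes a view into that buffer, which is what ShareDataWith over a Slice achieves in the real code. This sketch does not use the Paddle API; PackedBuffer and TensorView are hypothetical names, for illustration only.

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct TensorView {
  std::string name;  // gradient name, e.g. "fc_0.w@GRAD"
  float *data;       // points into the shared storage
  int64_t numel;
};

struct PackedBuffer {
  std::vector<float> storage;     // the single contiguous allocation
  std::vector<TensorView> views;  // one view per fused gradient

  // Mirrors the two passes in FuseReduceOpHandles: first sum the sizes
  // (total_numel), then allocate once and hand out slices.
  void Pack(const std::vector<std::pair<std::string, int64_t>> &grads) {
    int64_t total = 0;
    for (const auto &g : grads) total += g.second;
    storage.assign(static_cast<size_t>(total), 0.0f);

    int64_t offset = 0;
    for (const auto &g : grads) {
      views.push_back({g.first, storage.data() + offset, g.second});
      offset += g.second;
    }
  }
};

int main() {
  PackedBuffer buf;
  buf.Pack({{"fc_0.w@GRAD", 4}, {"fc_0.b@GRAD", 2}, {"fc_1.w@GRAD", 3}});

  // One reduction over buf.storage now covers all three gradients at once,
  // while each view still addresses its own gradient afterwards.
  for (const auto &v : buf.views) {
    std::cout << v.name << " -> offset " << (v.data - buf.storage.data())
              << ", numel " << v.numel << "\n";
  }
  return 0;
}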
15 changes: 14 additions & 1 deletion paddle/fluid/framework/details/multi_devices_graph_builder.h
@@ -103,8 +103,21 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
OpDesc *GetSendOpDesc(const ProgramDesc &program) const;

bool IsSparseGradient(
- const std::unordered_map<std::string, proto::VarType::Type> &var_types,
+ const std::unordered_map<std::string, VarDesc *> &all_vars,
const std::string &og) const;

void FuseReduceOpHandles(
const std::unordered_map<std::string, VarDesc *> &all_vars,
SSAGraph *result) const;

void CreateReduceBlockOp(
SSAGraph *result, const int root_id, const std::string &reduce_var_name,
const std::unordered_set<VarHandle *> &inputs,
const std::unordered_set<VarHandle *> &outputs) const;

void RemoveOps(
const std::vector<std::unordered_set<OpHandleBase *>> &reduce_op_handles,
SSAGraph *result) const;
};
} // namespace details
} // namespace framework
145 changes: 145 additions & 0 deletions paddle/fluid/framework/details/reduce_blockop_handle.cc
@@ -0,0 +1,145 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_blockop_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"

namespace paddle {
namespace framework {
namespace details {

void ReduceBlockOpHandle::RunImpl() {
if (places_.size() == 1) return;

WaitInputVarGenerated();

auto in_var_handles = DynamicCast<VarHandle>(inputs_);
auto out_var_handles = DynamicCast<VarHandle>(outputs_);

PADDLE_ENFORCE_EQ(in_var_handles.size(),
places_.size() * out_var_handles.size(),
"The number of input and output is not consistent.");

#ifdef PADDLE_WITH_CUDA
std::vector<const LoDTensor *> lod_tensors = GetInputValues();
auto type = lod_tensors[0]->type();
auto numel = lod_tensors[0]->numel();
std::vector<std::function<void()>> nccl_reduce_calls;
auto root_dev_id =
boost::get<platform::CUDAPlace>(places_[dst_scope_id_]).device;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
int dev_id =
boost::get<platform::CUDAPlace>(lod_tensors[i]->place()).device;
auto &nccl_ctx = nccl_ctxs_->at(dev_id);

void *buffer = const_cast<void *>(lod_tensors[i]->data<void>());
void *recvbuffer = nullptr;
if (i == dst_scope_id_) {
auto reduce_var = local_scopes_[dst_scope_id_]->FindVar(var_name_);
recvbuffer = reduce_var->GetMutable<framework::LoDTensor>()->mutable_data(
lod_tensors[dst_scope_id_]->place());
}

int nccl_type = platform::ToNCCLDataType(type);
nccl_reduce_calls.emplace_back(
[buffer, recvbuffer, nccl_type, numel, root_dev_id, &nccl_ctx] {
PADDLE_ENFORCE(platform::dynload::ncclReduce(
buffer, recvbuffer, numel, static_cast<ncclDataType_t>(nccl_type),
ncclSum, root_dev_id, nccl_ctx.comm_, nccl_ctx.stream()));
});
}
#endif

std::vector<const Scope *> var_scopes;
for (auto *s : local_scopes_) {
auto var = s->FindVar(kLocalExecScopeName);
PADDLE_ENFORCE_NOT_NULL(var);
var_scopes.emplace_back(var->Get<Scope *>());
}

// Reduce the variables that reside on the CPU side.
std::vector<std::vector<const LoDTensor *>> cpu_lod_tensors;
std::vector<Variable *> cpu_out_vars;
for (auto out_var_h : out_var_handles) {
auto out_var = var_scopes[dst_scope_id_]->FindVar(out_var_h->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);

if (platform::is_cpu_place(out_var->Get<LoDTensor>().place())) {
std::vector<const LoDTensor *> lod_tensors;
for (size_t i = 0; i < places_.size(); ++i) {
lod_tensors.emplace_back(
&(var_scopes[i]->FindVar(out_var_h->name_)->Get<LoDTensor>()));
PADDLE_ENFORCE(platform::is_cpu_place(lod_tensors.back()->place()));
}

cpu_lod_tensors.push_back(lod_tensors);
cpu_out_vars.push_back(out_var);
}
}
#ifdef PADDLE_WITH_CUDA
this->RunAndRecordEvent([&] {
{
platform::NCCLGroupGuard guard;
for (auto &call : nccl_reduce_calls) {
call();
}
}
for (size_t i = 0; i < cpu_lod_tensors.size(); ++i) {
ReduceLoDTensor func(cpu_lod_tensors[i],
cpu_out_vars[i]->GetMutable<framework::LoDTensor>());
VisitDataType(ToDataType(cpu_lod_tensors[i][0]->type()), func);
}
});
#endif
}

std::vector<const LoDTensor *> ReduceBlockOpHandle::GetInputValues() {
std::vector<const LoDTensor *> lod_tensors;

auto &value_0 = GetInputValue(0);
lod_tensors.emplace_back(&value_0);

std::unordered_set<int> dev_id_set;
auto dev_id = boost::get<platform::CUDAPlace>(value_0.place()).device;
dev_id_set.insert(dev_id);

for (size_t i = 1; i < local_scopes_.size(); ++i) {
auto &value = GetInputValue(i);
PADDLE_ENFORCE_EQ(value_0.type(), value.type());
PADDLE_ENFORCE_EQ(value_0.numel(), value.numel());
lod_tensors.emplace_back(&value);

auto dev_id = boost::get<platform::CUDAPlace>(value.place()).device;
if (dev_id_set.count(dev_id)) {
PADDLE_THROW("dev_%d has been in dev_id_set.", dev_id);
}
dev_id_set.insert(dev_id);
}
return lod_tensors;
}

const LoDTensor &ReduceBlockOpHandle::GetInputValue(size_t idx) {
auto reduce_var = local_scopes_.at(idx)->FindVar(var_name_);
PADDLE_ENFORCE_NOT_NULL(reduce_var, "%s is not found.", var_name_);
auto &lod_tensor = reduce_var->Get<LoDTensor>();
PADDLE_ENFORCE(platform::is_gpu_place(lod_tensor.place()));
return lod_tensor;
}

std::string ReduceBlockOpHandle::Name() const { return "reduce_block"; }
} // namespace details
} // namespace framework
} // namespace paddle
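At run time, ReduceBlockOpHandle issues one ncclReduce per device over the fused buffer (grouped by an NCCLGroupGuard) instead of one collective call per gradient, and the summed result lands on the root device. Below is a minimal CPU-only sketch of that effect, assuming every device holds an identically sized packed buffer; SimulateReduce is a hypothetical stand-in for the NCCL call and is not part of this commit.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

// Element-wise sum of every device's packed buffer into dst, the buffer
// owned by the root device (the role ncclSum + root play in ncclReduce).
void SimulateReduce(const std::vector<std::vector<float>> &per_device,
                    std::vector<float> *dst, size_t root) {
  const size_t numel = per_device[root].size();
  dst->assign(numel, 0.0f);
  for (const auto &buf : per_device) {
    assert(buf.size() == numel);  // mirrors the numel check in GetInputValues
    for (size_t i = 0; i < numel; ++i) {
      (*dst)[i] += buf[i];
    }
  }
}

int main() {
  // Three "devices", each holding the same fused gradient layout.
  std::vector<std::vector<float>> per_device = {
      {1, 1, 1, 1}, {2, 2, 2, 2}, {3, 3, 3, 3}};

  std::vector<float> root_buffer;
  SimulateReduce(per_device, &root_buffer, /*root=*/0);

  for (float v : root_buffer) std::cout << v << " ";  // prints: 6 6 6 6
  std::cout << "\n";
  return 0;
}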