feature/parallel_do #6730

Merged: 18 commits, Jan 5, 2018
3 changes: 2 additions & 1 deletion paddle/framework/backward.cc
@@ -429,7 +429,8 @@ std::vector<std::unique_ptr<OpDescBind>> MakeBlockBackward(
VLOG(5) << "Making backward " << (*it)->Type() << " op";
std::vector<std::unique_ptr<OpDescBind>> op_grads;

if ((*it)->Type() == "recurrent" || (*it)->Type() == "while") {
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while" ||
(*it)->Type() == "parallel_do") {
int step_block_idx = (*it)->GetBlockAttr("sub_block");
BlockDescBind* backward_block = CreateStepBlock(
program_desc, no_grad_vars, grad_to_var, step_block_idx);
25 changes: 25 additions & 0 deletions paddle/framework/lod_tensor.cc
@@ -314,5 +314,30 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor) {
}
}

void LoDTensor::MergeLoDTensor(
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
PADDLE_ENFORCE(platform::is_cpu_place(place));
PADDLE_ENFORCE(!lod_tensors.empty());

framework::DDim new_dim = lod_tensors[0]->dims();
std::type_index new_type = lod_tensors[0]->type();
for (auto *lod : lod_tensors) {
PADDLE_ENFORCE(new_dim == lod->dims());
PADDLE_ENFORCE(new_type == lod->type());
PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
}
new_dim[0] *= lod_tensors.size();
Resize(new_dim);

auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
for (auto *src : lod_tensors) {
auto size = src->numel() * SizeOfType(src->type());
memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
boost::get<platform::CPUPlace>(src->place()),
src->data<void>(), size);
dst_ptr += size;
}
}

} // namespace framework
} // namespace paddle
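A minimal sketch of how the new helper is expected to be called when gathering per-place results (hypothetical names; all inputs must share dims and type and live on CPU, per the enforces above):

#include <vector>
#include "paddle/framework/lod_tensor.h"
#include "paddle/platform/place.h"

// out0 and out1 are equally-shaped CPU sub-results produced on two places.
void MergeExample(const paddle::framework::LoDTensor &out0,
                  const paddle::framework::LoDTensor &out1) {
  std::vector<const paddle::framework::LoDTensor *> parts = {&out0, &out1};
  paddle::framework::LoDTensor merged;
  merged.MergeLoDTensor(parts, paddle::platform::CPUPlace());
  // merged.dims()[0] == 2 * out0.dims()[0]; the parts sit back to back.
}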
3 changes: 3 additions & 0 deletions paddle/framework/lod_tensor.h
@@ -144,6 +144,9 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);

void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
Review comment (Contributor): Do we need to add comments about this function?

platform::Place place);

private:
LoD lod_;
};
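Following up on the review question above, a possible header comment for the new method (a sketch only, not part of this diff):

/**
 * @brief Merge the given LoDTensors into this tensor by concatenating
 *        them along dimension 0.
 * @note  All inputs must share the same dims and type, and the inputs as
 *        well as the target place must be CPU; LoD is not merged yet.
 */
void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
                    platform::Place place);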
20 changes: 16 additions & 4 deletions paddle/framework/operator.cc
@@ -179,10 +179,13 @@ static const Tensor* GetTensorFromVar(const Variable* var) {
const Tensor* t = nullptr;
if (var->IsType<LoDTensor>()) {
t = &(var->Get<LoDTensor>());
} else if (var->IsType<Tensor>()) {
Review comment (Contributor): A little confused about it: does the ParallelDo operator need it?

Reply (Author): This is a leftover from debugging. Sorry for the confusion.

t = &(var->Get<Tensor>());
} else if (var->IsType<SelectedRows>()) {
t = &(var->Get<SelectedRows>().value());
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
Review comment (Contributor): expect LoDTensor/SelectedRows => expect LoDTensor/SelectedRows/Tensor?

Reply (Author): This is a leftover from debugging. Sorry for the confusion.

var->Type().name());
}
return t;
}
Expand All @@ -191,10 +194,13 @@ static Tensor* GetMutableTensorFromVar(Variable* var) {
Tensor* t = nullptr;
if (var->IsType<LoDTensor>()) {
t = var->GetMutable<LoDTensor>();
} else if (var->IsType<Tensor>()) {
t = var->GetMutable<Tensor>();
} else if (var->IsType<SelectedRows>()) {
t = var->GetMutable<SelectedRows>()->mutable_value();
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
Review comment (Contributor): expect LoDTensor/SelectedRows => expect LoDTensor/SelectedRows/Tensor?

Reply (Author): This is a leftover from debugging. Sorry for the confusion.

var->Type().name());
}
return t;
}
@@ -359,21 +365,27 @@ class RuntimeInferShapeContext : public InferShapeContext {
Variable* var = scope_.FindVar(name);
if (var->IsType<LoDTensor>()) {
return var->Get<LoDTensor>().dims();
} else if (var->IsType<Tensor>()) {
return var->Get<Tensor>().dims();
} else if (var->IsType<SelectedRows>()) {
return var->Get<SelectedRows>().GetCompleteDims();
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
Review comment (Contributor): Same as above.

Reply (Author): This is a leftover from debugging. Sorry for the confusion.

name, var->Type().name());
}
}

void SetDim(const std::string& name, const DDim& dim) override {
Variable* var = scope_.FindVar(name);
if (var->IsType<LoDTensor>()) {
var->GetMutable<LoDTensor>()->Resize(dim);
} else if (var->IsType<Tensor>()) {
var->GetMutable<Tensor>()->Resize(dim);
} else if (var->IsType<SelectedRows>()) {
var->GetMutable<SelectedRows>()->set_height(dim[0]);
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
Review comment (Contributor): Same as above.

Reply (Author): This is a leftover from debugging. Sorry for the confusion.

name, var->Type().name());
}
}

11 changes: 11 additions & 0 deletions paddle/framework/tensor.h
@@ -55,6 +55,8 @@ class Tensor {
template <typename T>
inline const T* data() const;

inline void switch_place(platform::Place new_place);

/**
* @brief Return a pointer to mutable memory block.
* @note If not exist, then allocation.
@@ -183,6 +185,15 @@ class Tensor {
size_t offset_;
};

inline void Tensor::switch_place(platform::Place new_place) {
if (holder_->place() == new_place) {
return;
}

// TODO(tonyyang-svail): do memcpy here.
PADDLE_THROW("Not Implemented");
}

} // namespace framework
} // namespace paddle

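How the new hook is meant to be called from parallel_do's feed path further down (a sketch; any cross-place call currently throws):

#include "paddle/framework/tensor.h"
#include "paddle/platform/place.h"

// input is a CPU tensor; the row range [0, 2) is a hypothetical slice.
void SwitchPlaceExample(paddle::framework::Tensor &input) {
  paddle::framework::Tensor slice = input.Slice(0, 2);  // shares CPU memory
  slice.switch_place(paddle::platform::CPUPlace());     // same place: no-op
  // slice.switch_place(gpu_place);  // would hit the PADDLE_THROW above
}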
2 changes: 2 additions & 0 deletions paddle/operators/CMakeLists.txt
@@ -185,6 +185,7 @@ set(DEPS_OPS
cond_op
cross_entropy_op
recurrent_op
parallel_do_op
softmax_with_cross_entropy_op
softmax_op
sequence_softmax_op
@@ -256,6 +257,7 @@ op_library(lstm_op DEPS sequence2batch lstm_compute)
op_library(conv_transpose_op DEPS vol2col)
op_library(gru_op DEPS sequence2batch gru_compute)
op_library(recurrent_op SRCS recurrent_op.cc DEPS executor)
op_library(parallel_do_op SRCS parallel_do_op.cc DEPS executor)

# FIXME(typhoonzero): save/load depends lodtensor serialization functions
op_library(save_op DEPS lod_tensor)
203 changes: 203 additions & 0 deletions paddle/operators/parallel_do_op.cc
@@ -0,0 +1,203 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
Review comment (Contributor): The license format is not correct, please refer to #7022.

Reply (Author): done.


#include <vector>
#include "chunk_eval_op.h"
#include "paddle/framework/executor.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/operator.h"
#include "paddle/platform/place.h"

namespace paddle {
namespace operators {

constexpr char kInputs[] = "inputs";
constexpr char kParameters[] = "parameters";
constexpr char kPlaces[] = "places";

constexpr char kOutputs[] = "outputs";
constexpr char kParallelScopes[] = "parallel_scopes";

constexpr char kParallelBlock[] = "sub_block";

using ParallelScopeVar = std::vector<framework::Scope *>;
using OperatorBase = framework::OperatorBase;

class ParallelDoOp : public OperatorBase {
public:
ParallelDoOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}

void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {
auto *block = Attr<framework::BlockDescBind *>(kParallelBlock);
auto *program = block->Program();

// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());

std::vector<framework::Scope *> sub_scopes;
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;

sub_scopes.push_back(&scope.NewScope());

auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];

// copy parameter
if (dev_ctx.GetPlace() != place) {
PADDLE_THROW("Not Implemented");
}

// feed input
for (auto &argu : Inputs(kInputs)) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
if (!tensor.lod().empty()) {
PADDLE_THROW("Disable parallel lod for now");
} else {
PADDLE_ENFORCE(tensor.dims()[0] % places.size() == 0,
"Batch size should be divided by places size");
int begin = place_idx * tensor.dims()[0] / places.size();
int end = (place_idx + 1) * tensor.dims()[0] / places.size();
auto feed_tensor = tensor.Slice(begin, end);
feed_tensor.switch_place(place);

auto *cur_var = cur_scope->Var(argu);
auto *cur_tensor = cur_var->GetMutable<Tensor>();
*cur_tensor = feed_tensor;
}
}

// execute
auto executor = framework::Executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}

// merge output
for (auto &o_name : Outputs(kOutputs)) {
std::vector<const framework::LoDTensor *> lod_tensors;
for (auto *sub_scope : sub_scopes) {
lod_tensors.push_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
}

auto *lod_tensor_to_be_merged =
scope.FindVar(o_name)->GetMutable<LoDTensor>();
lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
}
}
};

class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
public:
ParallelDoOpProtoMaker(framework::OpProto *proto,
framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput(kInputs, "").AsDuplicable();
AddInput(kParameters, "").AsDuplicable();
AddInput(kPlaces, "");
AddOutput(kOutputs, "").AsDuplicable();
AddOutput(kParallelScopes, "");
AddAttr<framework::BlockDescBind *>(kParallelBlock, "");
AddComment(R"DOC(
ParallelDo Operator.
)DOC");
}
};

class ParallelDoGradOp : public OperatorBase {
public:
ParallelDoGradOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}

void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {}
};

class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
public:
using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;

protected:
virtual std::unique_ptr<framework::OpDescBind> Apply() const {
auto *grad = new framework::OpDescBind();
grad->SetType("parallel_do_grad");
for (auto &input_param : this->InputNames()) {
LOG(INFO) << input_param;
grad->SetInput(input_param, this->Input(input_param));
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param));
}

for (auto &output_param : this->OutputNames()) {
if (output_param == kParallelScopes) {
grad->SetInput(output_param, this->Output(output_param));
grad->SetInput(framework::GradVarName(output_param),
this->Output(output_param));
} else {
grad->SetInput(output_param, this->Output(output_param));
grad->SetInput(framework::GradVarName(output_param),
this->OutputGrad(output_param));
}
}
grad->SetAttrMap(this->Attrs());
grad->SetBlockAttr(kParallelBlock, *grad_block_[0]);

return std::unique_ptr<framework::OpDescBind>(grad);
}
};

class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext *ctx) const override {
std::vector<std::string> input{kParameters, kInputs};
std::vector<std::string> output{kOutputs};
for (auto &s : input) {
PADDLE_ENFORCE(ctx->HasInputs(s));
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
"Cannot find the gradient variable %s",
framework::GradVarName(s));
}
for (auto &s : output) {
PADDLE_ENFORCE(ctx->HasInputs(s));
}
for (auto &s : input) {
ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
}
if (ctx->HasInputs(kParameters)) {
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
ctx->SetOutputsDim(framework::GradVarName(kParameters),
ctx->GetInputsDim(kParameters));
}
}
};

} // namespace operators
} // namespace paddle

REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp,
paddle::operators::ParallelDoOpProtoMaker,
paddle::operators::ParallelDoGradOpDescMaker);
REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp,
paddle::operators::ParallelDoGradOpShapeInference);
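The feed loop above splits the batch evenly across the places. With a batch of 4 rows and the two hard-coded CPU places, the slice arithmetic in Run() works out as follows (a worked sketch with hypothetical values):

// place_idx = 0: begin = 0 * 4 / 2 = 0, end = 1 * 4 / 2 = 2  -> rows [0, 2)
// place_idx = 1: begin = 1 * 4 / 2 = 2, end = 2 * 4 / 2 = 4  -> rows [2, 4)
auto feed0 = tensor.Slice(0, 2);  // fed to the sub-scope of place 0
auto feed1 = tensor.Slice(2, 4);  // fed to the sub-scope of place 1
// After both executors run, MergeLoDTensor concatenates the per-place
// outputs back into a single 4-row tensor in the parent scope.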
3 changes: 2 additions & 1 deletion python/paddle/v2/fluid/framework.py
@@ -424,7 +424,8 @@ def find_name(var_list, name):
self.desc.check_attrs()
no_kernel_op_set = {
'feed', 'fetch', 'save', 'load', 'recurrent',
'rnn_memory_helper_grad', 'conditional_block', 'while'
'rnn_memory_helper_grad', 'conditional_block', 'while',
'parallel_do'
}
if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc)