[cherry-pick][hybrid performance] Grad fuse for gradient merge under pipeline mode (#35004) #35299

Merged 1 commit on Aug 31, 2021
1 change: 1 addition & 0 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -200,6 +200,7 @@ message DistributedStrategy {
optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
optional bool calc_comm_same_stream = 32 [ default = false ];
optional bool asp = 33 [ default = false ];
optional bool fuse_grad_merge = 34 [ default = false ];

optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
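As a usage sketch (not part of this diff), the new proto field is surfaced through the fuse_grad_merge property that this PR adds to DistributedStrategy; switching it on from Python might look roughly like the following, using only fleet APIs that already exist plus that new property:

# Hedged sketch: enabling the new flag via fleet.DistributedStrategy.
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

strategy = fleet.DistributedStrategy()
strategy.fuse_all_reduce_ops = True  # existing option
strategy.fuse_grad_merge = True      # new option; only takes effect for
                                     # gradient merge under pipeline mode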
65 changes: 59 additions & 6 deletions paddle/fluid/operators/coalesce_tensor_op.cc
@@ -20,10 +20,49 @@
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/device_memory_aligment.h"
#ifdef PADDLE_WITH_ASCEND_CL
#include "paddle/fluid/operators/npu_op_runner.h"
#endif

namespace paddle {
namespace operators {

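// Fills a LoDTensor with a constant value. It is dispatched on the element
// type through framework::VisitDataType; int8/int16 are rejected, and on NPU
// places the fill goes through FillNpuTensorWithConstant instead of
// math::SetConstant.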
template <typename DeviceContext>
struct FillConstantVisitor {
FillConstantVisitor(const DeviceContext &dev_ctx,
framework::LoDTensor *tensor, const float value)
: dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {}

template <typename T>
void apply(typename std::enable_if<std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value>::type * =
nullptr) const {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not support data type for set_constant attr"));
}

template <typename T>
void apply(typename std::enable_if<!(std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value)>::type
* = nullptr) const {
#ifdef PADDLE_WITH_ASCEND_CL
if (platform::is_npu_place(dev_ctx_.GetPlace())) {
FillNpuTensorWithConstant<T>(tensor_, static_cast<T>(value_));
} else {
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
}
#else
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
#endif
}

const DeviceContext &dev_ctx_;
framework::LoDTensor *tensor_;
float value_;
};

template <typename DeviceContext, typename T>
class CoalesceTensorOpKernel : public framework::OpKernel<T> {
public:
@@ -70,6 +109,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
bool use_align = context.Attr<bool>("use_align");
auto align_size = context.Attr<int>("align_size");
auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");

if (context.Attr<bool>("check_name")) {
for (size_t i = 0; i < in_var_names.size(); ++i) {
@@ -94,7 +134,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
size_t numel = 0;
auto dtype = static_cast<framework::proto::VarType::Type>(
context.Attr<int>("dtype"));
size_t size_of_dtype = framework::SizeOfType(dtype);
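// A value of -1 (the default) means user_defined_size_of_dtype was not set;
// fall back to the real byte size of the output dtype.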
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype,
context.GetPlace(), use_align, align_size);

@@ -121,10 +163,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
: len;
}
} else if (context.Attr<bool>("set_constant")) {
// TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION.
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
framework::VisitDataType(
dtype, FillConstantVisitor<DeviceContext>(
dev_ctx, fused_tensor, context.Attr<float>("constant")));
} else if (context.Attr<bool>("persist_output")) {
for (size_t i = 0; i < out_var_names.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
@@ -227,10 +268,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
}
auto use_align = ctx->Attrs().Get<bool>("use_align");
auto align_size = ctx->Attrs().Get<int>("align_size");
auto size_of_dtype = ctx->Attrs().Get<int>("user_defined_size_of_dtype");

auto dtype = static_cast<framework::proto::VarType::Type>(
ctx->Attrs().Get<int>("dtype"));
size_t size_of_dtype = framework::SizeOfType(dtype);
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}

auto alignment = [](size_t size, size_t align_size) {
size_t remaining = size % align_size;
@@ -308,6 +352,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
.SetDefault(true);
AddAttr<int>("align_size", "The alignment size when use_align is True")
.SetDefault(-1);
AddAttr<int>("user_defined_size_of_dtype",
"The user defined size of dtype. This is used to coalesce "
"grad vars and merged_grad vars at the same time. For some "
"strategy, the dtype of fused_grad_vars and the dtype of "
"fused_grad_merged_vars are not identical, which will cause "
"the shape of these two coalesced vars are different. To "
"make sure the shape of these two vars are identical with "
"each other, this attr is added.")
.SetDefault(-1);
AddComment(R"DOC(
CoalesceTensor Operator.

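To see why the new attr matters: the kernel rounds each variable's byte size up to the alignment boundary and then converts back to an element count, so coalescing the same set of variables once as fp16 grads and once as fp32 merged grads yields fused buffers with different total numels unless both coalesce_tensor ops are given one common element size. A minimal standalone sketch of that arithmetic (plain Python, illustrative numbers and a 256-byte alignment assumed; not Paddle code):

def fused_numel(numels, size_of_dtype, align_size=256):
    # Round every variable's byte size up to align_size, then convert the
    # aligned byte count back into elements, mirroring coalesce_tensor.
    total = 0
    for n in numels:
        nbytes = n * size_of_dtype
        aligned = ((nbytes + align_size - 1) // align_size) * align_size
        total += aligned // size_of_dtype
    return total

grad_numels = [1088, 4096, 300000]
print(fused_numel(grad_numels, 2))  # 305280: fp16 grads, natural element size
print(fused_numel(grad_numels, 4))  # 305216: fp32 merged grads
# Passing user_defined_size_of_dtype = 4 to both coalesce_tensor ops makes both
# fused vars use the same element size, so their totals (and shapes) match.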
22 changes: 22 additions & 0 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -967,6 +967,28 @@ def _calc_comm_same_stream(self, same):
"WARNING: calc_comm_same_stream should have value of boolean type"
)

@property
def fuse_grad_merge(self):
"""
Set whether to fuse the gradients for gradient merge.
Note: this flag only takes effect for gradient merge under pipeline mode.
The default value of fuse_grad_merge is False.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.fuse_grad_merge = True
"""
return self.strategy.fuse_grad_merge

@fuse_grad_merge.setter
@is_strict_auto
def fuse_grad_merge(self, fuse_grad_merge):
if isinstance(fuse_grad_merge, bool):
self.strategy.fuse_grad_merge = fuse_grad_merge
else:
print("WARNING: fuse_grad_merge should have value of boolean type")

@property
def fuse_grad_size_in_num(self):
"""
@@ -122,6 +122,9 @@ def remove_param(input_name):
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
break
# TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast
if not offload and op.type == 'coalesce_tensor':
continue
for input_name in op.desc.input_arg_names():
if input_name not in param_to_idx:
continue
@@ -341,7 +341,11 @@ def insert_allreduce_ops(block,
if len(allreduce_vars) == 0:
return

if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops:
if user_defined_strategy and \
user_defined_strategy.fuse_all_reduce_ops and \
not user_defined_strategy.fuse_grad_merge:
# If fuse_grad_merge is enabled, the grad vars have already been fused during
# the gradient merge pass, so they do not need to be fused again here
insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars,
op_role, use_calc_stream,
user_defined_strategy.fuse_grad_size_in_MB)
@@ -319,7 +319,9 @@ def _insert_allreduce_for_pp(self):
main_block._remove_op(idx)

accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
main_block, fp16_allreduce=fp16_allreduce)
main_block,
fp16_allreduce=fp16_allreduce,
user_defined_strategy=strategy)

len_of_ops = len(main_block.ops)
first_optimize_op_index = get_first_optimize_op_idx(main_block)