[cherry-pick] Fix perf issues of mp/pp/fuse in eager mode (#47071)
* [Dygraph] Fix performance of pp+mp by using send/recv_calc_stream instead of send/recv (#46116)

* [Dygraph] Fix Perf of FusedFeedForward and FusedAttention with AllReduce (#46780)

* update
haohongxiang authored Oct 18, 2022
1 parent 5f6b9f1 commit b84edd9
Showing 4 changed files with 37 additions and 19 deletions.
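The eager-mode pattern shared by the Python changes below is to stop issuing a communication op, receiving a task handle, and waiting on it, and instead call the process group's *_on_calc_stream variant directly, so the op is issued on the calculation stream and the explicit task/wait round trip goes away. A rough before/after sketch of the recv path — the wrapper functions are illustrative, only the process-group method names are taken from the diff:

def recv_partial_before(pg, tensor, src, nranks, rank_id, use_calc_stream):
    # Old flow: always issue on the communication stream, then block.
    task = pg.recv_partial(tensor, src, nranks, rank_id)
    if use_calc_stream:
        task.wait()  # extra synchronization on every recv
        return None
    return task

def recv_partial_after(pg, tensor, src, nranks, rank_id, use_calc_stream):
    # New flow: pick the calc-stream variant up front; no task/wait needed
    # when the op runs on the calculation stream.
    comm_op = pg.recv_partial_on_calc_stream if use_calc_stream \
        else pg.recv_partial
    return comm_op(tensor, src, nranks, rank_id)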
6 changes: 4 additions & 2 deletions paddle/fluid/operators/fused/fused_attention_op.cu
@@ -30,7 +30,7 @@ limitations under the License. */
#include "paddle/phi/kernels/funcs/math_function.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
@@ -50,13 +50,15 @@ static void AllReduce(framework::Tensor &tensor, // NOLINT

if (map->has(ring_id)) {
paddle::distributed::ProcessGroup *pg = map->get(ring_id);
+    auto pg_nccl = static_cast<distributed::ProcessGroupNCCL *>(pg);
+
std::vector<phi::DenseTensor> in_tensor;
std::vector<phi::DenseTensor> out_tensor;
in_tensor.push_back(tensor);
out_tensor.push_back(tensor);
paddle::distributed::AllreduceOptions opts;
opts.reduce_op = distributed::ReduceOp::SUM;
-    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    auto task = pg_nccl->AllReduce(in_tensor, out_tensor, opts, true, true);
task->Wait();
} else {
auto dtype = platform::ToNCCLDataType(
6 changes: 4 additions & 2 deletions paddle/fluid/operators/fused/fused_feedforward_op.cu
@@ -23,7 +23,7 @@ limitations under the License. */
#include "paddle/phi/kernels/funcs/elementwise_functor.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
@@ -43,13 +43,15 @@ static void AllReduce(framework::Tensor& tensor, // NOLINT

if (map->has(ring_id)) {
paddle::distributed::ProcessGroup* pg = map->get(ring_id);
+    auto pg_nccl = static_cast<distributed::ProcessGroupNCCL*>(pg);
+
std::vector<phi::DenseTensor> in_tensor;
std::vector<phi::DenseTensor> out_tensor;
in_tensor.push_back(tensor);
out_tensor.push_back(tensor);
paddle::distributed::AllreduceOptions opts;
opts.reduce_op = distributed::ReduceOp::SUM;
-    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    auto task = pg_nccl->AllReduce(in_tensor, out_tensor, opts, true, true);
task->Wait();
} else {
auto dtype = platform::ToNCCLDataType(
21 changes: 20 additions & 1 deletion python/paddle/distributed/fleet/layers/mpu/mp_ops.py
@@ -43,7 +43,26 @@ def _c_identity(tensor, group=None):
return
ring_id = 0 if group is None else group.id

-    if _non_static_mode():
+    if in_dygraph_mode():
+        from paddle.autograd import PyLayer
+
+        class c_identity_eager(PyLayer):
+
+            @staticmethod
+            def forward(ctx, tensor):
+                return _legacy_C_ops.c_identity(tensor, 'use_calc_stream', True,
+                                                'ring_id', group.id,
+                                                'use_model_parallel', True)
+
+            @staticmethod
+            def backward(ctx, dy):
+                op_type = collective._get_reduce_op(ReduceOp.SUM, "_c_identity")
+                group.process_group.allreduce_on_calc_stream(dy, op_type)
+                return dy
+
+        return c_identity_eager.apply(tensor)
+
+    elif _in_legacy_dygraph():
return _legacy_C_ops.c_identity(tensor, 'use_calc_stream', True,
'ring_id', ring_id,
'use_model_parallel', True)
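The wrapper added above is the standard model-parallel input hook: identity in forward, sum-allreduce of the incoming gradient in backward, with the allreduce now issued on the calculation stream. A minimal sketch of the same pattern built only from public Paddle APIs — the class name is made up here, and it uses paddle.distributed.all_reduce over the default group rather than the internal allreduce_on_calc_stream hook shown in the diff:

import paddle.distributed as dist
from paddle.autograd import PyLayer

class IdentityFwdAllReduceBwd(PyLayer):
    # Forward: pass the tensor through unchanged.
    # Backward: sum-allreduce the gradient across the (model-parallel) group.

    @staticmethod
    def forward(ctx, x):
        return x

    @staticmethod
    def backward(ctx, dy):
        dist.all_reduce(dy)  # in-place SUM over the default group
        return dy

# Usage (assumes dist.init_parallel_env() has been called and the default
# group is the model-parallel group):
#     y = IdentityFwdAllReduceBwd.apply(x)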
@@ -173,7 +173,9 @@ def _partial_send_op(tensor, group, use_calc_stream, ring_id, dst, nranks,
elif in_dygraph_mode():
group = paddle.distributed.collective._get_default_group(
) if group is None else group
-        return group.process_group.send_partial(tensor, dst, nranks, rank_id)
+        comm_op = group.process_group.send_partial_on_calc_stream \
+            if use_calc_stream else group.process_group.send_partial
+        return comm_op(tensor, dst, nranks, rank_id)


def send_partial(tensor,
@@ -212,12 +214,9 @@ def _partial_recv_op(tensor, group, use_calc_stream, ring_id, src, nranks,
elif in_dygraph_mode():
group = paddle.distributed.collective._get_default_group(
) if group is None else group
-        task = group.process_group.recv_partial(tensor, src, nranks, rank_id)
-        if use_calc_stream:
-            task.wait()
-            return None
-        else:
-            return task
+        comm_op = group.process_group.recv_partial_on_calc_stream \
+            if use_calc_stream else group.process_group.recv_partial
+        return comm_op(tensor, src, nranks, rank_id)


def recv_partial(tensor,
@@ -255,13 +254,9 @@ def _partial_allgather_op(tensor, group, use_calc_stream, ring_id, nranks,
elif in_dygraph_mode():
group = paddle.distributed.collective._get_default_group(
) if group is None else group
-        task = group.process_group.all_gather_partial(tensor, tensor, nranks,
-                                                      rank_id)
-        if use_calc_stream:
-            task.wait()
-            return None
-        else:
-            return task
+        comm_op = group.process_group.all_gather_partial_on_calc_stream \
+            if use_calc_stream else group.process_group.all_gather_partial
+        return comm_op(tensor, tensor, nranks, rank_id)
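All three partial ops in this file follow the same selection between the calc-stream variant and the plain variant. The diff spells each case out explicitly; purely as an illustration (this helper is not part of Paddle), the shared pattern could be written once as:

def _partial_comm(process_group, op_name, use_calc_stream, *args):
    # e.g. op_name = "recv_partial" selects recv_partial_on_calc_stream
    # when use_calc_stream is True, otherwise plain recv_partial.
    name = (op_name + "_on_calc_stream") if use_calc_stream else op_name
    return getattr(process_group, name)(*args)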


def allgather_partial(tensor,
