diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 546b9d2601df5..58ae35f268979 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -200,6 +200,7 @@ message DistributedStrategy { optional int32 fuse_grad_size_in_num = 31 [ default = 8 ]; optional bool calc_comm_same_stream = 32 [ default = false ]; optional bool asp = 33 [ default = false ]; + optional bool fuse_grad_merge = 34 [ default = false ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 6ea8809dae13f..4c5f3a2a47bd8 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -20,10 +20,49 @@ #include "paddle/fluid/framework/var_type.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/device_memory_aligment.h" +#ifdef PADDLE_WITH_ASCEND_CL +#include "paddle/fluid/operators/npu_op_runner.h" +#endif namespace paddle { namespace operators { +template +struct FillConstantVisitor { + FillConstantVisitor(const DeviceContext &dev_ctx, + framework::LoDTensor *tensor, const float value) + : dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {} + + template + void apply(typename std::enable_if::value || + std::is_same::value>::type * = + nullptr) const { + PADDLE_THROW(platform::errors::InvalidArgument( + "Not support data type for set_constant attr")); + } + + template + void apply(typename std::enable_if::value || + std::is_same::value)>::type + * = nullptr) const { +#ifdef PADDLE_WITH_ASCEND_CL + if (platform::is_npu_place(dev_ctx_.GetPlace())) { + FillNpuTensorWithConstant(tensor_, static_cast(value_)); + } else { + math::SetConstant set_constant; + set_constant(dev_ctx_, tensor_, static_cast(value_)); + } +#else + math::SetConstant set_constant; + set_constant(dev_ctx_, tensor_, static_cast(value_)); +#endif + } + + const DeviceContext &dev_ctx_; + framework::LoDTensor *tensor_; + float value_; +}; + template class CoalesceTensorOpKernel : public framework::OpKernel { public: @@ -70,6 +109,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { auto in_tensors = context.MultiInput("Input"); bool use_align = context.Attr("use_align"); auto align_size = context.Attr("align_size"); + auto size_of_dtype = context.Attr("user_defined_size_of_dtype"); if (context.Attr("check_name")) { for (size_t i = 0; i < in_var_names.size(); ++i) { @@ -94,7 +134,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel { size_t numel = 0; auto dtype = static_cast( context.Attr("dtype")); - size_t size_of_dtype = framework::SizeOfType(dtype); + if (size_of_dtype == -1) { + size_of_dtype = framework::SizeOfType(dtype); + } GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype, context.GetPlace(), use_align, align_size); @@ -121,10 +163,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel { : len; } } else if (context.Attr("set_constant")) { - // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION. - math::SetConstant set_constant; - set_constant(dev_ctx, fused_tensor, - static_cast(context.Attr("constant"))); + framework::VisitDataType( + dtype, FillConstantVisitor( + dev_ctx, fused_tensor, context.Attr("constant"))); } else if (context.Attr("persist_output")) { for (size_t i = 0; i < out_var_names.size(); ++i) { size_t len = static_cast(out_tensors[i]->numel()); @@ -227,10 +268,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel { } auto use_align = ctx->Attrs().Get("use_align"); auto align_size = ctx->Attrs().Get("align_size"); + auto size_of_dtype = ctx->Attrs().Get("user_defined_size_of_dtype"); auto dtype = static_cast( ctx->Attrs().Get("dtype")); - size_t size_of_dtype = framework::SizeOfType(dtype); + if (size_of_dtype == -1) { + size_of_dtype = framework::SizeOfType(dtype); + } auto alignment = [](size_t size, size_t align_size) { size_t remaining = size % align_size; @@ -308,6 +352,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault(true); AddAttr("align_size", "The alignment size when use_align is True") .SetDefault(-1); + AddAttr("user_defined_size_of_dtype", + "The user defined size of dtype. This is used to coalesce " + "grad vars and merged_grad vars at the same time. For some " + "strategy, the dtype of fused_grad_vars and the dtype of " + "fused_grad_merged_vars are not identical, which will cause " + "the shape of these two coalesced vars are different. To " + "make sure the shape of these two vars are identical with " + "each other, this attr is added.") + .SetDefault(-1); AddComment(R"DOC( CoalesceTensor Operator. diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index d43292ddbd32e..d19cfd2169802 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -967,6 +967,28 @@ def _calc_comm_same_stream(self, same): "WARNING: calc_comm_same_stream should have value of boolean type" ) + @property + def fuse_grad_merge(self): + """ + Set whether fuse the grad for gradient merge. + Note: this flag will only effect the gradient merge under pipeline mode + The default value for the fuse_grad_merge is False + Examples: + .. code-block:: python + import paddle.distributed.fleet as fleet + strategy = fleet.DistributedStrategy() + strategy.fuse_param_grad = True + """ + return self.strategy.fuse_grad_merge + + @fuse_grad_merge.setter + @is_strict_auto + def fuse_grad_merge(self, fuse_grad_merge): + if isinstance(fuse_grad_merge, bool): + self.strategy.fuse_grad_merge = fuse_grad_merge + else: + print("WARNING: fuse_grad_merge should have value of boolean type") + @property def fuse_grad_size_in_num(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py index a96705b09e835..8aee34960332a 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py @@ -122,6 +122,9 @@ def remove_param(input_name): for idx, op in enumerate(block.ops): if is_optimizer_op(op): break + # TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast + if not offload and op.type == 'coalesce_tensor': + continue for input_name in op.desc.input_arg_names(): if input_name not in param_to_idx: continue diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 52ef843aa0d75..16fbc7bea6c8b 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -341,7 +341,11 @@ def insert_allreduce_ops(block, if len(allreduce_vars) == 0: return - if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops: + if user_defined_strategy and \ + user_defined_strategy.fuse_all_reduce_ops and \ + not user_defined_strategy.fuse_grad_merge: + # If fuse_grad_merge is enable, the grad vars have already been fused during + # gradient merge pass, therefore, those vars are not need to be fused here insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars, op_role, use_calc_stream, user_defined_strategy.fuse_grad_size_in_MB) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 5c2f24054f835..c94bd572f0587 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -319,7 +319,9 @@ def _insert_allreduce_for_pp(self): main_block._remove_op(idx) accumulated_grad_names = self._pp_optimizer._accumulate_gradients( - main_block, fp16_allreduce=fp16_allreduce) + main_block, + fp16_allreduce=fp16_allreduce, + user_defined_strategy=strategy) len_of_ops = len(main_block.ops) first_optimize_op_index = get_first_optimize_op_idx(main_block) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 9e87681c4bef3..58f61b77fd1fe 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5037,11 +5037,18 @@ def _rename_gradient_var_name(self, block): def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False, - fp16_allreduce=False): + fp16_allreduce=False, + user_defined_strategy=None): """ Create a new merged gradient for each parameter and accumulate the corresponding gradient to it. """ + if user_defined_strategy and user_defined_strategy.fuse_grad_merge: + fused_gradient_names = self._accumulate_gradients_with_fuse( + block, fp16_allreduce, + user_defined_strategy.fuse_grad_size_in_MB) + return fused_gradient_names + merged_gradient_names = [] first_opt_op_idx = None @@ -5171,6 +5178,252 @@ def _accumulate_gradients(self, return merged_gradient_names + def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size): + first_opt_op_idx = None + grad_param_pairs = [] + # obtain all param/grad pairs that needed to be fused + for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + # remove the cast op of fp16 grad to fp32 grad + if self._is_optimize_op(op) and op.type == 'cast': + in_name = op.input_arg_names[0] + out_name = op.output_arg_names[0] + if out_name.strip('@GRAD') in self._param_device_map: + assert in_name.replace('.cast_fp16', '') == out_name + main_block._remove_op(index) + continue + + if self._is_backward_op(op) and first_opt_op_idx is None: + first_opt_op_idx = index + 1 + # no optimize phase + if first_opt_op_idx == len(main_block.ops): + return + + if self._is_backward_op(op) and ( + self._op_role_var_key in op.attr_names): + op_role_var = op.attr(self._op_role_var_key) + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + for i in range(0, len(op_role_var), 2): + param_name = op_role_var[i] + if not main_block.has_var(param_name): + continue + if '@BroadCast' in param_name: + continue + grad_param_pairs.append( + (op_role_var[i + 1], op_role_var[i])) + + if len(grad_param_pairs) == 0: + return + + grad_param_segments = [] + merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED' + dtype = paddle.float16 if fp16 else paddle.float32 + cur_size = 0. + last_dtype = None + # split the grad based on dtype and fused size + for grad, param in grad_param_pairs: + real_grad = main_block.var(grad) + # create the gradient merged var for each grad + merged_grad_var = main_block.create_var( + name=param + core.grad_var_suffix() + merged_suffix, + dtype=dtype, + shape=real_grad.shape, + persistable=True, + stop_gradient=False) + real_param = main_block.var(param) + tmp_size = self._get_var_size(real_grad) + # two strategies for splitting the grad + # 1. the current segment's size reach the user defined grad_size_in_MB + # 2. the upcoming grad holds different dtype compared with grads in current segment + if len(grad_param_segments) == 0 \ + or cur_size + tmp_size > fused_size \ + or real_grad.dtype != last_dtype: + grad_param_segments.append( + ([real_grad], [real_param], [merged_grad_var])) + last_dtype = real_grad.dtype + cur_size = 0. + else: + grad_param_segments[-1][0].append(real_grad) + grad_param_segments[-1][1].append(real_param) + grad_param_segments[-1][2].append(merged_grad_var) + cur_size += tmp_size + + fused_gradients = [] + fused_merged_gradients = [] + # create fused vars for grad and param + for grad_param_segment in grad_param_segments: + grad_segment = grad_param_segment[0] + merged_grad_segment = grad_param_segment[2] + fused_grad = main_block.create_var( + name='FusedGrad_{}'.format(grad_segment[0].name), + dtype=grad_segment[0].dtype, + persistable=False, + stop_gradient=False) + # keep the '.cast_fp16' info in the fuse var name + fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \ + merged_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad' + fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format( + merged_grad_segment[0].name) + fused_merged_grad = main_block.create_var( + name=fused_merged_grad_name, + dtype=merged_grad_segment[0].dtype, + persistable=True, + stop_gradient=False) + fused_gradients.append(fused_grad) + fused_merged_gradients.append(fused_merged_grad) + + assert len(fused_gradients) == len(grad_param_segments) + assert len(fused_merged_gradients) == len(grad_param_segments) + + # insert coalesce op at the start of the backward pass + # use param as the coalesce input to make sure the two Fused vars are in same shape + first_back_op_idx = None + for index, op in enumerate(main_block.ops): + if self._is_backward_op(op) and first_back_op_idx is None: + first_back_op_idx = index + break + assert first_back_op_idx is not None + offset = 0 + for i in range(len(grad_param_segments)): + fused_grad = fused_gradients[i] + fused_merged_grad = fused_merged_gradients[i] + grads = grad_param_segments[i][0] + params = grad_param_segments[i][1] + merged_grads = grad_param_segments[i][2] + main_block._insert_op_without_sync( + first_back_op_idx + offset, + type="coalesce_tensor", + inputs={"Input": params}, + outputs={"Output": grads, + "FusedOutput": fused_grad}, + attrs={ + # Explanation of user_defined_size_of_dtype: + # In coalesce op, the align size is 256 bytes + # the float takes 4 bytes while fp16 takes 2 bytes. + # To meet the requirement, 128 fp16 or 64 float will be aligned + # Think the total shape of the input tensors if [64], + # if the dtype is float, then the shape of the fuse var is [64] + # however if the dytpe if fp16, the shape of the fuse var is [128], + # which will cause the fused vars' shape vary between each other. + # To make sure the shape of the fused vars are identical, + # we set the dtype of float and fp16 both to 2. + # Under this way, the fused vars' shape for float and fp16 are all [128] + "user_defined_size_of_dtype": 2, + "copy_data": False, + "use_align": True, + "dtype": grads[0].dtype, + self._op_role_key: self._op_role.Backward + }) + offset += 1 + # For the gradient_merged_fused_var, given a init value during the coalesce op + # this will remove a problematic fill_constant op. This op role of this coalesce + # is set to be LRSched to make this coalesce (with init) only run once + main_block._insert_op_without_sync( + first_back_op_idx + offset, + type="coalesce_tensor", + inputs={"Input": params}, + outputs={ + "Output": merged_grads, + "FusedOutput": fused_merged_grad + }, + attrs={ + "user_defined_size_of_dtype": 2, + "set_constant": True, + "constant": float(0.0), + "copy_data": False, + "use_align": True, + "dtype": merged_grads[0].dtype, + self._op_role_key: self._op_role.Optimize.LRSched + }) + offset += 1 + + # insert gradient merge relating ops + first_opt_op_idx += offset + offset = 0 + for i in range(len(fused_gradients)): + fused_grad = fused_gradients[i] + fused_merged_grad = fused_merged_gradients[i] + is_fp16_grad = 'cast_fp16' in fused_grad.name + need_cast = (is_fp16_grad is not fp16) + if need_cast: + # for fp16 allreduce, cast fp32 grad to fp16 + # for fp32 allreduce, cast fp16 grad to fp32 + cast_grad_var_name = fused_grad.name + '@TMP' + cast_grad_var = main_block.create_var( + name=cast_grad_var_name, + dtype=dtype, + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': fused_grad}, + outputs={'Out': cast_grad_var}, + attrs={ + 'in_dtype': fused_grad.dtype, + 'out_dtype': cast_grad_var.dtype, + self._op_role_key: self._op_role.Backward, + }) + offset += 1 + fused_grad = cast_grad_var + main_block._insert_op( + index=first_opt_op_idx + offset, + type='sum', + inputs={'X': [fused_merged_grad, fused_grad]}, + outputs={'Out': fused_merged_grad}, + attrs={self._op_role_key: self._op_role.Backward}) + offset += 1 + + if fp16: + # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32 + for grad, param in grad_param_pairs: + real_grad = main_block.var(grad) + fp16_grad_name = param + core.grad_var_suffix() + '@MERGED@FP16' + assert main_block.has_var(fp16_grad_name) + fp16_grad = main_block.var(fp16_grad_name) + fp32_grad_name = param + core.grad_var_suffix() + '@MERGED' + fp32_grad = main_block.create_var( + name=fp32_grad_name, + dtype=paddle.float32, + shape=real_grad.shape, + persistable=False, + stop_gradient=False) + main_block._insert_op( + index=first_opt_op_idx + offset, + type='cast', + inputs={'X': fp16_grad}, + outputs={'Out': fp32_grad}, + attrs={ + 'in_dtype': paddle.float16, + 'out_dtype': paddle.float32, + self._op_role_key: self._op_role.Optimize, + }) + offset += 1 + + # replace the var with it's name, which will be used for inserting allreduce + for i in range(len(fused_merged_gradients)): + fused_merged_gradients[i] = fused_merged_gradients[i].name + + main_block._sync_with_cpp() + + return fused_merged_gradients + + def _get_var_size(self, var): + dtype_to_size = { + core.VarDesc.VarType.FP16: 2, + core.VarDesc.VarType.FP32: 4, + core.VarDesc.VarType.FP64: 8, + core.VarDesc.VarType.INT16: 2, + core.VarDesc.VarType.INT32: 4, + core.VarDesc.VarType.INT64: 8, + core.VarDesc.VarType.BOOL: 1, + core.VarDesc.VarType.UINT8: 1, + } + assert -1 not in var.shape + return reduce(lambda x, y: x * y, + var.shape) * dtype_to_size[var.dtype] / 1024.0 / 1024.0 + def _add_sub_blocks(self, main_block, program_list): main_program = main_block.program for prog in program_list: diff --git a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py index f1bbf0becf195..93a969bf10f03 100644 --- a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py +++ b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py @@ -90,7 +90,8 @@ def init_attr(self): "set_constant": False, "constant": 0.5, "use_align": True, - "dtype": self.fluid_dtype + "dtype": self.fluid_dtype, + "user_defined_size_of_dtype": 2 } def test_check_output(self): diff --git a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py index a5b30330448d2..868a72334247d 100644 --- a/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py +++ b/python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py @@ -92,7 +92,8 @@ def init_attr(self): "copy_data": False, "set_constant": True, "constant": 0.5, - "dtype": self.fluid_dtype + "dtype": self.fluid_dtype, + "user_defined_size_of_dtype": 2 } def test_check_output(self): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index 5a981a470cb4e..3b0df74d3e6b4 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -1050,6 +1050,189 @@ def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_offload(self): self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse( + self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.amp = True + strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + "optimize_cast": True, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4, + } + strategy.fp16_allreduce = True + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # ring: mp, pp_group, pp_pair, pp_pair + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'cast', 'fill_constant', 'cast', 'uniform_random', + 'cast', 'fill_constant', 'cast', 'uniform_random', 'cast', + 'fill_constant', 'cast', 'uniform_random', 'cast', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', + 'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', + 'elementwise_mul', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'scale', + 'scale', 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', + 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'sum', 'cast', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', 'cast', + 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast', + 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', + 'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast', + 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum', + 'cast' + ]) + + # amp check_finite_and_unscale, allreduce(pp) + self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) + + # should has ring id for pp + created_ring_ids = [ + op.desc.attr("ring_id") for op in startup_prog_ops + if op.type == "c_comm_init" + ] + self.assertIn(self.pp_pair_ring_id, created_ring_ids) + self.assertIn(self.dp_ring_id, created_ring_ids) + + # check correctness of pp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_0": + pp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) + + # check correctness of dp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_3": + dp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + + def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.amp = True + strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4, + } + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + # ring: mp, pp_group, pp_pair, pp_pair + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', + 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', + 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', + 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'scale', 'scale', + 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2', + 'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', + 'c_sync_comm_stream', 'check_finite_and_unscale', 'cast', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum' + ]) + + # amp check_finite_and_unscale, allreduce(pp) + self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1) + + # should has ring id for pp + created_ring_ids = [ + op.desc.attr("ring_id") for op in startup_prog_ops + if op.type == "c_comm_init" + ] + self.assertIn(self.pp_pair_ring_id, created_ring_ids) + self.assertIn(self.dp_ring_id, created_ring_ids) + + # check correctness of pp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_0": + pp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003']) + + # check correctness of dp group + for op in startup_prog_ops: + if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[ + 0] == "comm_id_3": + dp_group_waiting_ports = op.desc.attr("other_endpoints") + + self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + if __name__ == "__main__": unittest.main()