Fix decotate decorate #61408

Merged
merged 4 commits on Feb 4, 2024
8 changes: 4 additions & 4 deletions python/paddle/amp/auto_cast.py
@@ -136,7 +136,7 @@ def _is_gpu_bfloat16_supported():

def need_keep_fp32(layer, dtype):
need_keep_fp32 = False
# Highest prority. Because all the layers except BN will use bfloat16 params in bfoat16 training,
# Highest priority. Because all the layers except BN will use bfloat16 params in bfloat16 training,
# here we provide a option to keep fp32 param.
if not layer._cast_to_low_precision:
need_keep_fp32 = True
@@ -238,7 +238,7 @@ def check_models(models):
)
if isinstance(model, paddle.DataParallel):
raise RuntimeError(
"For distributed AMP training, you should first use paddle.amp.decorate() to decotate origin model, and then call paddle.DataParallel get distributed model."
"For distributed AMP training, you should first use paddle.amp.decorate() to decorate origin model, and then call paddle.DataParallel get distributed model."
)


@@ -440,7 +440,7 @@ def master_grad_hook():

# TODO(zhiqiu) set amp related flags automatically in this guard
# Currently, if FLAGS_cudnn_batchnorm_spatial_persistent is set True in amp_guard,
# batch_norm can run in fast mode, but batch_norm_grad can not if backward if not executed insise amp_guard.
# batch_norm can run in fast mode, but batch_norm_grad can not if backward if not executed inside amp_guard.
# So, users need to set related flags manually.

# original_flags = get_flags(AMP_RELATED_FLAGS)
@@ -522,7 +522,7 @@ def amp_decorate(
level(str, optional): Auto mixed precision level. Accepted values are "O1" and "O2": O1 represent mixed precision, the decorator will do nothing;
O2 represent Pure fp16/bf16, the decorator will cast all parameters of models to FP16/BF16, except BatchNorm, InstanceNorm and LayerNorm. Default is O1(amp)
dtype(str, optional): Whether to use 'float16' or 'bfloat16'. Default is 'float16'.
master_weight(bool, optinal): For level='O2', whether to use multi-precision during weight updating. If master_weight is None, in O2 level optimizer will use multi-precision. Default is None.
master_weight(bool, optional): For level='O2', whether to use multi-precision during weight updating. If master_weight is None, in O2 level optimizer will use multi-precision. Default is None.
save_dtype(float, optional): The save model parameter dtype when use `paddle.save` or `paddle.jit.save`,it should be float16, bfloat16, float32, float64 or None.
The save_dtype will not change model parameters dtype, it just change the state_dict dtype. When save_dtype is None, the save dtype is same as model dtype. Default is None.
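
The two corrected messages in this file describe a calling convention: decorate the model with `paddle.amp.decorate` before wrapping it in `paddle.DataParallel`, and under `level='O2'` keep BatchNorm/InstanceNorm/LayerNorm in float32. A minimal sketch of that flow, assuming a GPU (float16 kernels) and a toy network that is illustrative rather than part of this PR:

```python
import paddle

# Hypothetical toy network; any paddle.nn.Layer works here.
model = paddle.nn.Sequential(
    paddle.nn.Linear(16, 32),
    paddle.nn.BatchNorm1D(32),   # stays in float32 under level='O2'
    paddle.nn.Linear(32, 10),
)
optimizer = paddle.optimizer.AdamW(parameters=model.parameters())

# Decorate first: 'O2' casts parameters to float16/bfloat16 except the
# normalization layers mentioned in the docstring above.
model, optimizer = paddle.amp.decorate(
    models=model, optimizers=optimizer, level='O2', dtype='float16'
)

# Only after decoration should the model be wrapped for distributed AMP
# training, per the RuntimeError message fixed above.
# model = paddle.DataParallel(model)

scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
x = paddle.randn([8, 16], dtype='float32')
with paddle.amp.auto_cast(level='O2'):
    loss = model(x).mean()
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
```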

6 changes: 3 additions & 3 deletions python/paddle/base/executor.py
@@ -1055,7 +1055,7 @@ def _get_program_and_executor(self, cached_data):
# if enables distributed training with prim mechanism (prim is behind of distributed)
# step 1: translate program to pir program.
# step 2: decompose PHI ops in pir program into prim ops.
# When decomposing backward ops, the grad_var_to_var in distributed context is needed to finding correpsonding forward op.
# When decomposing backward ops, the grad_var_to_var in distributed context is needed to finding corresponding forward op.
if (
os.getenv("FLAGS_enable_prim_after_distribute")
in ['True', 'true', '1']
@@ -1410,7 +1410,7 @@ def _fetch_data(self, fetch_list, fetch_var_name, scope):
@classmethod
def _split_optimize_ops_in_fetch_list(cls, fetch_list):
"""
Split optimize_ops from fetch_list, which provided to specify program prunning.
Split optimize_ops from fetch_list, which provided to specify program pruning.
Args:
fetch_list(list): The original fetch_list.
Possible types of fetch_list are:
@@ -1666,7 +1666,7 @@ def run(
and fetch_list Tensor) of this interface remains unchanged during running.
The default is False.
use_prune(bool): This parameter indicates whether the input :code:`Program` will be pruned.
If the parameter is True, the program will be pruned accroding to the given feed and fetch_list,
If the parameter is True, the program will be pruned according to the given feed and fetch_list,
which means the operators and variables in program that generate :code:`feed` and are not
needed to generate :code:`fetch_list` will be pruned. The default is False, which means the
program will not pruned and all the operators and variables will be executed during running.
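
Since the corrected docstring explains that `use_prune=True` trims the program to what `feed` and `fetch_list` require, a small static-graph sketch may help; the toy network and the extra branch are assumptions for illustration, not part of this PR:

```python
import numpy as np
import paddle

paddle.enable_static()

main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
with paddle.static.program_guard(main_prog, startup_prog):
    x = paddle.static.data(name='x', shape=[None, 4], dtype='float32')
    hidden = paddle.static.nn.fc(x, size=8)
    # A branch that does not feed into fetch_list; with use_prune=True the
    # executor can drop its operators, as the docstring above describes.
    unused = paddle.static.nn.fc(x, size=2)
    out = paddle.mean(hidden)

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup_prog)

feed = {'x': np.random.rand(2, 4).astype('float32')}
# Only the ops needed to go from `feed` to `fetch_list` are kept.
(result,) = exe.run(main_prog, feed=feed, fetch_list=[out], use_prune=True)
print(result)
```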
@@ -45,7 +45,7 @@ def check_broadcast(block):
if "@BroadCast" in var_name:
if var_name in broadcast_vars:
raise ValueError(
"var_name areadly exist: {}"
"var_name already exist: {}"
"the old pos is {}, the new pos is {}".format(
var_name,
broadcast_vars[var_name]["broadcast_pos"],
@@ -1015,7 +1015,7 @@ def is_opt_vars(var):
def is_gradient_merge_vars(var):
# NOTE(JZ-LIANG): to revise save/load logic in framework instead of write this naive rule

return var.name.endswith("@GradiantMerge")
return var.name.endswith("@GradientMerge")

def is_trainable(var):
return (
@@ -987,7 +987,7 @@ def _split_program(self, block):

def _prune_main_program(self, block, shard, rings):
"""
calculate deps from allredce op to optimize op,
calculate deps from allreduce op to optimize op,
remove ops and vars not needed in this worker

1. prune regularization (weight decay)
@@ -1005,7 +1005,7 @@ def _prune_main_program(self, block, shard, rings):
# amp could use global group for sync
FP16Utils.prune_fp16(block, shard, self._reduced_grads_to_param, rings)

# clipbyglobalnorm should only use the Model paramllelism group (mp-sharding-pp)
# clipbyglobalnorm should only use the Model parallelism group (mp-sharding-pp)
gradientclip_helper = GradientClipHelper(None)
gradientclip_helper.prune_gradient_clip(block, shard, rings)

@@ -1133,7 +1133,7 @@ def _add_broadcast_allreduce(self, block):
self._segments[-1]._end_idx = new_end_idx

if self._segments[-1]._allreduce_vars:
shard_allredue_vars = self._shard.filter_grads(
shard_allreduce_vars = self._shard.filter_grads(
self._segments[-1]._allreduce_vars
)
if (
@@ -1143,20 +1143,20 @@
if (
self.hybrid_dp
and self.hybrid_dp_mode == "sharding_hybrid_dp"
and len(shard_allredue_vars) >= 1
and len(shard_allreduce_vars) >= 1
):
if not self._use_calc_stream:
insert_sync_comm_ops(
block,
self._segments[-1]._end_idx,
self.dp_ring_id,
shard_allredue_vars,
shard_allreduce_vars,
)
insert_allreduce_ops(
block,
self._segments[-1]._end_idx,
self.dp_ring_id,
shard_allredue_vars,
shard_allreduce_vars,
user_defined_strategy=self.user_defined_strategy,
use_calc_stream=self._use_calc_stream,
)
@@ -1169,7 +1169,7 @@ def _add_broadcast_allreduce(self, block):
block,
self._startup_program.global_block(),
self._segments[-1]._end_idx,
shard_allredue_vars,
shard_allreduce_vars,
self._shard,
)
if not self._use_calc_stream:
@@ -1241,7 +1241,7 @@ def _add_broadcast_allreduce(self, block):
)

# step2: add Sync ops
shard_allredue_vars = self._shard.filter_grads(allreduce_vars)
shard_allreduce_vars = self._shard.filter_grads(allreduce_vars)

if (
self.gradient_merge_mode != "sharding_gm"
@@ -1250,14 +1250,14 @@ def _add_broadcast_allreduce(self, block):
if (
self.hybrid_dp
and self.hybrid_dp_mode == "sharding_hybrid_dp"
and len(shard_allredue_vars) >= 1
and len(shard_allreduce_vars) >= 1
):
if not self._use_calc_stream:
insert_sync_comm_ops(
block,
segment._end_idx,
self.dp_ring_id,
shard_allredue_vars,
shard_allreduce_vars,
)

broad_cast_vars = [x[0] for x in broadcast_vars]
@@ -1322,7 +1322,7 @@ def _add_broadcast_allreduce(self, block):
block,
self._startup_program.global_block(),
segment._start_idx,
shard_allredue_vars,
shard_allreduce_vars,
self._shard,
)

@@ -1343,13 +1343,13 @@ def _add_broadcast_allreduce(self, block):
if (
self.hybrid_dp
and self.hybrid_dp_mode == "sharding_hybrid_dp"
and len(shard_allredue_vars) >= 1
and len(shard_allreduce_vars) >= 1
):
insert_allreduce_ops(
block,
segment._start_idx,
self.dp_ring_id,
shard_allredue_vars,
shard_allreduce_vars,
user_defined_strategy=self.user_defined_strategy,
use_calc_stream=self._use_calc_stream,
)
@@ -1562,7 +1562,7 @@ def _build_groups(self):
// (self.sharding_degree * self.mp_degree)
% self.pp_degree
)
# (NOTE): Already adjust for (outter-pure) dp
# (NOTE): Already adjust for (outer-pure) dp
self.pp_group_id = self.global_rank // (
self.mp_degree * self.sharding_degree * self.pp_degree
)
@@ -1588,10 +1588,10 @@ def _build_groups(self):
self.pp_group_id = -1
self.pp_group_endpoints = []

# outter-pure-dp group
# NOTE (JZ-LIANG) support outter-pure-dp to scale the throughput in 3D parallelism
# outer-pure-dp group
# NOTE (JZ-LIANG) support outer-pure-dp to scale the throughput in 3D parallelism
# e.g. mp-sharding-pp-dp
# sharding-hybrid-dp as one senario of outter-pure-dp
# sharding-hybrid-dp as one scenario of outer-pure-dp
local_pp_degree = self.pp_degree
if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
assert self.pp_degree == 2, (
@@ -1785,7 +1785,7 @@ def create_persistable_gradients_and_insert_merge_ops(
assert (
get_grad_device(grad_name, shard) == shard.worker_idx
), f"try to merge gradient not belong to current shard: [{grad_name}]"
persistable_grad_name = grad_name + '@GradiantMerge'
persistable_grad_name = grad_name + '@GradientMerge'
assert (
grad_name not in self._grad2merged_grad
), "grad [{}] already in grad2merged_grad, maybe you meet sharing weight case !".format(
@@ -2206,7 +2206,7 @@ def _init_communicator(
):
nranks = len(endpoints)
block = program.global_block()
# init mulit node nccl
# init multi node nccl
if nranks > 1:
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
@@ -816,7 +816,7 @@ def send_backward_recv_forward(self, input_tensor_grad, pp_first_stage):
def send_forward_backward_recv_forward_backward(
self, output_tensor, input_tensor_grad, recv_prev, recv_next
):
# always have to send dytpe info to downstream
# always have to send dtype info to downstream
global _timers
if _timers is not None:
_timers("send_forward_backward_recv_forward_backward").start()
@@ -837,7 +837,7 @@ def send_forward_backward_recv_forward_backward(
return input_tensor, output_tensor_grad

def send_forward_recv_forward(self, output_tensor, recv_prev):
# always have to send dytpe info to downstream
# always have to send dtype info to downstream
global _timers
if _timers is not None:
_timers("send_forward_recv_forward").start()
@@ -638,7 +638,7 @@ def send_backward_recv_forward(self, input_tensor_grad, pp_first_stage):
def send_forward_backward_recv_forward_backward(
self, output_tensor, input_tensor_grad, recv_prev, recv_next
):
# always have to send dytpe info to downstream
# always have to send dtype info to downstream
global _timers
if _timers is not None:
_timers("send_forward_backward_recv_forward_backward").start()
@@ -661,7 +661,7 @@ def send_forward_backward_recv_forward_backward(
return input_tensor, output_tensor_grad

def send_forward_recv_forward(self, output_tensor, recv_prev):
# always have to send dytpe info to downstream
# always have to send dtype info to downstream
global _timers
if _timers is not None:
_timers("send_forward_recv_forward").start()
4 changes: 2 additions & 2 deletions python/paddle/distributed/fleet/recompute/recompute_hybrid.py
@@ -250,14 +250,14 @@ def backward(ctx, *args):
def recompute_hybrid(ctx, function, *args, **kwargs):
"""
recompute intermediate activations to save the memory in hybrid parallel scene.
# NODTE(shenliang03)The current hybrid parallel recompute has limitations.
# NOTE(shenliang03)The current hybrid parallel recompute has limitations.
# It cannot handle the following situations:
# 1. The calculation output of recompute, there are tensors that do not require gradients.
# 2. The forward output tensor has no gradient. This problem can be solved temporarily by detach().
# 3. Here, we only use float dtype to distinguish whether a gradient is needed in output tensor

Parameters:
ctx(dict): include 'mp_group', 'offload', and 'partition' keys. the key 'mp_group' (Group), represents the avtivations are splitted
ctx(dict): include 'mp_group', 'offload', and 'partition' keys. the key 'mp_group' (Group), represents the activations are splitted
in which group. the key 'offload' (bool, optional, default=False), represents whether to offload to cpu. the key 'partition' (bool, optional, default=False),
represents whether to split activations in the mp_group.
function(paddle.nn.Layer): layer of sequence of layers that describes part of forward pass of the model
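
The docstring above names the `ctx` keys (`mp_group`, `offload`, `partition`) and the `recompute_hybrid(ctx, function, *args)` call shape. A rough sketch of a call, assuming a job started with `paddle.distributed.launch` and a hybrid strategy with `mp_degree > 1`; the import path follows this file's location and the toy block is illustrative:

```python
import paddle
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet.recompute.recompute_hybrid import recompute_hybrid

# Sketch only: assumes the script runs under paddle.distributed.launch
# with a model-parallel degree greater than 1.
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 2, "pp_degree": 1}
fleet.init(is_collective=True, strategy=strategy)

hcg = fleet.get_hybrid_communicate_group()
mp_group = hcg.get_model_parallel_group()

block = paddle.nn.Sequential(
    paddle.nn.Linear(64, 64),
    paddle.nn.ReLU(),
    paddle.nn.Linear(64, 64),
)

x = paddle.randn([4, 64])
x.stop_gradient = False

# ctx carries the keys named in the docstring: which group holds the split
# activations, whether to offload them to CPU, and whether to partition them.
ctx = {"mp_group": mp_group, "offload": False, "partition": False}
out = recompute_hybrid(ctx, block, x)
out.mean().backward()
```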
12 changes: 6 additions & 6 deletions python/paddle/distributed/fleet/utils/fs.py
@@ -127,7 +127,7 @@ class LocalFS(FS):

def ls_dir(self, fs_path):
"""
List directorys and files under `fs_path` .
List directories and files under `fs_path` .

Args:
fs_path(str): The local file path.
@@ -377,7 +377,7 @@ def mv(self, src_path, dst_path, overwrite=False, test_exists=False):

def list_dirs(self, fs_path):
"""
Only list directorys under `fs_path` .
Only list directories under `fs_path` .

Args:
fs_path(str): The local file path.
@@ -516,7 +516,7 @@ def _run_cmd(self, cmd, redirect_stderr=False, retry_times=5):
@_handle_errors()
def list_dirs(self, fs_path):
"""
Only list directorys under `fs_path` .
Only list directories under `fs_path` .

Args:
fs_path(str): The HDFS file path.
@@ -550,7 +550,7 @@ def list_dirs(self, fs_path):
@_handle_errors()
def ls_dir(self, fs_path):
"""
List directorys and files under `fs_path` .
List directories and files under `fs_path` .

Args:
fs_path(str): The HDFS file path.
@@ -1226,7 +1226,7 @@ def init(self, fs_name, fs_user, fs_passwd, fs_conf):

def list_dirs(self, fs_path):
"""
Only list directorys under `fs_path` .
Only list directories under `fs_path` .

Args:
fs_path(str): The HDFS file path.
@@ -1254,7 +1254,7 @@ def list_dirs(self, fs_path):

def ls_dir(self, fs_path):
"""
List directorys and files under `fs_path` .
List directories and files under `fs_path` .

Args:
fs_path(str): The HDFS file path.
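
The same `ls_dir`/`list_dirs` wording is corrected for both `LocalFS` and the HDFS client. A quick `LocalFS` sketch; the paths are arbitrary and the assumption that `ls_dir` returns a (directories, files) pair is not shown in this hunk:

```python
from paddle.distributed.fleet.utils import LocalFS

client = LocalFS()
client.mkdirs("./demo_fs/sub")      # create something to list
client.touch("./demo_fs/a.txt")

# ls_dir lists both subdirectories and files under the path;
# list_dirs returns only the subdirectories.
dirs, files = client.ls_dir("./demo_fs")
only_dirs = client.list_dirs("./demo_fs")
print(dirs, files, only_dirs)

client.delete("./demo_fs")          # clean up
```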
@@ -33,7 +33,7 @@ class HybridParallelInferenceHelper:
num_pp (int): number of pipeline parallel degree. Default ``1``.
micro_batch_size (int): number of micro batch size. Default ``1``.
beam_size (int): number of beam search size. Default ``1``.
init_comm (bool): wheter if initilize comminication group. Default ``True``.
init_comm (bool): whether if initialize communication group. Default ``True``.
role_maker (RoleMakerBase or subclass): user custom define RoleMakerBase.
If ``role_maker==None``, then use PaddleCloudRoleMaker. Default ``None``.
