Fix some typos (dst_strategys, etc.) #62003

Merged (1 commit) on Feb 23, 2024
@@ -352,13 +352,13 @@ def convert_src_tensor_2_dst_tensor(vars_list, src_attr_map, dst_attr_map):
if src_attr_map is None or len(src_attr_map) == 0:
return vars_list[0]

- dst_strategys = {}
- src_strategys = {}
+ dst_strategies = {}
+ src_strategies = {}
tensors_dict = {}

convert_tensor_dict = None
for var_name in src_attr_map.keys():
- assert var_name not in dst_strategys
+ assert var_name not in dst_strategies
dist_vars = []
for vars in vars_list:
if var_name in vars.keys():
@@ -367,13 +367,13 @@ def convert_src_tensor_2_dst_tensor(vars_list, src_attr_map, dst_attr_map):
continue

if var_name in dst_attr_map and var_name in src_attr_map:
- dst_strategys[var_name] = copy.deepcopy(dst_attr_map[var_name])
- src_strategys[var_name] = copy.deepcopy(src_attr_map[var_name])
+ dst_strategies[var_name] = copy.deepcopy(dst_attr_map[var_name])
+ src_strategies[var_name] = copy.deepcopy(src_attr_map[var_name])
tensors_dict[var_name] = dist_vars

if src_attr_map == dst_attr_map:
return tensors_dict
- converter = Converter(tensors_dict, src_strategys, dst_strategys)
+ converter = Converter(tensors_dict, src_strategies, dst_strategies)
convert_tensor_dict = converter.convert()

return convert_tensor_dict
@@ -202,7 +202,7 @@ def _update_op_dims_mapping_and_distoperatorimpl(

updated = dist_op_container.update_dims_mapping(dist_op)
changed = updated or changed
- # TODO(ljz) remove the below code once we introduce general reshard to replace specifc distopimpls
+ # TODO(ljz) remove the below code once we introduce general reshard to replace specific distopimpls
reverted = dist_op_container.mapping_to_dist_operator_impl(
dist_op, original_op_dist_attr
)
6 changes: 3 additions & 3 deletions python/paddle/distributed/auto_parallel/static/converter.py
@@ -105,9 +105,9 @@ def convert(self, strict=True):
>>> import numpy as np
>>> from paddle.distributed.auto_parallel.static.converter import Converter
>>> complete_tensors = np.arange(4).reshape([2, 2])
- >>> partitial_tensors = np.split(complete_tensors, 2, axis=0)
+ >>> partial_tensors = np.split(complete_tensors, 2, axis=0)
>>> name = "tmp_0"
- >>> tensors_dict = {name: partitial_tensors}
+ >>> tensors_dict = {name: partial_tensors}
>>> strategy_1 = {
... name: {
... "process_shape": [2],
@@ -345,7 +345,7 @@ def slice_with_dist_attr(tensor, dist_attr):
@staticmethod
def merge(partition_tensor_list, tensor, partition_index, complete_shape):
"""
- Merge partitial tensors to a complete.
+ Merge partial tensors to a complete.

Returns:
None
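The docstring above describes merging partial tensors back into a complete one, and the convert() example earlier in this file splits a complete tensor the same way. A minimal standalone sketch of that round trip in plain numpy (illustrative only, not the Converter API itself):

import numpy as np

# Mirror the convert() docstring example: a 2x2 tensor split row-wise across 2 ranks.
complete_tensor = np.arange(4).reshape([2, 2])
partial_tensors = np.split(complete_tensor, 2, axis=0)

# Merging the partial pieces along the same axis recovers the complete tensor.
merged = np.concatenate(partial_tensors, axis=0)
assert (merged == complete_tensor).all()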
@@ -784,7 +784,7 @@ def comm_count(self):
shape = None
if self.op is not None:
vars = self.op.block.vars
- # NOTE: The tensor communicated input_name is "X" in default. Otherwise, this function should be overrided
+ # NOTE: The tensor communicated input_name is "X" in default. Otherwise, this function should be overridden
try:
var_name = self.op.input("X")[0]
except:
10 changes: 5 additions & 5 deletions python/paddle/distributed/auto_parallel/static/cost_model.py
@@ -98,18 +98,18 @@ def init_comm_cost(self, cluster=None):
# should get from `cluster`
BANDWIDTH = 32 * 1024 / 1000 # MB/ms, V100 PCIe
num_ranks = len(self.ranks)
- comm_volumn = np.prod(self.input_shape) * 4
+ comm_volume = np.prod(self.input_shape) * 4

if 'allreduce' in self.comm_type:
- self._cost = comm_volumn / (
+ self._cost = comm_volume / (
BANDWIDTH * num_ranks / (2 * (num_ranks - 1))
)
elif 'gather' in self.comm_type:
- self._cost = comm_volumn / (BANDWIDTH * num_ranks / (num_ranks - 1))
+ self._cost = comm_volume / (BANDWIDTH * num_ranks / (num_ranks - 1))
elif 'broadcast' in self.comm_type:
- self._cost = comm_volumn / BANDWIDTH
+ self._cost = comm_volume / BANDWIDTH
elif 'send' in self.comm_type or 'recv' in self.comm_type:
- self._cost = comm_volumn / BANDWIDTH
+ self._cost = comm_volume / BANDWIDTH
else:
self._cost = 0

4 changes: 2 additions & 2 deletions python/paddle/distributed/auto_parallel/static/helper.py
@@ -211,8 +211,8 @@ class ProgramHelper:

def __init__(self, layer, loss_func, metrics, inputs_spec, labels_spec):
# original model config information
- # TODO(Aurelius84): Implenet append_backward and optimizer in ProxyLayer
- # after distribute engine satisify basic condition.
+ # TODO(Aurelius84): Implement append_backward and optimizer in ProxyLayer
+ # after distribute engine satisfy basic condition.
self.proxy_layer = ProxyLayer(layer, loss_func, metrics)
self.inputs_spec = inputs_spec
self.labels_spec = labels_spec
@@ -231,7 +231,7 @@ def _generate_backward(
# NOTE(zhaoyinglia):
# Guarantee the order of params_grads is same between dynamic mode and static mode
# by making parameter_list equal to model.parameters(),
- # because the order affact the result of ClipGradByGLobalNorm.
+ # because the order affect the result of ClipGradByGLobalNorm.
# If parameter_list is not None, the order of params_grads is same with parameter_list.
# If parameter_list is None, params_grads will be as prog.global_block().all_parameters().
with program_guard(main_program, startup_program):
@@ -1498,7 +1498,7 @@ def sharding_configs(self):
This configuration will affect the communication speed in sharding training, and should be an empirical value decided by your model size and network topology.
Only enable when sharding_segment_strategy = segment_broadcast_MB. Default is 32.0 .

- segment_anchors(list): list of anchors used to segment the program, which allows a finner control of program segmentation.
+ segment_anchors(list): list of anchors used to segment the program, which allows a finer control of program segmentation.
this strategy is experimental by now. Only enable when sharding_segment_strategy = segment_anchors.

sharding_degree(int, optional): specific the number of gpus within each sharding parallelism group; and sharding will be turn off if sharding_degree=1. Default is 8.
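The docstring above lists the sharding knobs. A minimal configuration sketch based only on the option names documented here (key names and defaults should be verified against the DistributedStrategy docs for your Paddle version):

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.sharding = True
strategy.sharding_configs = {
    "sharding_segment_strategy": "segment_broadcast_MB",
    "segment_broadcast_MB": 32.0,  # empirical, depends on model size and network topology
    "sharding_degree": 8,          # number of GPUs in each sharding group
}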
4 changes: 2 additions & 2 deletions python/paddle/distributed/fleet/fleet.py
@@ -1194,7 +1194,7 @@ def save_persistables(self, executor, dirname, main_program=None, mode=0):

dirname(str, optional): The saving directory path.
When you need to save the parameter to the memory, set it to None.
- main_program(Program, optional): The program whose persistbale tensors will
+ main_program(Program, optional): The program whose persistable tensors will
be saved. Default: None.


@@ -1419,7 +1419,7 @@ def amp_init(
... init_loss_scaling=128.0,
... use_dynamic_loss_scaling=True,
... use_pure_fp16=True)
- ... # If you don't use the default_startup_program(), you sholud pass
+ ... # If you don't use the default_startup_program(), you should pass
... # your defined `startup_program` into `minimize`.
... optimizer.minimize(loss)
... exe.run(paddle.static.default_startup_program())
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/launch.py
@@ -115,7 +115,7 @@ def _parse_args():
"--backend",
type=str,
default=os.environ.get('PADDLE_DISTRI_BACKEND', 'auto'),
- help="Specifize the backend, can be gloo|nccl|bkcl|auto|heter. "
+ help="Specify the backend, can be gloo|nccl|bkcl|auto|heter. "
"Default value is auto which prefers nccl or bkcl.",
)
base_group.add_argument(
14 changes: 7 additions & 7 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -339,12 +339,12 @@ def terminate_local_procs(procs):
p.log_fn.close()
logger.debug(f"terminate process id:{p.proc.pid}")

- # wait all process terminiated
+ # wait all process terminated
time.sleep(3)
for step in range(0, 50):
alive = False
for p in procs:
- if p.proc.poll() is None: # not termniate
+ if p.proc.poll() is None: # not terminate
os.kill(p.proc.pid, signal.SIGKILL)
alive = True

@@ -414,7 +414,7 @@ def __free_port():
step += 1
if step > 400:
print(
"can't find avilable port and use the specified static port now!"
"can't find available port and use the specified static port now!"
)
return None

@@ -705,7 +705,7 @@ def get_gpus(gpus):
for x in gpus.split(',')
]
logger.info(
f"Change selected_gpus into reletive values. --ips:{gpus} "
f"Change selected_gpus into relative values. --ips:{gpus} "
f"will change into relative_ips:{res_gpus} according to your "
f"CUDA_VISIBLE_DEVICES:{cuda_visible_devices_list}"
)
@@ -736,7 +736,7 @@ def get_xpus(xpus):
for x in xpus.split(',')
]
logger.info(
f"Change selected_xpus into reletive values. --ips:{xpus} "
f"Change selected_xpus into relative values. --ips:{xpus} "
f"will change into relative_ips:{res_xpus} according to your "
f"XPU_VISIBLE_DEVICES:{xpu_visible_devices_list}"
)
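The get_gpus/get_xpus hunks above fix "reletive" in log messages about remapping selected device ids to indices relative to CUDA_VISIBLE_DEVICES (or XPU_VISIBLE_DEVICES). A toy sketch of that remapping idea, with hard-coded values instead of real environment variables (names are illustrative, not the launch_utils implementation):

# Physical devices made visible to the process, e.g. CUDA_VISIBLE_DEVICES=4,5,6,7
cuda_visible_devices_list = ["4", "5", "6", "7"]
selected_gpus = "5,7"  # user-selected physical ids

# Each selected physical id becomes its index within the visible-device list.
res_gpus = [
    str(cuda_visible_devices_list.index(x.strip()))
    for x in selected_gpus.split(',')
]
print(res_gpus)  # ['1', '3']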
@@ -859,9 +859,9 @@ def get_custom_endpoints(origin_endpoints, offset=0):
# assert paddle_pserver_endpoints != None
#
# # hard code for paddlecloud custom-framework
- # avilable_ports = os.getenv("TRAINER_PORTS", "").split(",")
+ # available_ports = os.getenv("TRAINER_PORTS", "").split(",")
# assert len(
- # avilable_ports
+ # available_ports
# ) >= 2, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
#
# # hard code for paddlecloud custom-framework
@@ -73,7 +73,7 @@ def _unscale(self, optimizer):
if not self._use_dp_mode:
self._found_inf = paddle.cast(self._found_inf, dtype="int32")
# TODO(shenliang03) Since the minimize call in the optimizer is
- # after the gradscaler, check_finite needs to synchronize global
+ # after the grad scaler, check_finite needs to synchronize global
# information. In the future, we should use check_group
paddle.distributed.all_reduce(
self._found_inf, op=paddle.distributed.ReduceOp.MAX, group=None
@@ -103,7 +103,7 @@ def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1):
- 1: sync_calc
- 2: reduce_sum_sharding (allreduce --> reduce)
- 3: sync_comm
- - 4: allreuce_sum_dp (dp_grads)
+ - 4: allreduce_sum_dp (dp_grads)
- 5: sync_comm (dp_grads)
- 6: op that use Var (dp_grads & sum)

@@ -32,7 +32,7 @@ def prune_weight_decay(self, block, shard):
continue
if OP_ROLE_VAR_KEY not in op.attr_names:
raise ValueError(
"The Weight Dacay op should hold op_role_var attribute"
"The Weight Decay op should hold op_role_var attribute"
f"but the {op.type} op does not hold op_role_var"
)
op_role_var = op.all_attrs()[OP_ROLE_VAR_KEY]
@@ -217,7 +217,7 @@ def _get_hybrid_dp_mode(self):
# pipeline: communication across nodes, and therefore should insert in update segment,
# conduct just once per global step.
dp_mode = None
- # dp here is the pure dp as the outest parallelism
+ # dp here is the pure dp as the outermost parallelism
if self.hybrid_dp:
if self.pp_degree > 1:
dp_mode = "pp_hybrid_dp"
@@ -598,8 +598,8 @@ def _adapt_amp_clip_without_sharding(self):
rings = [self.mp_ring_id, self.pp_ring_id]
FP16Utils.sync_amp_check_nan_inf(main_block, rings)

- gradientclip_helper = GradientClipHelper(None)
- gradientclip_helper.sync_global_norm(
+ gradient_clip_helper = GradientClipHelper(None)
+ gradient_clip_helper.sync_global_norm(
main_block, [self.mp_ring_id, self.pp_ring_id], self.mp_rank
)

@@ -996,8 +996,8 @@ def _prune_main_program(self, block, shard, rings):
4. prune optimizer op + param + gradient

"""
- weightdecay_helper = WeightDecayHelper()
- weightdecay_helper.prune_weight_decay(block, shard)
+ weight_decay_helper = WeightDecayHelper()
+ weight_decay_helper.prune_weight_decay(block, shard)

# FIXME(wangxi): mp should prune duplicated param_grads
# NOTE (JZ-LIANG) the sync of FoundInfinite should among one entire Model Parallelism
@@ -1006,8 +1006,8 @@ def _prune_main_program(self, block, shard, rings):
FP16Utils.prune_fp16(block, shard, self._reduced_grads_to_param, rings)

# clipbyglobalnorm should only use the Model parallelism group (mp-sharding-pp)
- gradientclip_helper = GradientClipHelper(None)
- gradientclip_helper.prune_gradient_clip(block, shard, rings)
+ gradient_clip_helper = GradientClipHelper(None)
+ gradient_clip_helper.prune_gradient_clip(block, shard, rings)

# build prog deps
reduced_grads = []
@@ -1645,7 +1645,7 @@ def _build_groups(self):

# global group
# use for gen_nccl_comm_sync, amp check nan inf, clip by global norm
- # NOTE (JZ-LIANG) when use global ring for calc global norm and dp_degree > 1, the allreduce result should be devided by dp_degree
+ # NOTE (JZ-LIANG) when use global ring for calc global norm and dp_degree > 1, the allreduce result should be divided by dp_degree
self.global_ring_id = 3

logger.info(f"global word size: {self.global_word_size}")
@@ -1727,7 +1727,7 @@ def recreate_not_persist_param_as_var(program):

def _initialization_broadcast(self):
"""
- this funtion is to ensure the initialization between dp group to be
+ this function is to ensure the initialization between dp group to be
identical when hybrid-dp is used, and the initialization of
not distributed param between mp group to be identical.
"""
@@ -170,7 +170,7 @@ def __init__(self, layers, hcg, strategy):
'accumulate_steps'
]
# If sent tensor are not the same from different hosts,
- # they shouldn't been sent partially and then concated as a whole tensor.
+ # they shouldn't been sent partially and then concatenated as a whole tensor.
self._enable_partial_send_recv = self._strategy.pipeline_configs[
'enable_partial_send_recv'
]
@@ -640,7 +640,7 @@ def _prepare_training(self, data, optimizer, lr_scheduler):

def _wrap_data(self, data):
"""
- for backward compatibilty, wrap data to Fake FakeMicroDataset if it is of type list or tuple
+ for backward compatibility, wrap data to Fake FakeMicroDataset if it is of type list or tuple
"""
if (not isinstance(data, tuple)) and (not isinstance(data, list)):
return data
@@ -342,6 +342,6 @@ def cvt_to_device(x, dev_id, blocking=True):
place = paddle.XPUPlace(dev_id)
else:
raise OSError(
"Only supported compiled paddle with gpu/rocm and xpu , but current verison is compiled with cpu."
"Only supported compiled paddle with gpu/rocm and xpu, but current version is compiled with cpu."
)
return x._copy_to(place, blocking)
16 changes: 8 additions & 8 deletions python/paddle/distributed/fleet/recompute/recompute.py
@@ -93,7 +93,7 @@ def check_recompute_necessary(inputs):


@contextlib.contextmanager
- def swith_rng_state_tracker(rng_state, tracker):
+ def switch_rng_state_tracker(rng_state, tracker):
orig_rng_state = paddle.get_rng_state()
orig_rng_tracker = get_rng_state_tracker().get_states_tracker()
paddle.set_rng_state(rng_state)
@@ -155,8 +155,8 @@ def forward(ctx, run_function, preserve_rng_state, *args, **kwargs):
ctx.inputs.append(arg)
ctx.save_for_backward(*tensor_inputs)

- # NOTE recompute with restore RNG only support one senario where one process for one cuda gpu.
- # one process with multiple gpu and mix-gpu-cpu senarios are not support
+ # NOTE recompute with restore RNG only support one scenario where one process for one cuda gpu.
+ # one process with multiple gpu and mix-gpu-cpu scenarios are not support
if ctx.preserve_rng_state:
ctx.fw_rng_state = paddle.get_rng_state()
ctx.fwd_rng_state_tracker = (
@@ -208,7 +208,7 @@ def backward(ctx, *args):
# NOTE support AMP
# need restore auto_cast state as well as w/b list
if ctx.preserve_rng_state:
- with swith_rng_state_tracker(
+ with switch_rng_state_tracker(
ctx.fw_rng_state, ctx.fwd_rng_state_tracker
):
with paddle.amp.auto_cast(
@@ -273,7 +273,7 @@ def backward(ctx, *args):
# all tensors in the tuple doesn't need grad, only return a None for the whole tuple
grads.append(None)
else:
- # all tensors in the tuple nees grad, should return a tuple of grads
+ # all tensors in the tuple need grad, should return a tuple of grads
grads.append(tuple(i._grad_ivar() for i in inp))

if in_dynamic_mode():
@@ -303,7 +303,7 @@ def _recompute_without_reentrant(
fw_cuda_rng_state = paddle.get_rng_state(cur_device)
else:
raise RuntimeError(
"Recompute with RNG perserve is not support current device: {}.".format(
"Recompute with RNG preserve is not support current device: {}.".format(
cur_device
)
)
@@ -358,10 +358,10 @@ def inner_pack(inner_x):
return

def inner_unpack(inner_x):
- raise Exception("An unexcepted backward called on a tensor!")
+ raise Exception("An unexpected backward called on a tensor!")

if preserve_rng_state:
- with swith_rng_state_tracker(
+ with switch_rng_state_tracker(
fw_cuda_rng_state, fwd_cuda_rng_state_tracker
):
with paddle.set_grad_enabled(True):
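The hunks above rename swith_rng_state_tracker to switch_rng_state_tracker. As context, a simplified sketch of what such a helper does, assuming only Paddle's global RNG state (the real helper additionally saves and restores the fleet RNG state tracker, which is omitted here):

import contextlib

import paddle


@contextlib.contextmanager
def restore_rng_state(rng_state):
    # Temporarily install a previously captured RNG state so that recomputation
    # (e.g. dropout replayed during recompute) produces the same random draws,
    # then put the current state back on exit.
    orig_rng_state = paddle.get_rng_state()
    paddle.set_rng_state(rng_state)
    try:
        yield
    finally:
        paddle.set_rng_state(orig_rng_state)


# Usage sketch: capture the state, draw once, then replay the same draw later.
saved_state = paddle.get_rng_state()
x = paddle.rand([2, 3])
with restore_rng_state(saved_state):
    y = paddle.rand([2, 3])  # identical to x because the saved state was replayed
assert bool((x == y).all())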
4 changes: 2 additions & 2 deletions python/paddle/distributed/fleet/recompute/recompute_hybrid.py
@@ -22,7 +22,7 @@
from .recompute import (
check_recompute_necessary,
detach_variable,
- swith_rng_state_tracker,
+ switch_rng_state_tracker,
)

__all__ = []
@@ -198,7 +198,7 @@ def backward(ctx, *args):
tracer._has_grad = True

# need restore auto_cast state as well as w/b list
- with swith_rng_state_tracker(
+ with switch_rng_state_tracker(
ctx.fwd_rng_state, ctx.fwd_rng_state_tracker
):
if ctx.is_fw_autocast: