[Unified Checkpoint] Support sharding_comm_overlap (#9392)
* update
DesmonDay authored Nov 14, 2024
1 parent 018b530 commit 921fc44
Showing 7 changed files with 52 additions and 28 deletions.
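
For context, a hedged sketch of the training-arguments combination this commit enables. The field names mirror the checks in training_args.py below; the concrete degrees, sharding stage, and output path are illustrative assumptions.

    # Hedged sketch only: field names come from the checks in this diff; values are illustrative.
    from paddlenlp.trainer import TrainingArguments

    args = TrainingArguments(
        output_dir="./checkpoints",                               # illustrative path
        bf16=True,                                                # amp must be on for amp_master_grad
        amp_master_grad=True,                                     # required when sharding_comm_overlap is enabled
        pipeline_parallel_degree=2,                               # illustrative degree
        sharding_parallel_degree=4,                               # must be > 1 for sharding_comm_overlap
        sharding="stage1",                                        # assumed sharding stage
        pipeline_parallel_config="enable_sharding_comm_overlap",  # sets enable_sharding_comm_overlap
        sharding_parallel_config="split_param",                   # the split_param mode checked below
        unified_checkpoint=True,                                  # previously forced sharding_comm_overlap off
    )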
18 changes: 17 additions & 1 deletion paddlenlp/trainer/trainer.py
@@ -141,6 +141,7 @@
     set_seed,
     should_skip_data,
     speed_metrics,
+    split_parallel_config,
 )
 from .training_args import TrainingArguments
 from .unified_checkpoint import UnifiedCheckpointHandler
@@ -2053,6 +2054,14 @@ def get_expected_keys(inputs, keys):
                 self.optimizer = mix_precision_utils.MixPrecisionOptimizer(self.optimizer)
             self.optimizer = fleet.distributed_optimizer(self.optimizer)
 
+            if (
+                hasattr(self.args, "enable_sharding_comm_overlap")
+                and self.args.enable_sharding_comm_overlap
+                and self.args.unified_checkpoint
+                and "split_param" in split_parallel_config(self.args.sharding_parallel_config)
+            ):
+                model.register_sharding_comm_overlap_hook(self.optimizer)
+
         # No pipeline mode, sharding only
         if not in_pipeline_parallel_mode and in_sharding_parallel_mode:
             # Sharded DDP!
@@ -2840,8 +2849,15 @@ def _load_optimizer_and_scheduler(self, checkpoint):
             else:
                 opt_state_dict = None
         else:
+            model = self.model
+            if (
+                hasattr(self.args, "enable_sharding_comm_overlap")
+                and self.args.enable_sharding_comm_overlap
+                and "split_param" in split_parallel_config(self.args.sharding_parallel_config)
+            ):
+                model = self.model_wrapped
             opt_state_dict = self.unified_checkpoint_handler.load_unified_optimizer(
-                model=self.model,
+                model=model,
                 optimizer=self.optimizer,
                 resume_from_checkpoint=checkpoint,
             )
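
The two hunks above apply the same guard: when sharding comm overlap runs together with split_param, the wrapped pipeline-parallel model is the object that owns the sharding comm buffers, so it is the one handed to the unified-checkpoint code. A hedged sketch of that selection as a standalone helper (the helper name is an assumption; the condition mirrors this hunk and assumes split_parallel_config is imported from trainer_utils):

    def model_for_unified_optimizer(args, model, model_wrapped):
        # Pick which model object to hand to the unified-checkpoint loader.
        if (
            getattr(args, "enable_sharding_comm_overlap", False)
            and "split_param" in split_parallel_config(args.sharding_parallel_config)
        ):
            # The wrapped model carries the sharding comm buffers that
            # load_unified_optimizer_split_param walks (see sharding_split_param_utils.py below).
            return model_wrapped
        return model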
8 changes: 8 additions & 0 deletions paddlenlp/trainer/trainer_utils.py
@@ -1126,3 +1126,11 @@ def should_skip_data(global_step, skip_data_intervals):
             skip_flag = True
             break
     return skip_flag
+
+
+def split_parallel_config(parallel_config):
+    if "," in parallel_config:
+        parallel_config = set(parallel_config.split(","))
+    else:
+        parallel_config = set(parallel_config.split(" "))
+    return parallel_config
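
A quick usage sketch of the new helper: it accepts either a comma- or a space-separated config string and returns the set of tokens (the second token here is illustrative):

    split_parallel_config("split_param,enable_stage1_overlap")   # -> {"split_param", "enable_stage1_overlap"}
    split_parallel_config("split_param enable_stage1_overlap")   # -> {"split_param", "enable_stage1_overlap"}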
25 changes: 5 additions & 20 deletions paddlenlp/trainer/training_args.py
@@ -37,6 +37,7 @@
     OptimizerNames,
     SchedulerType,
     ShardingOption,
+    split_parallel_config,
 )
 
 try:
@@ -1096,13 +1097,6 @@ def __post_init__(self):
             logger.warning("set amp_master_grad to false since amp is disabled.")
             self.amp_master_grad = False
 
-        def split_parallel_config(parallel_config):
-            if "," in parallel_config:
-                parallel_config = set(parallel_config.split(","))
-            else:
-                parallel_config = set(parallel_config.split(" "))
-            return parallel_config
-
         # use_hybrid_parallel
         if self.use_hybrid_parallel:
 
@@ -1155,29 +1149,20 @@ def split_parallel_config(parallel_config):
                 or "enable_dp_comm_overlap" in pipeline_parallel_config
             )
             enable_dp_comm_overlap = using_comm_overlap and self.data_parallel_degree > 1
-            enable_sharding_comm_overlap = using_comm_overlap and self.sharding_parallel_degree > 1
+            self.enable_sharding_comm_overlap = using_comm_overlap and self.sharding_parallel_degree > 1
             assert not (
-                enable_dp_comm_overlap and enable_sharding_comm_overlap
+                enable_dp_comm_overlap and self.enable_sharding_comm_overlap
             ), "dp_comm_overlap and sharding_comm_overlap cannot be enabled at the same time"
 
-            if enable_sharding_comm_overlap and not self.amp_master_grad:
+            if self.enable_sharding_comm_overlap and not self.amp_master_grad:
                 raise ValueError(
                     "If `enable_sharding_comm_overlap` in pipeline_parallel_configs, `amp_master_grad` must be True."
                 )
-            if (
-                enable_sharding_comm_overlap
-                and self.unified_checkpoint
-                and "split_param" in split_parallel_config(self.sharding_parallel_config)
-            ):
-                logger.warning(
-                    "Currently unified checkpoint do not support using `sharding_comm_overlap` and `split_param` at the same time, delete `sharding_comm_overlap`."
-                )
-                enable_sharding_comm_overlap = False
 
             dygraph_pp_configs = {
                 "delay_scale_loss": True if "enable_delay_scale_loss" in pipeline_parallel_config else False,
                 "dp_comm_overlap": enable_dp_comm_overlap,
-                "sharding_comm_overlap": enable_sharding_comm_overlap,
+                "sharding_comm_overlap": self.enable_sharding_comm_overlap,
                 "enable_timer": "enable_timer" in pipeline_parallel_config,
                 "release_gradients": "enable_release_grads" in pipeline_parallel_config or self.release_grads,
                 "overlap_p2p_comm": "enable_overlap_p2p_comm" in pipeline_parallel_config,
3 changes: 2 additions & 1 deletion paddlenlp/trainer/unified_checkpoint/check_completion.py
@@ -150,7 +150,8 @@ def check_unified_optimizer(args, model, optimizer, resume_from_checkpoint, safe
     sharding_group = hcg.get_sharding_parallel_group()
     sharding_rank = sharding_group.rank
     dp_rank = dp_group.rank if dp_group.nranks > 1 else 0
-    struct2static_name_mappings = {k: v.name for k, v in model.state_dict().items()}
+    model_state_dict = get_expected_state_dict(model)
+    struct2static_name_mappings = {k: v.name for k, v in model_state_dict.items()}
 
     if is_sharding_split_param_mode(args):
         # We do not check optimizer files completion for split_param, since it is very complicated. Directly support local resume.
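
For readers new to this mapping: struct2static_name_mappings appears to relate a model's structured state_dict keys to the static parameter names that the optimizer state is keyed by, which is why the keys must come from get_expected_state_dict(model) rather than from the wrapped model. An illustrative shape with placeholder names (not taken from any real checkpoint):

    struct2static_name_mappings = {
        "llama.embed_tokens.weight": "embedding_0.w_0",   # placeholder: structured key -> static parameter name
    }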
2 changes: 1 addition & 1 deletion paddlenlp/trainer/unified_checkpoint/load_local.py
@@ -150,7 +150,7 @@ def _remove_unused_keys(
 def load_unified_optimizer_locally(args, model, optimizer, resume_from_checkpoint, safe_serialization=False):
     # Special process with split param.
     if is_sharding_split_param_mode(args):
-        returned_optim_state_dict = load_unified_optimizer_split_param(model, optimizer, resume_from_checkpoint)
+        returned_optim_state_dict = load_unified_optimizer_split_param(args, model, optimizer, resume_from_checkpoint)
         return returned_optim_state_dict
 
     # init and get optimizer LR_Scheduler
14 changes: 11 additions & 3 deletions paddlenlp/trainer/unified_checkpoint/sharding_split_param_utils.py
@@ -15,14 +15,15 @@
 
 import gc
 import os
+from itertools import chain
 
 import paddle
 import paddle.distributed as dist
 from paddle.distributed import fleet
 from tqdm.auto import tqdm
 
 from paddlenlp.peft import LoRAModel, PrefixModelForCausalLM
-from paddlenlp.transformers.model_utils import load_state_dict
+from paddlenlp.transformers.model_utils import load_state_dict, unwrap_model
 from paddlenlp.utils.env import (
     SAFE_MASTER_WEIGHTS_INDEX_NAME,
     SAFE_OPTIMIZER_INDEX_NAME,
@@ -97,6 +98,7 @@ def gather_splited_param_for_optimizer(optimizer):
     global_rank = dist.get_rank()
     param_slice_info = {}
     param_shape_info = {}
+
     for buffer in optimizer._inner_opt._comm_buffer_list:
         for key in buffer._sharding_param_grad_view.keys():
             param_slice_info[key] = (
@@ -153,7 +155,7 @@ def gather_splited_param_for_optimizer(optimizer):
     return optim_state_dict, master_weights
 
 
-def load_unified_optimizer_split_param(model, optimizer, resume_from_checkpoint):
+def load_unified_optimizer_split_param(args, model, optimizer, resume_from_checkpoint):
     returned_optim_state_dict = nested_copy(optimizer.state_dict())
 
     index_filename, index_filename_master_weights = SAFE_OPTIMIZER_INDEX_NAME, SAFE_MASTER_WEIGHTS_INDEX_NAME
@@ -177,7 +179,13 @@ def load_unified_optimizer_split_param(model, optimizer, resume_from_checkpoint)
     expected_keys = []
     param_slice_info = {}
     param_shape_info = {}
-    for buffer in optimizer._inner_opt._comm_buffer_list:
+
+    comm_buffer_list = optimizer._inner_opt._comm_buffer_list
+    if hasattr(args, "enable_sharding_comm_overlap") and args.enable_sharding_comm_overlap:
+        comm_buffer_list = list(chain(*model._chunk_2_comm_buffers.values()))
+        model = unwrap_model(model)
+
+    for buffer in comm_buffer_list:
         for key in buffer._sharding_param_grad_view.keys():
             begin = buffer._sharding_param_grad_view[key]._param_begin
             end = buffer._sharding_param_grad_view[key]._param_end
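
A hedged illustration of the chain(...) call above: _chunk_2_comm_buffers appears to map a pipeline chunk id to its list of sharding comm buffers, and chain flattens those lists into the single buffer list the loop expects (the dict contents here are placeholders):

    from itertools import chain

    chunk_2_comm_buffers = {0: ["buffer_a", "buffer_b"], 1: ["buffer_c"]}  # stand-in for model._chunk_2_comm_buffers
    comm_buffer_list = list(chain(*chunk_2_comm_buffers.values()))
    # comm_buffer_list == ["buffer_a", "buffer_b", "buffer_c"]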
10 changes: 8 additions & 2 deletions paddlenlp/trainer/unified_checkpoint/utils.py
@@ -24,7 +24,11 @@
 from paddlenlp.peft import LoRAModel, PrefixModelForCausalLM
 from paddlenlp.trainer.trainer_utils import ExplicitEnum, ShardingOption
 from paddlenlp.trainer.utils.helper import distributed_isfile
-from paddlenlp.transformers.model_utils import PretrainedModel, get_parameter_dtype
+from paddlenlp.transformers.model_utils import (
+    PretrainedModel,
+    get_parameter_dtype,
+    unwrap_model,
+)
 from paddlenlp.transformers.utils import dtype_byte_size
 from paddlenlp.utils.distributed import distributed_allgather, distributed_gather
 from paddlenlp.utils.env import (
@@ -193,6 +197,8 @@ def get_expected_state_dict(model_to_save):
     """
     Get trainable state_dict of model_to_save.
     """
+    model_to_save = unwrap_model(model_to_save)
+
     if isinstance(model_to_save, PretrainedModel):
         state_dict = model_to_save.state_dict()
         if (
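
Unwrapping matters here because the trainer now passes self.model_wrapped down to the unified-checkpoint code when sharding comm overlap is enabled (see trainer.py above), and the isinstance checks below need the underlying PretrainedModel. A generic sketch of what an unwrap step typically does, purely as an assumption and not PaddleNLP's unwrap_model implementation:

    def unwrap_model_sketch(model):
        # Assumption: a wrapped distributed model exposes the underlying module via `_layers`.
        while hasattr(model, "_layers"):
            model = model._layers
        return model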
@@ -221,7 +227,7 @@ def get_expected_keys(args, sharded_metadata, model, optimizer, is_master_weight
         params2rank = optimizer._param2rank
 
     model_state_dict = get_expected_state_dict(model)
-    struct2static_name_mappings = {k: v.name for k, v in get_expected_state_dict(model).items()}
+    struct2static_name_mappings = {k: v.name for k, v in model_state_dict.items()}
 
     expected_keys = []
     for key in list(sharded_metadata["all_optimizer_keys"]):
