New design for launch/run (#40086)
kuizhiqing committed Mar 15, 2022
1 parent 464f65b commit 67c6ddf
Showing 25 changed files with 2,546 additions and 0 deletions.
86 changes: 86 additions & 0 deletions python/paddle/distributed/run/__init__.py
@@ -0,0 +1,86 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .job.container import Container
from .job.pod import Pod
from .job.job import Job
from . import plugins

#__all__ = [Container, Pod, Job]
'''
Paddle distributed training entry point ``python -m paddle.distributed.run``.
Help
# for argument usage and explanation, try the following command
# python -m paddle.distributed.run -h
Collective Mode
Case 1: 1 node
use all visible devices
# python -m paddle.distributed.run train.py
use specified devices
# python -m paddle.distributed.run --devices=0,1,2,3 train.py
Case 2: multi-node, auto detect ip/port
# python -m paddle.distributed.run --np 2 train.py
# the following command will be printed automatically
# python -m paddle.distributed.run --master 10.0.0.1:13538 --np 2 demo.py
# then copy and paste the command above to the other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.run --np 2 --master 10.0.0.1:2379 train.py
# the master ip must belong to one of the nodes and the port must be available
Parameter Server Mode
Case 1.1: 1 node, 1 ps, 1 trainer
# python -m paddle.distributed.run --mode ps train.py
# python -m paddle.distributed.run --server_num=1 --trainer_num=1 train.py
Case 1.2: 1 node, 2 ps, 2 trainer
# python -m paddle.distributed.run --server_num=2 --trainer_num=2 train.py
Case 2: 2 node, 2 ps, 2 trainer per node
# python -m paddle.distributed.run --server_num=2 --trainer_num=2 --np 2 train.py
# the following command will be printed automatically
# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py
# then copy and paste the command above to the other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py
# the master ip must belong to one of the nodes and the port must be available
Case 4: specified servers and trainers in each node
# python -m paddle.distributed.run --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py
Elastic Mode
# run the following command on 3 nodes to start immediately, or on 2 nodes to start after elastic_timeout
# python -m paddle.distributed.run --master etcd://10.0.0.1:2379 --np 2:3 train.py
# as long as the number of peers stays within 2:3, the elastic strategy keeps the job running
'''
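For orientation, a minimal collective-mode train.py that the commands above could launch might look like the sketch below. It is an editorial illustration using the standard Paddle 2.x dynamic-graph API; the model, data, and hyperparameters are placeholders, and the file is not part of this commit.

# illustrative train.py for collective mode (not part of this commit);
# assumes the standard Paddle 2.x dynamic-graph API
import paddle
import paddle.distributed as dist


def main():
    # the launcher sets the PADDLE_* environment variables each process needs
    dist.init_parallel_env()

    model = paddle.DataParallel(paddle.nn.Linear(10, 1))  # placeholder model
    opt = paddle.optimizer.SGD(
        learning_rate=0.01, parameters=model.parameters())

    for _ in range(10):
        x = paddle.randn([8, 10])              # placeholder batch
        loss = model(x).mean()
        loss.backward()
        opt.step()
        opt.clear_grad()


if __name__ == '__main__':
    main()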
28 changes: 28 additions & 0 deletions python/paddle/distributed/run/__main__.py
@@ -0,0 +1,28 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .context import Context
from . import controllers

# initialize the context to run
ctx = Context()

# initialize the selected controller
c = controllers.init(ctx)

# run the pods
c.run()

# manage the pods or just wait for them to finish
c.finalize()
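The module above is the entire driver loop: build a Context from the command line and environment, let the controllers package select a controller that matches the run mode, launch the pods, then manage or wait for them. A self-contained sketch of that lifecycle follows; the class and function names are illustrative stand-ins, not this package's real API.

# illustrative sketch of the launcher lifecycle; names are hypothetical
class DemoContext(object):
    def __init__(self, mode="collective", np="1"):
        self.mode = mode    # run mode, e.g. collective or ps
        self.np = np        # number of peers (pods/nodes)


class DemoController(object):
    def __init__(self, ctx):
        self.ctx = ctx

    def run(self):
        print("launching {} pod(s) in {} mode".format(self.ctx.np, self.ctx.mode))

    def finalize(self):
        print("waiting for pods to exit and cleaning up")


def demo_init(ctx):
    # stand-in for controllers.init(ctx): choose a controller by run mode
    return DemoController(ctx)


ctx = DemoContext(mode="collective", np="2")
c = demo_init(ctx)
c.run()
c.finalize()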
219 changes: 219 additions & 0 deletions python/paddle/distributed/run/context/__init__.py
@@ -0,0 +1,219 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from argparse import ArgumentParser, REMAINDER
import os, copy

from paddle.distributed.run import plugins

from .node import Node
from .status import Status

import logging


class Context(object):
def __init__(self, enable_plugin=True):
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)

self.args = self.parse_args()
self.envs = self.fetch_envs()
self.logger = self.get_logger()

self.node = Node()
self.status = Status()

self.set_env_in_args()

# reserved for an event queue, to be implemented later
self.events = []

if enable_plugin:
self._enable_plugin()

def get_envs(self):
return self.envs.copy()

def _enable_plugin(self):
for pl in plugins.enabled_plugins:
pl(self)

def parse_args(self):
parser = ArgumentParser()

base_group = parser.add_argument_group("Base Parameters")

base_group.add_argument(
"--master",
type=str,
default=None,
help="the master/rendezvous server, ip:port")

base_group.add_argument(
"--rank", type=int, default=-1, help="the peer rank")

base_group.add_argument(
"--log", type=str, default="INFO", help="log level. Default INFO")

base_group.add_argument(
"--np",
type=str,
default="1",
help="the number of peers, i.e. pod/node number")

base_group.add_argument(
"--nproc_per_node",
type=int,
default=None,
help="the number of processes in a pod")

base_group.add_argument(
"--log_dir",
type=str,
default="log",
help="the path for each process's log. Default ./log")
base_group.add_argument(
"--mode",
type=str,
default="collective",
help="run mode of the job, collective/ps/ps-heter")

base_group.add_argument(
"--id",
type=str,
default="default",
help="unique id of the job. Default default")

base_group.add_argument(
"--devices",
type=str,
default=None,
help="accelerate devices. as --gpus,npus,xps")

base_group.add_argument(
"--host", type=str, default=None, help="host ip")

base_group.add_argument(
"training_script",
type=str,
help="the full path of py script,"
"followed by arguments for the "
"training script")

base_group.add_argument('training_script_args', nargs=REMAINDER)

ps_group = parser.add_argument_group("Parameter-Server Parameters")
# for parameter server
ps_group.add_argument(
"--servers",
type=str,
default='',
help="servers endpoints full list")
ps_group.add_argument(
"--trainers",
type=str,
default='',
help="trainers endpoints full list")

ps_group.add_argument(
"--trainer_num", type=int, default=None, help="number of trainers")
ps_group.add_argument(
"--server_num", type=int, default=None, help="number of servers")
ps_group.add_argument(
"--gloo_port", type=int, default=6767, help="gloo http port")
ps_group.add_argument(
"--with_gloo", type=str, default="0", help="use gloo or not")

# parameter elastic mode
elastic_group = parser.add_argument_group("Elastic Parameters")
elastic_group.add_argument(
"--max_restart",
type=int,
default=3,
help="the times can restart. Default 3")

elastic_group.add_argument(
"--elastic_level",
type=int,
default=-1,
help="elastic level: -1 disable, 0 failed exit, peers hold, 1 internal restart"
)

elastic_group.add_argument(
"--elastic_timeout",
type=int,
default=30,
help="seconds to wait before elastic perform training")
return parser.parse_args()

def _valide_env(self, key):
if key in ['POD_IP']:
return True
if key.endswith('_VISIBLE_DEVICES'):
return True
if key.startswith('PADDLE_'):
return True

return False

def fetch_envs(self):
ge = os.environ.copy()

black_env_list = ['http_proxy', 'https_proxy']
for key in black_env_list:
ge.pop(key, None)

return ge
'''
# use a black list instead of a white list
return {k: ge[k] for k in ge if self._valide_env(k)}
'''

def get_logger(self, level=logging.INFO):
logger = logging.getLogger("PADDLERUN")
logger.setLevel(self.args.log.upper() or level)
formatter = logging.Formatter(
fmt='%(name)s %(levelname)s %(asctime)s %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger

def set_env_in_args(self):
env_args = {
'POD_IP': 'host',
'PADDLE_MASTER': 'master',
'PADDLE_DEVICES': 'devices',
'PADDLE_NP': 'np',
'PADDLE_MODE': 'mode',
'PADDLE_LOG': 'log',
'PADDLE_NPROC_PER_NODE': 'nproc_per_node',
'PADDLE_JOB_ID': 'id',
'PADDLE_RANK': 'rank',
'PADDLE_LOG_DIR': 'log_dir',
'PADDLE_MAX_RESTART': 'max_restart',
'PADDLE_ELASTIC_LEVEL': 'elastic_level',
'PADDLE_ELASTIC_TIMEOUT': 'elastic_timeout',
'PADDLE_SERVER_NUM': 'server_num',
'PADDLE_TRAINER_NUM': 'trainer_num',
'PADDLE_SERVERS_ENDPOINTS': 'servers',
'PADDLE_TRAINERS_ENDPOINTS': 'trainers',
'PADDLE_GLOO_PORT': 'gloo_port',
'PADDLE_WITH_GLOO': 'with_gloo',
}

for k, v in env_args.items():
if k in self.envs:
setattr(self.args, v, self.envs[k])
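set_env_in_args lets PADDLE_* environment variables override the matching command-line defaults, so a scheduler or another launcher can configure the job without editing the command. The sketch below reproduces the same pattern in isolation; the mapping and variables shown are illustrative, not the full set used above.

# illustrative sketch of overriding argparse values from PADDLE_* env vars
import os
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--np", type=str, default="1")
parser.add_argument("--mode", type=str, default="collective")
args = parser.parse_args([])

env_args = {'PADDLE_NP': 'np', 'PADDLE_MODE': 'mode'}
os.environ['PADDLE_NP'] = '4'   # e.g. exported by a job scheduler

for k, v in env_args.items():
    if k in os.environ:
        setattr(args, v, os.environ[k])

print(args.np)   # '4' -- the environment variable wins over the default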