Skip to content

Commit

Permalink
fix unitest
Browse files Browse the repository at this point in the history
  • Loading branch information
kuizhiqing committed Mar 10, 2022
1 parent 0cd44b5 commit 65f24de
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 60 deletions.
14 changes: 13 additions & 1 deletion python/paddle/distributed/run/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

class Context(object):
def __init__(self, enable_plugin=True):
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)

self.args = self.parse_args()
self.envs = self.fetch_envs()
self.logger = self.get_logger()
Expand Down Expand Up @@ -145,7 +148,7 @@ def parse_args(self):
"--elastic_level",
type=int,
default=-1,
help="elastic level: -1 disable, 0 failed exit, peers hold, 1 interal restart"
help="elastic level: -1 disable, 0 failed exit, peers hold, 1 internal restart"
)

elastic_group.add_argument(
Expand All @@ -167,7 +170,16 @@ def _valide_env(self, key):

def fetch_envs(self):
ge = os.environ.copy()

black_env_list = ['http_proxy', 'https_proxy']
for key in black_env_list:
ge.pop(key, None)

return ge
'''
# use black list instead white list
return {k: ge[k] for k in ge if self._valide_env(k)}
'''

def get_logger(self, level=logging.INFO):
logger = logging.getLogger("PADDLERUN")
Expand Down
3 changes: 1 addition & 2 deletions python/paddle/distributed/run/context/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, dtype=None, count=1, memory="", labels=""):
self.memory = memory
self.labels = labels

def labels_string(self):
def __str__(self):
return ",".join(self.labels)

@classmethod
Expand Down Expand Up @@ -57,7 +57,6 @@ def parse_device(self):

@classmethod
def detect_device(self):
# enable paddle detection is very expansive
import paddle.fluid as fluid

dev = Device()
Expand Down
5 changes: 2 additions & 3 deletions python/paddle/distributed/run/context/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

class Node(object):
def __init__(self):
# enable paddle detection is very expansive
# self.device = Device.detect_device()
self.device = Device.parse_device()
self.ip = self.get_host_ip()
Expand Down Expand Up @@ -54,8 +53,8 @@ def get_free_port(self):
@classmethod
def is_server_ready(self, ip, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#sock.settimeout(0.01)
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
result = sock.connect_ex((ip, int(port)))
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/distributed/run/context/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

class Resource(object):
def __init__(self):
self.devices = [Device()]
self.devices = []
4 changes: 2 additions & 2 deletions python/paddle/distributed/run/controllers/collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def build_pod(self):
global_size = sum([i['replicas'] for i in peer_list])
rank_offset = sum([i['replicas'] for i in peer_list[:rank]])
'''
The new desinged collective need nothing but a master endpoint
The new designed collective need nothing but a master endpoint
'''
collective_master = peer_list[0]['candidate']

Expand Down Expand Up @@ -114,7 +114,7 @@ def watch(self) -> bool:
'''
watch self and peer status, return true to exit
'''
while True:
while not self.ctx.status.is_done():
# self status
status = self.pod.watch(timeout=2)
self.ctx.logger.debug("Pod status {}, Ctx status {}".format(
Expand Down
28 changes: 23 additions & 5 deletions python/paddle/distributed/run/controllers/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ def run(self):
self.build_job()
self.build_pod()

assert len(self.pod.containers) > 0, "No container in the pod"
if len(self.pod.containers) < 1:
self.ctx.logger.error("No container in the pod {}".format(self.pod))
return

self.ctx.logger.info("Run {}".format(self.pod))
self.ctx.logger.debug(self.pod.containers[0])

Expand All @@ -65,8 +68,9 @@ def watch(self) -> bool:
self.ctx.logger.info("Pod {}".format(status))
elif status == self.ctx.status.FAILED:
self.ctx.logger.info("Pod {}".format(status))
self.ctx.logger.info("Container failed !!!\n{}".format(
self.ctx.logger.error("Container failed !!!\n{}".format(
self.pod.failed_container()))
self.pod.tail()
self.pod.stop()

def stop(self, sigint=None):
Expand All @@ -78,15 +82,21 @@ def finalize(self):
self.pod.join()
self.master.stop()

self.ctx.logger.info("Done with code {}".format(self.pod.exit_code))
self.ctx.logger.info("Exit code {}".format(self.pod.exit_code))
sys.exit(self.pod.exit_code)

def signal_handler(self, sigint, frame):
self.ctx.logger.info("Termiating with signal {}".format(sigint))
self.ctx.logger.info("Terminating with signal {}".format(sigint))

if hasattr(self, 'sigint'):
time.sleep(5)
sys.exit(sigint)

self.sigint = sigint
self.ctx.status.done()
self.stop(sigint)
time.sleep(1)
self.ctx.logger.debug("Exit with signal {}".format(sigint))
sys.exit(sigint)


Expand All @@ -96,9 +106,17 @@ class Controller(ControllerBase):
'''

def build_job(self):
'''
build job fill the job info.
'''
self.ctx.logger.info(self.job)

def build_pod(self):
def build_pod(self) -> bool:
'''
build pod includes creating containers etc.
Return True if succeed
'''
raise NotImplementedError

def _get_entrypoint(self):
Expand Down
57 changes: 35 additions & 22 deletions python/paddle/distributed/run/controllers/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import six
import threading
import copy
import random

ETCD_PROTOCAL = 'etcd://'


class Master(object):
'''
Master is a distributed store desgin to exchange info among nodes
Master is a distributed store design to exchange info among nodes
'''

MAIN = "main"
STANBY = "stanby"
STANDBY = "standby"
PATICIPANT = "participant"

def __init__(self, ctx):
Expand All @@ -39,8 +40,6 @@ def __init__(self, ctx):
self.initialized = False
self.endpoint = None

self.gc = []

def stop(self):
raise NotImplementedError

Expand All @@ -65,17 +64,25 @@ def lazy_init(self):
if self.ctx.args.master:
self.endpoint = self.ctx.args.master
ip, port = self.endpoint.split(':')
if ip in ['127.0.0.1', self.ctx.node.ip
] and not self.ctx.node.is_server_ready(ip, int(port)):
self.server = KVServer(int(port))
self.role = Master.MAIN
if ip in ['127.0.0.1', self.ctx.node.ip]:
time.sleep(2 * random.random())
while not self.ctx.node.is_server_ready(ip, int(port)):
try:
self.server = KVServer(int(port))
self.role = Master.MAIN
break
except Exception as e:
self.ctx.logger.warning("start master failed {}".format(
e))
time.sleep(0.1)
continue
else:
port = self.ctx.node.get_free_port()
self.endpoint = "{}:{}".format(self.ctx.node.ip, port)
self.server = KVServer(port)
self.role = Master.MAIN

print("Copy the following commond to other nodes to run.")
print("Copy the following command to other nodes to run.")
cmd = [
sys.executable.split('/')[-1], "-m", "paddle.distributed.run"
]
Expand All @@ -89,6 +96,8 @@ def lazy_init(self):
self.ctx.logger.warning(
"--rank set in the command may not compatible in auto mode")

if '127.0.0.1' in self.endpoint:
self.endpoint = self.endpoint.replace('127.0.0.1', self.ctx.node.ip)
self.client = KVClient(self.endpoint)

self.initialized = True
Expand All @@ -114,18 +123,26 @@ def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):

self.lazy_init()

assert self.client.wait_server_ready(timeout=600), 'server is not ready'
while not self.ctx.status.is_done():
if self.client.wait_server_ready(timeout=5):
break
else:
self.ctx.logger.warning("master not ready")
time.sleep(0.1)

# 'aaaaaa' make suer main pod (master server) as rank 0
ky = 'aaaaaa' if rank < 0 and self.role == Master.MAIN else key
k = "{}/{}/{}".format(prefix, ky, rank)
assert self.client.put(k, value), 'put value failed'
self.gc.append(k)

while True:
while not self.ctx.status.is_done():
if not self.client.put(k, value):
self.ctx.logger.warning("put value failed")
time.sleep(0.1)
continue

rjson = self.client.get_prefix(prefix)
self.ctx.logger.debug("sync peers {}".format(rjson))
if len(rjson) == size:
if rjson and len(rjson) == size:
if rank < 0:
keys = list(rjson.keys())
keys.sort()
Expand All @@ -141,10 +158,6 @@ def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
time.sleep(0.5)
return [], 0

def clean(self):
for i in self.gc:
self.client.delete(i)


class ETCDMaster(Master):
def __init__(self, ctx):
Expand All @@ -161,7 +174,7 @@ def __init__(self, ctx):

def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
'''
sync_peers gather all value for key under scop prefix
sync_peers gather all value for key under scope prefix
result always be sorted either by rank or alphabet of pod.name
'''
path = "{}/{}/{}".format(prefix, key, rank)
Expand All @@ -170,7 +183,7 @@ def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):

self.ctx.logger.debug("sync path {} value {}".format(path, value))

while True:
while not self.ctx.status.is_done():
self.client.put(path, six.b(value))

result = [i for i in self.client.get_prefix(prefix)]
Expand Down Expand Up @@ -245,8 +258,8 @@ def fetch_peer_alive(self):
return peer_alive

def wait_peer_ready(self, replicas_min, replicas_max, timeout):
st = time.time()
while st + timeout > time.time():
end = time.time() + timeout
while not self.ctx.status.is_done() and time.time() < end:
if len(self.fetch_peer_alive()) == replicas_max:
return (True, replicas_max)
else:
Expand Down
27 changes: 21 additions & 6 deletions python/paddle/distributed/run/job/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _get_fd(self, pth):
return None

def start(self, timeout=-1):
st = time.time()
end = time.time() + timeout

if self._proc and self._proc.alive():
return True
Expand All @@ -106,7 +106,7 @@ def start(self, timeout=-1):
self._entrypoint, env=self._env, out=self._stdout, err=self._stderr)
self._proc.start()

while timeout > 0 and time.time() - st < timeout:
while timeout > 0 and time.time() < end:
if self._proc.alive():
time.sleep(0.1)
continue
Expand Down Expand Up @@ -147,18 +147,33 @@ def __str__(self):
self.errfile,
self._env, )

def logs(self, fn=None, offset=-1):
def logs(self, fn=None, offset=0, whence=1, lines=1000):
if not self._log_handler:
self._log_handler = open(self._out)

if offset >= 0:
self._log_handler.seek(offset, 0)

if fn is None:
fn = sys.stdout

self._log_handler.seek(offset, whence)

try:
idx = 0
for line in self._log_handler:
fn.write(line)
idx += 1
if idx > lines:
break
finally:
return self._log_handler.tell()

def tail(self, length=3000):
if not self._log_handler:
self._log_handler = open(self._out)

self._log_handler.seek(0, 2)
ed = self._log_handler.tell()

if ed > length:
self.logs(offset=ed - length, whence=0)
else:
self.logs(offset=0, whence=0)
23 changes: 19 additions & 4 deletions python/paddle/distributed/run/job/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,23 @@ def is_completed(self):
return False
return True

def logs(self, idx=0):
self._containers[idx].logs()
def logs(self, idx=None):
if idx is None:
if self.failed_container():
self.failed_container().logs()
else:
self._containers[0].logs()
else:
self._containers[idx].logs()

def tail(self, idx=None):
if idx is None:
if self.failed_container():
self.failed_container().tail()
else:
self._containers[0].tail()
else:
self._containers[idx].tail()

def watch(self,
all_list=[Status.COMPLETED],
Expand All @@ -157,8 +172,8 @@ def watch(self,
watch return if any container status in any_list
or all container status in all_list
'''
st = time.time()
while timeout < 0 or st + timeout > time.time():
end = time.time() + timeout
while timeout < 0 or time.time() < end:
for c in self._containers:
if c.status() in any_list:
return c.status()
Expand Down
Loading

0 comments on commit 65f24de

Please sign in to comment.