Using coroutine to execute multiple steps in a single worker #1056 #1213
Bo Peng committed Feb 25, 2019
1 parent e414903 commit b6d57e6
Showing 10 changed files with 702 additions and 417 deletions.
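The commit title refers to letting one worker process interleave several steps through coroutines, instead of dedicating a process to each step until it finishes. The generator-based sketch below illustrates only the general pattern; the names run_step and StepScheduler are invented for this illustration and are not SoS APIs.

def run_step(name, n_substeps):
    # execute one step; yield a request whenever a result must be awaited
    for i in range(n_substeps):
        result = yield ('substep', name, i)  # suspend until the driver sends the result back
        print(f'{name}: substep {i} -> {result}')
    return f'{name} done'

class StepScheduler:
    # round-robin driver that interleaves several step coroutines in one worker
    def __init__(self, steps):
        self.active = [(step, next(step)) for step in steps]  # prime each coroutine to its first request

    def run(self):
        while self.active:
            step, request = self.active.pop(0)
            try:
                reply = step.send(f'result of {request}')  # pretend the controller answered the request
                self.active.append((step, reply))
            except StopIteration as finished:
                print(finished.value)

StepScheduler([run_step('step_10', 2), run_step('step_20', 3)]).run()

A step that has to wait suspends at a yield instead of blocking its process, which is what lets the controller changes below hand more than one piece of work to the same worker.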
7 changes: 4 additions & 3 deletions src/sos/actions.py
@@ -544,11 +544,12 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **
x: (env.sos_dict[x] if x in env.sos_dict else None) for x in shared}

wf_ids = [str(uuid.uuid4()) for wf in wfs]
env.__socket__.send_pyobj(['workflow', wf_ids, wfs, targets, args, shared, env.config])

if env.sos_dict.get('__concurrent_subworkflow__', False):
return {'pending_workflows': wf_ids}
blocking = not env.sos_dict.get('__concurrent_subworkflow__', False)
env.__socket__.send_pyobj(['workflow', wf_ids, wfs, targets, args, shared, env.config, blocking])

if not blocking:
return {'pending_workflows': wf_ids}
res = {}
for wf in wfs:
wf_res = env.__socket__.recv_pyobj()
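Because the +/- markers of the diff are not preserved above, old and new lines of sos_run are interleaved. Read as the post-commit version, the dispatch path is roughly the following (a hedged reconstruction from the hunk, not a drop-in excerpt; the tail of the blocking branch is cut off by the hunk boundary):

blocking = not env.sos_dict.get('__concurrent_subworkflow__', False)
# the controller now learns up front whether the caller will block on the result
env.__socket__.send_pyobj(
    ['workflow', wf_ids, wfs, targets, args, shared, env.config, blocking])

if not blocking:
    # concurrent subworkflows: hand the ids back immediately and let the
    # caller collect the results later
    return {'pending_workflows': wf_ids}

res = {}
for wf in wfs:
    wf_res = env.__socket__.recv_pyobj()  # blocking case: one reply per workflow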
168 changes: 33 additions & 135 deletions src/sos/controller.py
@@ -191,107 +191,13 @@ def done(self, msg):
self.update('done', msg)
self.stop_event.set()

class WorkerManager(object):
# manages worker processes

def __init__(self, backend_socket):
self._substep_workers = []
self._n_working_workers = 0
self._n_requested = 0
self._n_ready = 0
self._worker_pending_time = None
self._worker_alive_time = time.time()

self._frontend_requests = []
self._substep_backend_socket = backend_socket

def report(self, msg):
return
env.logger.trace(f'{msg}: workers: {self._n_working_workers} pending requests: {len(self._frontend_requests)}, pending: {self._worker_pending_time is not None}, requested: {self._n_requested}, num_ready: {self._n_ready}')

def add_request(self, msg):
self._frontend_requests.insert(0, msg)
self.report('add_request')

def process_request(self):
self._n_ready += 1
if self._frontend_requests:
msg = self._frontend_requests.pop()
self._substep_backend_socket.send_pyobj(msg)
self.report('process request')
elif self._worker_pending_time is not None:
# if a worker is already pending and another one has also completed,
# kill one.
self._worker_pending_time = time.time()
self._substep_backend_socket.send_pyobj(None)
self._n_working_workers -= 1
self.report('kill one worker')
else:
# no request is pending, so mark this worker as pending
self._worker_pending_time = time.time()
self.report('worker pending')

def num_working(self):
return self._n_working_workers

def use_pending(self, msg):
self._n_requested += 1
if not self._worker_pending_time:
return False
self._substep_backend_socket.send_pyobj(msg)
self._worker_pending_time = None
self.report('resume pending')
return True

def start(self):
from .workers import SoS_SubStep_Worker
worker = SoS_SubStep_Worker(env.config)
worker.start()
self._substep_workers.append(worker)
self._n_working_workers += 1
self.report('start worker')

def check_workers(self):
'''Kill workers that have been pending for a while and check if all workers
are alive. '''
if time.time() - self._worker_alive_time > 5:
self._worker_alive_time = time.time()
self._substep_workers = [worker for worker in self._substep_workers if worker.is_alive()]
if len(self._substep_workers) < self._n_working_workers:
raise ProcessKilled('Substep worker killed')
# a pending worker can survive 5 seconds without job
if self._worker_pending_time is None or time.time() - self._worker_pending_time < 2:
return
self._substep_backend_socket.send_pyobj(None)
self._n_working_workers -= 1
self._worker_pending_time = None
self.report('kill a long-standing worker')

def kill_all(self):
'''Kill all workers'''
if self._worker_pending_time:
self._substep_backend_socket.send_pyobj(None)
self._n_working_workers -= 1
self.report('kill a pending worker')
# kill the remaining workers as they report ready
for worker in range(self._n_working_workers):
# we should get a ready signal
if self._substep_backend_socket.poll(100):
self._substep_backend_socket.recv_pyobj()
self._substep_backend_socket.send_pyobj(None)
self._n_working_workers -= 1
self.report('kill a done worker')


class Controller(threading.Thread):
'''This controller is used by both sos and sos-notebook, and there
can be two controllers, one as a slave (sos) and one as a master
(notebook). We share the same code base because step executors
need to talk to the same controller (signature, controller etc) when
they are executed in sos or sos notebook.
'''
LRU_READY = "READY"

def __init__(self, ready, kernel=None):
threading.Thread.__init__(self)
#self.daemon = True
@@ -325,17 +231,9 @@ def __init__(self, ready, kernel=None):

def handle_master_push_msg(self, msg):
try:
if isinstance(msg, dict): # substep
if self.workers.use_pending(msg):
# in this case the msg is directly consumed by a pending worker
return

if msg[0] in ('substep', 'step', 'workflow'):
# cache the request, route to first available worker
self.workers.add_request(msg)
# start a worker if necessary
if self.workers.num_working() == 0 or self.workers.num_working() + self._nprocs < env.config['max_procs']:
self.workers.start()

self.workers.add_request(msg[0], msg[1])
elif msg[0] == 'nprocs':
env.logger.trace(f'Active running process set to {msg[1]}')
self._nprocs = msg[1]
@@ -411,6 +309,8 @@ def handle_master_request_msg(self, msg):
break
if not found:
self.master_request_socket.send_pyobj(None)
elif msg[0] == 'worker_available':
self.master_request_socket.send_pyobj(self.workers.worker_available(msg[1]))
elif msg[0] == 'done':
# handle all ctl_push_msgs #1062
while True:
@@ -451,18 +351,9 @@ def handle_master_request_msg(self, msg):
self.master_request_socket.send_pyobj(None)


def handle_substep_backend_msg(self, msg):
# Use worker address for LRU routing
if not msg:
return False

# Forward message to client if it's not a READY
if msg != self.LRU_READY:
raise RuntimeError(
f'substep worker should only send ready message: {msg} received')

# now see if we have any work to do; if not, the worker will be marked as pending
self.workers.process_request()
def handle_worker_backend_msg(self, msg):
# msg should be a port number from the worker
self.workers.process_request(msg[0], msg[1:])

def handle_tapping_logging_msg(self, msg):
if env.config['exec_mode'] == 'both':
@@ -511,12 +402,10 @@ def run(self):
'tcp://127.0.0.1')

# broker to handle the execution of substeps
self.substep_backend_socket = create_socket(self.context, zmq.REP, 'controller backend rep') # ROUTER
env.config['sockets']['substep_backend'] = self.substep_backend_socket.bind_to_random_port(
self.worker_backend_socket = create_socket(self.context, zmq.REP, 'controller backend rep') # ROUTER
env.config['sockets']['worker_backend'] = self.worker_backend_socket.bind_to_random_port(
'tcp://127.0.0.1')

# create a manager
self.workers = WorkerManager(self.substep_backend_socket)

# tapping
if env.config['exec_mode'] == 'master':
@@ -541,11 +430,15 @@ def run(self):
# tell others that the sockets are ready
self.ready.set()

# create a manager
from .workers import WorkerManager
self.workers = WorkerManager(env.config['max_procs'], self.worker_backend_socket)

# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(self.master_push_socket, zmq.POLLIN)
poller.register(self.master_request_socket, zmq.POLLIN)
poller.register(self.substep_backend_socket, zmq.POLLIN)
poller.register(self.worker_backend_socket, zmq.POLLIN)
if env.config['exec_mode'] == 'master':
poller.register(self.tapping_logging_socket, zmq.POLLIN)
poller.register(self.tapping_listener_socket, zmq.POLLIN)
@@ -559,7 +452,15 @@

try:
while True:
socks = dict(poller.poll())

while True:
socks = dict(poller.poll(1000))
if socks:
break
# if the last worker has been pending for more than 5
# seconds, kill it. It is also possible that other workers have been
# killed by an external process.
self.workers.check_workers()

if self.master_push_socket in socks:
while True:
@@ -573,11 +474,11 @@ def run(self):
if not self.handle_master_request_msg(self.master_request_socket.recv_pyobj()):
break

if self.substep_backend_socket in socks:
if self.worker_backend_socket in socks:
while True:
if self.substep_backend_socket.poll(0):
self.handle_substep_backend_msg(
self.substep_backend_socket.recv_pyobj())
if self.worker_backend_socket.poll(0):
self.handle_worker_backend_msg(
self.worker_backend_socket.recv_pyobj())
else:
break

@@ -594,19 +495,16 @@ def run(self):
self.handle_tapping_controller_msg(
self.tapping_controller_socket.recv_pyobj())

# if the last worker has been pending for more than 5
# seconds, kill it. It is also possible that other workers have been
# killed by an external process.
self.workers.check_workers()


# if monitor_socket in socks:
# evt = recv_monitor_message(monitor_socket)
# if evt['event'] == zmq.EVENT_ACCEPTED:
# self._num_clients += 1
# elif evt['event'] == zmq.EVENT_DISCONNECTED:
# self._num_clients -= 1
except ProcessKilled:
env.logger.error('A substep worker has failed or has been killed externally. Quitting.')
except ProcessKilled as e:
env.logger.error(str(e))
os._exit(1)
except Exception as e:
sys.stderr.write(f'{env.config["exec_mode"]} got an error {e}')
@@ -621,7 +519,7 @@ def run(self):

poller.unregister(self.master_push_socket)
poller.unregister(self.master_request_socket)
poller.unregister(self.substep_backend_socket)
poller.unregister(self.worker_backend_socket)
if env.config['exec_mode'] == 'master':
poller.unregister(self.tapping_logging_socket)
poller.unregister(self.tapping_listener_socket)
@@ -630,7 +528,7 @@ def run(self):

close_socket(self.master_push_socket, now=True)
close_socket(self.master_request_socket, now=True)
close_socket(self.substep_backend_socket, now=True)
close_socket(self.worker_backend_socket, now=True)

if env.config['exec_mode'] == 'master':
close_socket(self.tapping_logging_socket, now=True)
@@ -639,4 +537,4 @@ def run(self):
if env.config['exec_mode'] in ('master', 'slave'):
close_socket(self.tapping_controller_socket, now=True)

env.logger.trace(f'controller stopped {os.getpid()}')
env.logger.trace(f'controller stopped {os.getpid()}')
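The WorkerManager class removed above is replaced by a new one imported from .workers (src/sos/workers.py is among the seven changed files whose diff is not shown on this page). Inferred only from the call sites in this diff, its interface looks roughly like the skeleton below; the bodies are placeholders, not the actual implementation.

class WorkerManager:
    def __init__(self, max_procs, backend_socket):
        # upper bound on worker processes and the REP socket workers report to
        self.max_procs = max_procs
        self.backend_socket = backend_socket

    def add_request(self, msg_type, msg):
        """Queue a 'substep', 'step' or 'workflow' request for a worker."""

    def worker_available(self, request):
        """Answer the new 'worker_available' query on the request socket."""

    def process_request(self, port, msg):
        """Handle a report from a worker on the backend socket (a port number,
        possibly followed by a payload) and hand it queued work."""

    def check_workers(self):
        """Kill a worker that has been idle too long; raise ProcessKilled if a
        worker died unexpectedly."""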
11 changes: 10 additions & 1 deletion src/sos/dag.py
@@ -114,6 +114,14 @@ def __init__(self, *args, **kwargs):
self._all_output_files = defaultdict(list)
# index of mini
self._forward_workflow_id = 0
# if dag has been changed
self._dirty = True

def mark_dirty(self, dirty=True):
self._dirty = dirty

def dirty(self):
return self._dirty

def new_forward_workflow(self):
self._forward_workflow_id += 1
@@ -235,7 +243,7 @@ def dangling(self, targets: sos_targets):
else:
missing.append(x)
else:
missing = [x for x in self._all_depends_files.keys() if x not in self._all_output_files and not x.target_exists()]
missing = [x for x in self._all_depends_files.keys() if x not in self._all_output_files and not x.target_exists()]
for x in targets:
if x not in self._all_output_files:
if x.target_exists('target'):
@@ -322,6 +330,7 @@ def build(self):
for j in out_node:
if j != i:
self.add_edge(j, i)
self.mark_dirty()

def save(self, dest=None):
if not dest:
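The new _dirty flag on the DAG lets the executor skip re-scanning an unchanged graph between scheduling rounds. A hypothetical consumer is sketched below; the real caller lives in the executor, which is not part of the diffs shown here.

if dag.dirty():
    # the DAG changed since the last round, so look for runnable steps again
    runnable = dag.find_executable()  # assumption: existing SoS_DAG lookup method
    dag.mark_dirty(False)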