Skip to content

Commit

Permalink
Use a stack to process multiple tasks on a worker #1218
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Feb 22, 2019
1 parent b858cec commit 1311609
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
13 changes: 13 additions & 0 deletions src/sos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,22 @@ def __init__(self):
self.master_push_socket = None
self.master_request_socket = None

self._sub_idx = 0
self._sub_envs = [{}]

# this function is used by tests to reset environments
# after finishing a test
self.reset()

def switch(self, idx):
    """Make sub-environment ``idx`` the active one.

    The ``sos_dict`` currently in use is stored in the slot of the
    sub-environment being left (``self._sub_envs[self._sub_idx]``), then
    ``self.sos_dict`` is pointed at the dictionary kept for sub-environment
    ``idx``. A slot that holds no dictionary yet gets a new ``WorkflowDict``.

    Args:
        idx: zero-based index of the sub-environment to activate.
    """
    # save old env so it can be restored when we switch back to it
    self._sub_envs[self._sub_idx]['sos_dict'] = self.sos_dict

    # make sure slot ``idx`` exists; otherwise the save step of the next
    # switch() would index past the end of the list
    while len(self._sub_envs) <= idx:
        self._sub_envs.append({})

    # reuse the dictionary saved for this slot, or start with an empty
    # one if the slot has never been active
    saved = self._sub_envs[idx].get('sos_dict')
    self.sos_dict = WorkflowDict() if saved is None else saved
    self._sub_idx = idx

_exec_dir = None
_temp_dir = os.path.join(tempfile.gettempdir(), getpass.getuser(), '.sos')

Expand Down Expand Up @@ -1619,3 +1631,4 @@ def separate_options(options: str) -> List[str]:
pieces[idx] += '\n' + pieces[idx + 1]
pieces.pop(idx + 1)
return pieces

32 changes: 14 additions & 18 deletions src/sos/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def __init__(self, port: int, config: Optional[Dict[str, Any]] = None, args: Opt
self.args = [] if args is None else args

# there can be multiple jobs for this worker, each using their own port and socket
self._envs = []
self._master_sockets = []
self._master_ports = []
self._stack_idx = 0
Expand Down Expand Up @@ -117,9 +116,12 @@ def run(self):

def _process_job(self):
if len(self._master_sockets) > self._stack_idx:
env.switch(self._stack_idx)
# if current stack is ok
env.master_socket = self._master_sockets[self._stack_idx]
else:
# use a new env
env.switch(self._stack_idx)
# a new socket is needed
env.master_socket = create_socket(env.zmq_context, zmq.PAIR)
port = socket.bind_to_random_port('tcp://127.0.0.1')
Expand All @@ -146,24 +148,18 @@ def _process_job(self):
if requested is None:
requested = runner.send(None)
continue
# wait for a reply from the socket
yres = env.master_socket.recv_pyobj()
requested = runner.send(yres)
#
# while True:
# # wait 0.1s
# if env.master_socket.poll(100):
# # we get a response very quickly, so we continue
# yres = env.master_socket.recv_pyobj()
# requested = runner.send(None)
# break
# # now let us ask if the master has something else for us
# env.ctrl_socket.send(b'YIELDED')
# res = env.ctrl_socket.recv_pyobj()
# if res is not None:
# # if there is some job to do
# self._run()

while True:
# wait 0.1s
if env.master_socket.poll(100):
# we get a response very quickly, so we continue
yres = env.master_socket.recv_pyobj()
requested = runner.send(yres)
break
# now let us ask if the master has something else for us
self._stack_idx += 1
self._process_job()
self._stack_idx -= 1
except StopIteration as e:
pass
else:
Expand Down
5 changes: 2 additions & 3 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def execute(self, runnable: Union[SoS_Node, dummy_node], spec: Any) -> None:
else:
close_socket(pi.socket, 'executor master socket', now=True)
master_socket = create_socket(env.zmq_context, zmq.PAIR, 'pair socket for step worker')
master_socket.connect(f'tcp://127.0.0.1:{master_port}')
master_socket.connect(f'tcp://127.0.0.1:{master_port}')

# we need to report the number of active workers, plus the master process itself
send_message_to_controller(['nprocs', self.num_active() + 1])
Expand All @@ -197,8 +197,7 @@ def send_new(self):
return True

def num_active(self) -> int:
return len([x for x in self.procs if x and not x.is_pending()
and not x.in_status('failed')])
return len([x for x in self.procs if x])

def all_busy(self) -> bool:
return self.num_active() >= self.max_workers
Expand Down

0 comments on commit 1311609

Please sign in to comment.