Skip to content

Commit

Permalink
Send request for task results at the worker level #1218
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Feb 21, 2019
1 parent 4d9baa1 commit 8179662
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
40 changes: 32 additions & 8 deletions src/sos/step_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,12 @@ def submit_tasks(self, tasks):

def wait_for_tasks(self, tasks, all_submitted):
# this will be redefined in subclasses
yield None
return {}

def wait_for_results(self, all_submitted):
# this is a generator function because wait_for_tasks is a generator
# function and needs to yield to the caller
if self.concurrent_substep:
self.wait_for_substep()

Expand All @@ -532,7 +535,14 @@ def wait_for_results(self, all_submitted):
self.submit_tasks(tasks)

# waiting for results of specified IDs
results = self.wait_for_tasks(self.task_manager._submitted_tasks, all_submitted)
try:
#1218
runner = self.wait_for_tasks(self.task_manager._submitted_tasks, all_submitted)
while True:
yres = yield next(runner)
runner.send(yres)
except StopIteration as e:
results = e.value
#
# report task
# what we should do here is to get the alias of the Host
Expand Down Expand Up @@ -1275,7 +1285,6 @@ def run(self):
f'Unacceptable value for option active: {active}')

#
yield None
self.log('task')
try:
task_id, taskdef, task_vars = create_task(self.step.global_def, self.step.task,
Expand All @@ -1296,7 +1305,12 @@ def run(self):
# if not concurrent, we have to wait for the completion of the task
if 'concurrent' in env.sos_dict['_runtime'] and env.sos_dict['_runtime']['concurrent'] is False:
# in this case the steps must be executed not concurrently
self.wait_for_results(all_submitted=False)
runner = self.wait_for_results(all_submitted=False)
try:
yres = yield next(runner)
runner.send(yres)
except StopIteration:
pass
#
# endfor loop for each input group
#
Expand All @@ -1313,7 +1327,13 @@ def run(self):
# otherwise there should be nothing interesting in subworkflow
# return value (shared is not handled)

self.wait_for_results(all_submitted=True)
runner = self.wait_for_results(all_submitted=True)
try:
yres = yield next(runner)
runner.send(yres)
except StopIteration:
pass

for idx, res in enumerate(self.proc_results):
if 'sig_skipped' in res:
self.completed['__substep_skipped__'] += 1
Expand Down Expand Up @@ -1427,15 +1447,19 @@ def submit_tasks(self, tasks):
self.socket.send_pyobj(['tasks', host] + tasks)

def wait_for_tasks(self, tasks, all_submitted):
# wait for task is a generator function that yields the request
# to the runner
if not tasks:
return {}
# when we wait, the "outsiders" also need to see the tags etc
# of the tasks so we have to write to the database. #156
send_message_to_controller(['commit_sig'])

# wait till the executor responde
results = {}
while True:
res = self.socket.recv_pyobj()
# yield an indicator of what is requested, for debugging purpose
res = yield "self.socket.recv_pyobj()"
if res is None:
sys.exit(0)
results.update(res)
Expand Down Expand Up @@ -1471,11 +1495,11 @@ def verify_dynamic_targets(self, targets):
def run(self):
try:
try:
# 1218
runner = Base_Step_Executor.run(self)
while True:
pending = next(runner)
# process request
yield pending
yres = yield next(runner)
runner.send(yres)
except StopIteration as e:
res = e.value

Expand Down
17 changes: 14 additions & 3 deletions src/sos/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ def run(self):
f'Worker {self.name} receives request {short_repr(work)}')
if work[0] == 'step':
# this is a step ...
for val in self.run_step(*work[1:]):
runner = self.run_step(*work[1:])
try:
while True:
requested = next(runner)
yres = env.master_socket.recv_pyobj()
runner.send(yres)
except StopIteration as e:
pass
else:
self.run_workflow(*work[1:])
Expand Down Expand Up @@ -184,8 +190,13 @@ def run_step(self, section, context, shared, args, config, verbosity):

executor = Step_Executor(
section, env.master_socket, mode=env.config['run_mode'])
for pending in executor.run():
yield pending

runner = executor.run()
try:
yres = yield next(runner)
runner.send(yres)
except StopIteration:
pass


class SoS_SubStep_Worker(mp.Process):
Expand Down

0 comments on commit 8179662

Please sign in to comment.