Skip to content

Commit

Permalink
Verify dynamic depends #1186
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jan 27, 2019
1 parent 226320f commit 51f98b7
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 11 deletions.
15 changes: 10 additions & 5 deletions src/sos/section_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def get_signature_vars(section):

def get_step_depends(section):

step_depends: sos_targets = sos_targets([])
step_depends: sos_targets = sos_targets()
dynamic_depends = True

input_idx = find_statement(section, 'input')
depends_idx = find_statement(section, 'depends')
Expand Down Expand Up @@ -180,9 +181,10 @@ def get_step_depends(section):
args, kwargs = SoS_eval(f'__null_func__({value})',
extra_dict=env.sos_dict._dict)
if any(isinstance(x, (dynamic, remote)) for x in args):
step_depends = sos_targets()
dynamic_depends = True
else:
step_depends.extend(sos_targets(*args))
step_depends.extend(sos_targets(*args, **kwargs))
dynamic_depends = False
except SyntaxError as e:
raise
except Exception as e:
Expand All @@ -191,7 +193,8 @@ def get_step_depends(section):
[env.sos_dict._dict.pop(x) for x in svars]
env.sos_dict._dict.update(old_values)
# env.logger.debug(f"Args {value} cannot be determined: {e}")
return step_depends
return step_depends, dynamic_depends


def get_step_input(section, default_input):
'''Find step input
Expand Down Expand Up @@ -373,6 +376,8 @@ def analyze_section(section: SoS_Step, default_input: Optional[sos_targets] = No
}
if not vars_and_output_only:
res['step_input'] = get_step_input(section, default_input)
res['step_depends'] = get_step_depends(section)
deps = get_step_depends(section)
res['step_depends'] = deps[0]
res['dynamic_depends'] = deps[1]
# analysis_cache[analysis_key] = res
return res
14 changes: 14 additions & 0 deletions src/sos/step_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,19 @@ def process_input_args(self, ifiles: sos_targets, **kwargs):
#
return ifiles.groups

def verify_dynamic_depends(self, target):
    '''Default hook for checking dynamically determined dependencies.

    The base executor performs no verification and unconditionally
    accepts the target; subclasses override this to do real checks.'''
    return True

def process_depends_args(self, dfiles: sos_targets, **kwargs):
    '''Validate `depends:` options and record the step's dependent targets.

    dfiles: resolved dependent targets of the step.
    kwargs: keyword options given to the `depends:` statement; each key
        must be a recognized option in SOS_DEPENDS_OPTIONS.
    Raises RuntimeError for an unknown option and ValueError if the
    dependent targets are still undetermined at this point.'''
    for k in kwargs.keys():
        if k not in SOS_DEPENDS_OPTIONS:
            raise RuntimeError(f'Unrecognized depends option {k}')
    if dfiles.undetermined():
        raise ValueError(r"Depends needs to handle undetermined")

    # If section analysis flagged the dependencies as dynamic (not
    # statically resolvable), ask the executor to verify the file
    # targets among them before they are committed to the dictionary.
    if env.sos_dict.get('__dynamic_depends__', False):
        self.verify_dynamic_depends([x for x in dfiles if isinstance(x, file_target)])

    # expose dependencies to the step's runtime dictionary
    env.sos_dict.set('_depends', dfiles)
    env.sos_dict.set('step_depends', dfiles)

Expand Down Expand Up @@ -1402,6 +1408,14 @@ def handle_unknown_target(self, e):
if not res:
raise e

def verify_dynamic_depends(self, targets):
    '''Ask the controller to verify dynamically determined dependent targets.

    targets: list of file targets to verify; an empty list needs no
        verification and returns immediately.
    Raises RuntimeError if the controller does not confirm that the
    targets are resolved.'''
    if not targets:
        return
    # request verification over the step's control socket and block
    # until the controller replies
    self.socket.send_pyobj(['dependent_target'] + targets)
    res = self.socket.recv()
    if res != b'target_resolved':
        # fixed typo in error message: 'veryify' -> 'verify'
        raise RuntimeError(f'Failed to verify dependent target {targets}')

def run(self):
try:
res = Base_Step_Executor.run(self)
Expand Down
152 changes: 146 additions & 6 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
'__signature_vars__': signature_vars,
'__environ_vars__': environ_vars,
'__changed_vars__': changed_vars,
'__dynamic_depends__': res['dynamic_depends'],
}
if idx == 0:
context['__step_output__'] = env.sos_dict['__step_output__']
Expand Down Expand Up @@ -603,6 +604,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
context['__environ_vars__'] = res['environ_vars']
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']
# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
Expand Down Expand Up @@ -672,6 +674,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
context['__environ_vars__'] = res['environ_vars']
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
Expand Down Expand Up @@ -739,6 +743,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
context['__environ_vars__'] = res['environ_vars']
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
Expand Down Expand Up @@ -787,7 +793,8 @@ def initialize_dag(self, targets: Optional[List[str]] = [], nested: bool = False

context = {'__signature_vars__': signature_vars,
'__environ_vars__': environ_vars,
'__changed_vars__': changed_vars}
'__changed_vars__': changed_vars,
'__dynamic_depends__': res['dynamic_depends']}

# for nested workflow, the input is specified by sos_run, not None.
if idx == 0:
Expand Down Expand Up @@ -876,7 +883,7 @@ def step_completed(self, res, dag, runnable):
| {'_input', '__step_output__', '__args__'}))
node._context.update(svar)
if node._status == 'target_pending':
if node._pending_target.target_exists('target'):
if all(x.target_exists('target') for x in node._pending_targets):
# in a master node, this _socket points to the step
# in a nested node, this _socket points to the parent socket
node._socket.send(b'target_resolved')
Expand All @@ -888,6 +895,99 @@ def step_completed(self, res, dag, runnable):
runnable._status = 'completed'
dag.save(env.config['output_dag'])

def handle_dependent_target(self, dag, targets, runnable) -> int:
    '''Extend the DAG with steps that can re-generate the given dependent
    targets so that they can be verified/re-built if needed.

    dag: the workflow DAG to extend.
    targets: dependent targets (they already exist on disk); it is not an
        error if no step is available to generate one of them.
    runnable: the node that is waiting on these targets.
    Returns the number of nodes added to the DAG.
    Raises RuntimeError if multiple steps provide the same target or if
    adding nodes creates a circular dependency.'''
    total_added = 0
    while True:
        added_node = 0
        # for depending targets... they already exist but we will add
        # nodes that generates them if available.
        node_added = False
        depending_targets = set(dag.dangling(targets)[2])
        for target in depending_targets:
            if node_added:
                # a newly added node may have satisfied other targets;
                # refresh the dangling list before continuing
                depending_targets = set(dag.dangling(targets)[2])
                node_added = False
            if target not in depending_targets:
                continue
            mo = self.match(target)
            if not mo:
                # this is ok, this is just an existing target, no one is designed to
                # generate it.
                env.logger.info(f'{target} already exists')
                continue
            if len(mo) > 1:
                # this is not ok.
                raise RuntimeError(
                    f'Multiple steps {", ".join(x.step_name() for x in mo)} to generate target {target}')
            #
            # only one step, we need to process it # execute section with specified input
            #
            if not isinstance(mo[0], tuple):
                section = mo[0]
                env.sos_dict['__default_output__'] = sos_targets(target)
                context = {}
            else:
                section = mo[0][0]
                if isinstance(mo[0][1], dict):
                    for k, v in mo[0][1].items():
                        env.sos_dict.set(k, v)

                if mo[0][1]:
                    env.sos_dict['__default_output__'] = sos_targets(target)
                    context = {}
                else:
                    env.sos_dict['__default_output__'] = sos_targets(
                        section.options['provides'])
                    context = mo[0][1]
            # will become input, set to None
            env.sos_dict['__step_output__'] = sos_targets()
            #
            res = analyze_section(section, default_output=env.sos_dict['__default_output__'])
            #
            # build DAG with input and output files of step
            env.logger.debug(
                f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}')

            context['__signature_vars__'] = res['signature_vars']
            context['__environ_vars__'] = res['environ_vars']
            context['__changed_vars__'] = res['changed_vars']
            context['__default_output__'] = env.sos_dict['__default_output__']
            context['__dynamic_depends__'] = res['dynamic_depends']

            # NOTE: If a step is called multiple times with different targets, it is much better
            # to use different names because pydotplus can be very slow in handling graphs with nodes
            # with identical names.
            node_name = section.step_name()
            if env.sos_dict["__default_output__"]:
                # NOTE(review): the trailing ')' below has no matching '(';
                # it looks like a typo but is kept as-is because node names
                # may be matched elsewhere -- confirm before changing.
                node_name += f' {short_repr(env.sos_dict["__default_output__"])})'
            dag.add_step(section.uuid, node_name,
                         None, res['step_input'],
                         res['step_depends'], res['step_output'], context=context)
            node_added = True
            added_node += 1

        total_added += added_node
        if added_node == 0:
            break
    #
    if total_added:
        if runnable._depends_targets.valid():
            runnable._depends_targets.extend(targets)
        # BUG FIX: the loop variable was misspelled 'taget', so the body
        # indexed the stale 'target' left over from the loop above and only
        # one (arbitrary) entry was ever updated.
        for target in targets:
            if runnable not in dag._all_depends_files[target]:
                dag._all_depends_files[target].append(runnable)
        dag.build()
        #
        cycle = dag.circular_dependencies()
        if cycle:
            raise RuntimeError(
                f'Circular dependency detected {cycle}. It is likely a later step produces input of a previous step.')
    dag.save(env.config['output_dag'])
    return total_added

def handle_unknown_target(self, target, dag, runnable):
runnable._status = None
dag.save(env.config['output_dag'])
Expand Down Expand Up @@ -1086,9 +1186,8 @@ def i_am():
reply = runnable._child_socket.recv_pyobj()
if reply: # if the target is resolvable in nested workflow
runnable._status = 'target_pending'
runnable._pending_target = missed
runnable._pending_targets = [missed]
# tell the step that the target is resolved and it can continue
env.logger.error(f'{runnable} is target_pending')
runnable._socket = proc.socket
else:
# otherwise say the target cannot be resolved
Expand All @@ -1099,12 +1198,42 @@ def i_am():
try:
self.handle_unknown_target(missed, dag, runnable)
runnable._status = 'target_pending'
runnable._pending_target = missed
runnable._pending_targets = [missed]
runnable._socket = proc.socket
except Exception as e:
env.logger.error(e)
proc.socket.send(b'')
proc.set_status('failed')
elif res[0] == 'dependent_target':
# The target might be dependent on other steps and we
# are trying to extend the DAG to verify the target
# if possible. It does not matter if the DAG cannot be
# extended.
if hasattr(runnable, '_from_nested'):
# if the step is from a subworkflow, then the missing target
# should be resolved by the nested workflow
runnable._child_socket.send_pyobj(res)
reply = runnable._child_socket.recv_pyobj()
if reply:
# if there are dependent steps, the current step
# has to wait
runnable._status = 'target_pending'
runnable._pending_targets = res[1:]
# tell the step that the target is resolved and it can continue
runnable._socket = proc.socket
else:
# otherwise there is no target to verify
# and we just continue
proc.socket.send(b'target_resolved')
else:
# if the missing target is from master, resolve from here
reply = self.handle_dependent_target(dag, sos_targets(res[1:]), runnable)
if reply:
runnable._status = 'target_pending'
runnable._pending_targets = res[1:]
runnable._socket = proc.socket
else:
proc.socket.send(b'target_resolved')
elif res[0] == 'step':
# step sent from nested workflow
step_id = res[1]
Expand Down Expand Up @@ -1405,7 +1534,7 @@ def i_am():
# tell the master that the nested can resolve the target
proc.socket.send_pyobj(True)
runnable._status = 'target_pending'
runnable._pending_target = missed
runnable._pending_targets = [missed]
# when the target is resolved, tell the parent that
# the target is resolved and the step can continue
runnable._socket = proc.socket
Expand All @@ -1415,6 +1544,17 @@ def i_am():
# target so the workflow should stop
proc.socket.send_pyobj(False)
continue
elif res[0] == 'dependent_target':
reply = self.handle_dependent_target(dag, sos_targets(res[1:]), runnable)
proc.socket.send_pyobj(reply)
if reply:
# tell the master that the nested can resolve the target
runnable._status = 'target_pending'
runnable._pending_targets = res[1:]
# when the target is resolved, tell the parent that
# the target is resolved and the step can continue
runnable._socket = proc.socket
continue
else:
raise RuntimeError(
f'Unexpected value from step {short_repr(res)}')
Expand Down
26 changes: 26 additions & 0 deletions test/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,32 @@ def testStepIDVars(self):
wf = script.workflow()
Base_Executor(wf).run()

def testReexecutionOfDynamicDepends(self):
    '''Testing the rerun of steps to verify dependency'''
    # start from a clean slate so the first run must generate both files
    for file in ('a.bam', 'a.bam.bai'):
        if os.path.isfile(file):
            os.remove(file)
    # NOTE(review): the provides pattern '(unknown).bam.bai' looks like it
    # may have been garbled in extraction (a wildcard/interpolated pattern
    # would be expected here) -- confirm against the repository copy.
    script = SoS_Script('''
[BAI: provides='(unknown).bam.bai']
_output.touch()
[BAM]
output: 'a.bam'
_output.touch()
[default]
input: 'a.bam'
depends: _input.with_suffix('.bam.bai')
''')
    wf = script.workflow()
    Base_Executor(wf).run()
    # if we run again, because depends, the step will be re-checked
    os.remove('a.bam')
    res = Base_Executor(wf).run()
    # a.bam and default are re-executed; the .bai step is skipped
    self.assertEqual(res['__completed__']['__step_completed__'], 2)
    self.assertEqual(res['__completed__']['__step_skipped__'], 1)



if __name__ == '__main__':
unittest.main()

0 comments on commit 51f98b7

Please sign in to comment.