diff --git a/src/sos/section_analyzer.py b/src/sos/section_analyzer.py index 35742b89d..894def9dd 100644 --- a/src/sos/section_analyzer.py +++ b/src/sos/section_analyzer.py @@ -138,7 +138,8 @@ def get_signature_vars(section): def get_step_depends(section): - step_depends: sos_targets = sos_targets([]) + step_depends: sos_targets = sos_targets() + dynamic_depends = True input_idx = find_statement(section, 'input') depends_idx = find_statement(section, 'depends') @@ -180,9 +181,10 @@ def get_step_depends(section): args, kwargs = SoS_eval(f'__null_func__({value})', extra_dict=env.sos_dict._dict) if any(isinstance(x, (dynamic, remote)) for x in args): - step_depends = sos_targets() + dynamic_depends = True else: - step_depends.extend(sos_targets(*args)) + step_depends.extend(sos_targets(*args, **kwargs)) + dynamic_depends = False except SyntaxError as e: raise except Exception as e: @@ -191,7 +193,8 @@ def get_step_depends(section): [env.sos_dict._dict.pop(x) for x in svars] env.sos_dict._dict.update(old_values) # env.logger.debug(f"Args {value} cannot be determined: {e}") - return step_depends + return step_depends, dynamic_depends + def get_step_input(section, default_input): '''Find step input @@ -373,6 +376,8 @@ def analyze_section(section: SoS_Step, default_input: Optional[sos_targets] = No } if not vars_and_output_only: res['step_input'] = get_step_input(section, default_input) - res['step_depends'] = get_step_depends(section) + deps = get_step_depends(section) + res['step_depends'] = deps[0] + res['dynamic_depends'] = deps[1] # analysis_cache[analysis_key] = res return res diff --git a/src/sos/step_executor.py b/src/sos/step_executor.py index 73938443c..f250b812a 100755 --- a/src/sos/step_executor.py +++ b/src/sos/step_executor.py @@ -349,6 +349,9 @@ def process_input_args(self, ifiles: sos_targets, **kwargs): # return ifiles.groups + def verify_dynamic_depends(self, target): + return True + def process_depends_args(self, dfiles: sos_targets, **kwargs): for k in kwargs.keys(): if k not in SOS_DEPENDS_OPTIONS: @@ -356,6 +359,9 @@ def process_depends_args(self, dfiles: sos_targets, **kwargs): if dfiles.undetermined(): raise ValueError(r"Depends needs to handle undetermined") + if env.sos_dict.get('__dynamic_depends__', False): + self.verify_dynamic_depends([x for x in dfiles if isinstance(x, file_target)]) + env.sos_dict.set('_depends', dfiles) env.sos_dict.set('step_depends', dfiles) @@ -1402,6 +1408,14 @@ def handle_unknown_target(self, e): if not res: raise e + def verify_dynamic_depends(self, targets): + if not targets: + return + self.socket.send_pyobj(['dependent_target'] + targets) + res = self.socket.recv() + if res != b'target_resolved': + raise RuntimeError(f'Failed to veryify dependent target {targets}') + def run(self): try: res = Base_Step_Executor.run(self) diff --git a/src/sos/workflow_executor.py b/src/sos/workflow_executor.py index 473b56c6a..4d30d3bde 100755 --- a/src/sos/workflow_executor.py +++ b/src/sos/workflow_executor.py @@ -528,6 +528,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= '__signature_vars__': signature_vars, '__environ_vars__': environ_vars, '__changed_vars__': changed_vars, + '__dynamic_depends__': res['dynamic_depends'], } if idx == 0: context['__step_output__'] = env.sos_dict['__step_output__'] @@ -603,6 +604,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= context['__environ_vars__'] = res['environ_vars'] context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] + context['__dynamic_depends__'] = res['dynamic_depends'] # NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes # with identical names. @@ -672,6 +674,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= context['__environ_vars__'] = res['environ_vars'] context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] + context['__dynamic_depends__'] = res['dynamic_depends'] + # NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes # with identical names. @@ -739,6 +743,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= context['__environ_vars__'] = res['environ_vars'] context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] + context['__dynamic_depends__'] = res['dynamic_depends'] + # NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes # with identical names. @@ -787,7 +793,8 @@ def initialize_dag(self, targets: Optional[List[str]] = [], nested: bool = False context = {'__signature_vars__': signature_vars, '__environ_vars__': environ_vars, - '__changed_vars__': changed_vars} + '__changed_vars__': changed_vars, + '__dynamic_depends__': res['dynamic_depends']} # for nested workflow, the input is specified by sos_run, not None. if idx == 0: @@ -876,7 +883,7 @@ def step_completed(self, res, dag, runnable): | {'_input', '__step_output__', '__args__'})) node._context.update(svar) if node._status == 'target_pending': - if node._pending_target.target_exists('target'): + if all(x.target_exists('target') for x in node._pending_targets): # in a master node, this _socket points to the step # in a nested node, this _socket points to the parent socket node._socket.send(b'target_resolved') @@ -888,6 +895,99 @@ def step_completed(self, res, dag, runnable): runnable._status = 'completed' dag.save(env.config['output_dag']) + def handle_dependent_target(self, dag, targets, runnable) -> int: + total_added = 0 + resolved = 0 + while True: + added_node = 0 + # for depending targets... they already exist but we will add + # nodes that generates them if available. + node_added = False + depending_targets = set(dag.dangling(targets)[2]) + for target in depending_targets: + if node_added: + depending_targets = set(dag.dangling(targets)[2]) + node_added = False + if target not in depending_targets: + continue + mo = self.match(target) + if not mo: + # this is ok, this is just an existing target, no one is designed to + # generate it. + env.logger.info(f'{target} already exists') + continue + if len(mo) > 1: + # this is not ok. + raise RuntimeError( + f'Multiple steps {", ".join(x.step_name() for x in mo)} to generate target {target}') + # + # only one step, we need to process it # execute section with specified input + # + if not isinstance(mo[0], tuple): + section = mo[0] + env.sos_dict['__default_output__'] = sos_targets(target) + context = {} + else: + section = mo[0][0] + if isinstance(mo[0][1], dict): + for k, v in mo[0][1].items(): + env.sos_dict.set(k, v) + + if mo[0][1]: + env.sos_dict['__default_output__'] = sos_targets(target) + context = {} + else: + env.sos_dict['__default_output__'] = sos_targets( + section.options['provides']) + context = mo[0][1] + # will become input, set to None + env.sos_dict['__step_output__'] = sos_targets() + # + res = analyze_section(section, default_output=env.sos_dict['__default_output__']) + # + # build DAG with input and output files of step + env.logger.debug( + f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}') + + context['__signature_vars__'] = res['signature_vars'] + context['__environ_vars__'] = res['environ_vars'] + context['__changed_vars__'] = res['changed_vars'] + context['__default_output__'] = env.sos_dict['__default_output__'] + context['__dynamic_depends__'] = res['dynamic_depends'] + + # NOTE: If a step is called multiple times with different targets, it is much better + # to use different names because pydotplus can be very slow in handling graphs with nodes + # with identical names. + node_name = section.step_name() + if env.sos_dict["__default_output__"]: + node_name += f' {short_repr(env.sos_dict["__default_output__"])})' + dag.add_step(section.uuid, node_name, + None, res['step_input'], + res['step_depends'], res['step_output'], context=context) + node_added = True + added_node += 1 + resolved += 1 + + total_added += added_node + if added_node == 0: + break + # + if total_added: + if runnable._depends_targets.valid(): + runnable._depends_targets.extend(targets) + for taget in targets: + if runnable not in dag._all_depends_files[target]: + dag._all_depends_files[target].append( + runnable) + dag.build() + # + cycle = dag.circular_dependencies() + if cycle: + raise RuntimeError( + f'Circular dependency detected {cycle}. It is likely a later step produces input of a previous step.') + dag.save(env.config['output_dag']) + return total_added + def handle_unknown_target(self, target, dag, runnable): runnable._status = None dag.save(env.config['output_dag']) @@ -1086,9 +1186,8 @@ def i_am(): reply = runnable._child_socket.recv_pyobj() if reply: # if the target is resolvable in nested workflow runnable._status = 'target_pending' - runnable._pending_target = missed + runnable._pending_targets = [missed] # tell the step that the target is resolved and it can continue - env.logger.error(f'{runnable} is target_pending') runnable._socket = proc.socket else: # otherwise say the target cannot be resolved @@ -1099,12 +1198,42 @@ def i_am(): try: self.handle_unknown_target(missed, dag, runnable) runnable._status = 'target_pending' - runnable._pending_target = missed + runnable._pending_targets = [missed] runnable._socket = proc.socket except Exception as e: env.logger.error(e) proc.socket.send(b'') proc.set_status('failed') + elif res[0] == 'dependent_target': + # The target might be dependent on other steps and we + # are trying to extend the DAG to verify the target + # if possible. It does not matter if the DAG cannot be + # extended. + if hasattr(runnable, '_from_nested'): + # if the step is from a subworkflow, then the missing target + # should be resolved by the nested workflow + runnable._child_socket.send_pyobj(res) + reply = runnable._child_socket.recv_pyobj() + if reply: + # if there are dependent steps, the current step + # has to wait + runnable._status = 'target_pending' + runnable._pending_targets = res[1:] + # tell the step that the target is resolved and it can continue + runnable._socket = proc.socket + else: + # otherwise there is no target to verify + # and we just continue + proc.socket.send(b'target_resolved') + else: + # if the missing target is from master, resolve from here + reply = self.handle_dependent_target(dag, sos_targets(res[1:]), runnable) + if reply: + runnable._status = 'target_pending' + runnable._pending_targets = res[1:] + runnable._socket = proc.socket + else: + proc.socket.send(b'target_resolved') elif res[0] == 'step': # step sent from nested workflow step_id = res[1] @@ -1405,7 +1534,7 @@ def i_am(): # tell the master that the nested can resolve the target proc.socket.send_pyobj(True) runnable._status = 'target_pending' - runnable._pending_target = missed + runnable._pending_targets = [missed] # when the target is resolved, tell the parent that # the target is resolved and the step can continue runnable._socket = proc.socket @@ -1415,6 +1544,17 @@ def i_am(): # target so the workflow should stop proc.socket.send_pyobj(False) continue + elif res[0] == 'dependent_target': + reply = self.handle_dependent_target(dag, sos_targets(res[1:]), runnable) + proc.socket.send_pyobj(reply) + if reply: + # tell the master that the nested can resolve the target + runnable._status = 'target_pending' + runnable._pending_targets = res[1:] + # when the target is resolved, tell the parent that + # the target is resolved and the step can continue + runnable._socket = proc.socket + continue else: raise RuntimeError( f'Unexpected value from step {short_repr(res)}') diff --git a/test/test_execute.py b/test/test_execute.py index 446f5be9d..92e73ac8e 100644 --- a/test/test_execute.py +++ b/test/test_execute.py @@ -2060,6 +2060,32 @@ def testStepIDVars(self): wf = script.workflow() Base_Executor(wf).run() + def testReexecutionOfDynamicDepends(self): + '''Testing the rerun of steps to verify dependency''' + for file in ('a.bam', 'a.bam.bai'): + if os.path.isfile(file): + os.remove(file) + script = SoS_Script(''' +[BAI: provides='{filename}.bam.bai'] +_output.touch() + +[BAM] +output: 'a.bam' +_output.touch() + +[default] +input: 'a.bam' +depends: _input.with_suffix('.bam.bai') +''') + wf = script.workflow() + Base_Executor(wf).run() + # if we run again, because depends, the step will be re-checked + os.remove('a.bam') + res = Base_Executor(wf).run() + self.assertEqual(res['__completed__']['__step_completed__'], 2) + self.assertEqual(res['__completed__']['__step_skipped__'], 1) + + if __name__ == '__main__': unittest.main()