Skip to content

Commit

Permalink
Handling input and depends targets differently #1186
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jan 26, 2019
1 parent 6279933 commit 59d928c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 43 deletions.
54 changes: 38 additions & 16 deletions src/sos/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ def show(self):
class SoS_DAG(nx.DiGraph):
def __init__(self, *args, **kwargs):
nx.DiGraph.__init__(self, *args, **kwargs)
# all_dependent files includes input and depends files
self._all_dependent_files = defaultdict(list)
# all_input
self._all_input_files = defaultdict(list)
self._all_depends_files = defaultdict(list)
self._all_output_files = defaultdict(list)
# index of mini
self._forward_workflow_id = 0
Expand All @@ -132,11 +133,11 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets,
self._all_output_files[sos_step(node_name.split(' ')[0])].append(node)

for x in input_targets:
if node not in self._all_dependent_files[x]:
self._all_dependent_files[x].append(node)
if node not in self._all_input_files[x]:
self._all_input_files[x].append(node)
for x in depends_targets:
if node not in self._all_dependent_files[x]:
self._all_dependent_files[x].append(node)
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
for x in output_targets:
if node not in self._all_output_files[x]:
self._all_output_files[x].append(node)
Expand All @@ -148,11 +149,11 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets,

def update_step(self, node, input_targets: sos_targets, output_targets: sos_targets, depends_targets: sos_targets):
for x in input_targets:
if node not in self._all_dependent_files[x]:
self._all_dependent_files[x].append(node)
if node not in self._all_input_files[x]:
self._all_input_files[x].append(node)
for x in depends_targets:
if node not in self._all_dependent_files[x]:
self._all_dependent_files[x].append(node)
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
for x in output_targets:
if node not in self._all_output_files[x]:
self._all_output_files[x].append(node)
Expand Down Expand Up @@ -213,8 +214,10 @@ def circular_dependencies(self):
return []

def steps_depending_on(self, target: BaseTarget, workflow):
if target in self._all_dependent_files:
return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_dependent_files[target]]))
if target in self._all_depends_files:
return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_depends_files[target]]))
elif target in self._all_input_files:
return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_input_files[target]]))
else:
return ''

Expand All @@ -223,20 +226,32 @@ def pending(self):

def dangling(self, targets: sos_targets):
missing = []
depending = []
existing = []
for x in self._all_dependent_files.keys():
for x in self._all_input_files.keys():
# for input files, if it exists, and not in output files,
# declear exist, if it is in output_files, good.
if x.target_exists():
if x not in self._all_output_files:
existing.append(x)
# if it does not exist, and not in output_files, declear missing
elif x not in self._all_output_files:
missing.append(x)
for x in self._all_depends_files.keys():
# for dependent files, if it exists, and not in output files
# we still try to find steps to satify it
if x.target_exists():
if x not in self._all_output_files:
depending.append(x)
elif x not in self._all_output_files:
missing.append(x)
for x in targets:
if x.target_exists():
if x not in self._all_output_files:
existing.append(x)
depending.append(x)
elif x not in self._all_output_files:
missing.append(x)
return missing, existing
return missing, existing, depending

def regenerate_target(self, target: BaseTarget):
if target in self._all_output_files:
Expand Down Expand Up @@ -307,7 +322,14 @@ def build(self):
self.add_edge(indexed[idx - 1], node)
#
# 3. if the input of a step depends on the output of another step
for target, in_node in self._all_dependent_files.items():
for target, in_node in self._all_depends_files.items():
for out_node in [y for (x, y) in self._all_output_files.items() if x == target]:
for i in in_node:
for j in out_node:
if j != i:
self.add_edge(j, i)
#
for target, in_node in self._all_input_files.items():
for out_node in [y for (x, y) in self._all_output_files.items() if x == target]:
for i in in_node:
for j in out_node:
Expand Down
101 changes: 74 additions & 27 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
resolved = 0
while True:
added_node = 0
dangling_targets, existing_targets = dag.dangling(targets)
# first resolve missing
dangling_targets = dag.dangling(targets)[0]
if dangling_targets:
env.logger.debug(
f'Resolving {dangling_targets} objects from {dag.number_of_nodes()} nodes')
Expand All @@ -457,7 +458,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
# so the execution of its previous steps would solves the dependency
#
# find all the nodes that depends on target
nodes = dag._all_dependent_files[target]
nodes = dag._all_depends_files[target]
for node in nodes:
# if this is an index step... simply let it depends on previous steps
if node._node_index is not None:
Expand Down Expand Up @@ -684,6 +685,73 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
added_node += 1
# this case do not count as resolved
# resolved += 1

# for depending targets... they already exist but we will add
# nodes that generates them if available.
node_added = False
depending_targets = set(dag.dangling(targets)[2])
for target in depending_targets:
if node_added:
depending_targets = set(dag.dangling(targets)[2])
node_added = False
if target not in depending_targets:
continue
mo = self.match(target)
if not mo:
# this is ok, this is just an existing target, no one is designed to
# generate it.
continue
if len(mo) > 1:
# this is not ok.
raise RuntimeError(
f'Multiple steps {", ".join(x.step_name() for x in mo)} to generate target {target}')
#
# only one step, we need to process it # execute section with specified input
#
if not isinstance(mo[0], tuple):
section = mo[0]
env.sos_dict['__default_output__'] = sos_targets(target)
context = {}
else:
section = mo[0][0]
if isinstance(mo[0][1], dict):
for k, v in mo[0][1].items():
env.sos_dict.set(k, v)

if mo[0][1]:
env.sos_dict['__default_output__'] = sos_targets(target)
context = {}
else:
env.sos_dict['__default_output__'] = sos_targets(
section.options['provides'])
context = mo[0][1]
# will become input, set to None
env.sos_dict['__step_output__'] = sos_targets()
#
res = analyze_section(section, default_output=env.sos_dict['__default_output__'])
#
# build DAG with input and output files of step
env.logger.debug(
f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}')

context['__signature_vars__'] = res['signature_vars']
context['__environ_vars__'] = res['environ_vars']
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
node_name = section.step_name()
if env.sos_dict["__default_output__"]:
node_name += f' {short_repr(env.sos_dict["__default_output__"])})'
dag.add_step(section.uuid, node_name,
None, res['step_input'],
res['step_depends'], res['step_output'], context=context)
node_added = True
added_node += 1
# this case do not count as resolved
# resolved += 1

if added_node == 0:
break
return resolved
Expand Down Expand Up @@ -784,16 +852,6 @@ def describe_completed(self):
else:
return 'no step executed'

def check_targets(self, targets: sos_targets):
for target in sos_targets(targets):
if target.target_exists('target'):
if env.config['sig_mode'] == 'force':
env.logger.info(f'Re-generating {target}')
target.unlink()
else:
env.logger.info(f'Target {target} already exists')
return [x for x in targets if not file_target(x).target_exists('target') or env.config['sig_mode'] == 'force']

def step_completed(self, res, dag, runnable):
for k, v in res['__completed__'].items():
self.completed[k] += v
Expand Down Expand Up @@ -833,7 +891,7 @@ def handle_unknown_target(self, res, dag, runnable):

if dag.regenerate_target(target):
# runnable._depends_targets.append(target)
# dag._all_dependent_files[target].append(runnable)
# dag._all_depends_files[target].append(runnable)
dag.build()
#
cycle = dag.circular_dependencies()
Expand All @@ -847,8 +905,8 @@ def handle_unknown_target(self, res, dag, runnable):
f'Failed to regenerate or resolve {target}{dag.steps_depending_on(target, self.workflow)}.')
if runnable._depends_targets.valid():
runnable._depends_targets.extend(target)
if runnable not in dag._all_dependent_files[target]:
dag._all_dependent_files[target].append(
if runnable not in dag._all_depends_files[target]:
dag._all_depends_files[target].append(
runnable)
dag.build()
#
Expand Down Expand Up @@ -944,11 +1002,6 @@ def i_am():
wf_result = {'__workflow_id__': self.md5, 'shared': {}}
# if targets are specified and there are only signatures for them, we need
# to remove the signature and really generate them
if targets:
targets = self.check_targets(targets)
if len(targets) == 0:
return wf_result

try:
if env.config['output_dag'] and os.path.isfile(env.config['output_dag']):
os.unlink(env.config['output_dag'])
Expand Down Expand Up @@ -1284,13 +1337,7 @@ def i_am():
env.sos_dict.set('run_mode', env.config['run_mode'])

wf_result = {'__workflow_id__': my_workflow_id, 'shared': {}}
# if targets are specified and there are only signatures for them, we need
# to remove the signature and really generate them
if targets:
targets = self.check_targets(targets)
if len(targets) == 0:
parent_socket.send_pyobj(wf_result)
return


dag = self.initialize_dag(targets=targets)
# the mansger will have all fake executors
Expand Down

0 comments on commit 59d928c

Please sign in to comment.