Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xtrigger sequence fix #3285

Merged
merged 10 commits into from
Aug 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ with suite configurations that contain tasks with many task outputs.

### Fixes

[#3285](https://github.com/cylc/cylc-flow/pull/3285) - fix xtrigger
cycle-sequence specificity.

[#3257](https://github.com/cylc/cylc-flow/pull/3257) - leave '%'-escaped string
templates alone in xtrigger arguments.

Expand Down
72 changes: 20 additions & 52 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
from cylc.graphnode import GraphNodeParser, GraphNodeError
from cylc.print_tree import print_tree
from cylc.subprocctx import SubFuncContext
from cylc.subprocpool import get_func
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.taskdef import TaskDef, TaskDefError
from cylc.task_id import TaskID
Expand Down Expand Up @@ -155,7 +154,6 @@ def __init__(
self.xtrigger_mgr = XtriggerManager(self.suite)
else:
self.xtrigger_mgr = xtrigger_mgr
self.xtriggers = {}
self.suite_polling_tasks = {}
self._last_graph_raw_id = None
self._last_graph_raw_edges = []
Expand Down Expand Up @@ -1741,11 +1739,26 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
dependency = Dependency(expr_list, set(triggers.values()), suicide)
self.taskdefs[right].add_dependency(dependency, seq)

# Record xtrigger labels for each task name.
if right not in self.xtriggers:
self.xtriggers[right] = xtrig_labels
else:
self.xtriggers[right] = self.xtriggers[right].union(xtrig_labels)
for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
if label == 'wall_clock':
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"ERROR, undefined xtrigger label: %s" % label)
if (xtrig.func_name == 'wall_clock' and
self.cfg['scheduling']['cycling mode'] == (
INTEGER_CYCLING_TYPE)):
raise SuiteConfigError(
"ERROR: clock triggers are not compatible with integer "
"cycling.\n %s = %s" % (label, xtrig.get_signature()))
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
"""Get actual first cycle point for the suite
Expand Down Expand Up @@ -2086,51 +2099,6 @@ def load_graph(self):
self._proc_triggers(
parser.triggers, parser.original, seq, task_triggers)

xtcfg = self.cfg['scheduling']['xtriggers']
# Taskdefs just know xtrigger labels.
for task_name, xt_labels in self.xtriggers.items():
for label in xt_labels:
try:
xtrig = xtcfg[label]
except KeyError:
if label == 'wall_clock':
# Allow predefined zero-offset wall clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"ERROR, undefined xtrigger label: %s" % label)
if xtrig.func_name.startswith('wall_clock'):
self.xtrigger_mgr.add_clock(label, xtrig)
# Replace existing xclock if the new offset is larger.
try:
offset = get_interval(xtrig.func_kwargs['offset'])
except KeyError:
offset = 0
old_label = self.taskdefs[task_name].xclock_label
if old_label is None:
self.taskdefs[task_name].xclock_label = label
else:
old_xtrig = self.xtrigger_mgr.clockx_map[old_label]
old_offset = get_interval(
old_xtrig.func_kwargs['offset'])
if offset > old_offset:
self.taskdefs[task_name].xclock_label = label
else:
try:
if not callable(get_func(xtrig.func_name, self.fdir)):
raise SuiteConfigError(
"ERROR, "
"xtrigger function not callable: %s" %
xtrig.func_name)
except (ImportError, AttributeError):
raise SuiteConfigError(
"ERROR, "
"xtrigger function not found: %s" %
xtrig.func_name)
self.xtrigger_mgr.add_trig(label, xtrig)
self.taskdefs[task_name].xtrig_labels.add(label)

# Detect use of xtrigger names with '@' prefix (creates a task).
overlap = set(self.taskdefs.keys()).intersection(
self.cfg['scheduling']['xtriggers'].keys())
Expand Down
15 changes: 5 additions & 10 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1263,16 +1263,13 @@ def get_task_requisites(self, items, list_prereqs=False):
else:
extras['External trigger "%s"' % trig] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
extra = 'xtrigger "%s = %s"' % (
label, self.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature())
if satisfied:
extras['xtrigger "%s"' % label] = 'satisfied'
extras[extra] = 'satisfied'
else:
extras['xtrigger "%s"' % label] = 'NOT satisfied'
if itask.state.xclock is not None:
label, satisfied = itask.state.xclock
if satisfied:
extras['xclock "%s"' % label] = 'satisfied'
else:
extras['xclock "%s"' % label] = 'NOT satisfied'
extras[extra] = 'NOT satisfied'
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
Expand All @@ -1289,8 +1286,6 @@ def check_xtriggers(self):
itasks = self.get_tasks()
self.xtrigger_mgr.collate(itasks)
for itask in itasks:
if itask.state.xclock is not None:
self.xtrigger_mgr.satisfy_xclock(itask)
if itask.state.xtriggers:
self.xtrigger_mgr.satisfy_xtriggers(itask, self.proc_pool)

Expand Down
29 changes: 15 additions & 14 deletions lib/cylc/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,6 @@ class TaskState(object):
List of prerequisites that will cause the task to suicide.
.time_updated (str):
Time string of latest update time.
.xclock (tuple):
A tuple (clock_label (str), is_done (boolean)) to indicate if a
clock trigger is satisfied or not. Set to `None` if the task has no
clock trigger.
.xtriggers (dict):
xtriggers as {trigger (str): satisfied (boolean), ...}.
._is_satisfied (boolean):
Expand All @@ -242,7 +238,6 @@ class TaskState(object):
"status",
"suicide_prerequisites",
"time_updated",
"xclock",
"xtriggers",
"_is_satisfied",
"_suicide_is_satisfied",
Expand Down Expand Up @@ -274,12 +269,7 @@ def __init__(self, tdef, point, status, hold_swap):

# xtriggers (represented by labels) satisfied or not
self.xtriggers = {}
for label in tdef.xtrig_labels:
self.xtriggers[label] = False
if tdef.xclock_label:
self.xclock = (tdef.xclock_label, False)
else:
self.xclock = None
self._add_xtriggers(point, tdef)

# Message outputs.
self.outputs = TaskOutputs(tdef)
Expand All @@ -301,9 +291,7 @@ def satisfy_me(self, all_task_outputs):
self._suicide_is_satisfied = None

def xtriggers_all_satisfied(self):
"""Return True if xclock and all xtriggers are satisfied."""
if self.xclock is not None and not self.xclock[1]:
return False
"""Return True if all xtriggers are satisfied."""
return all(self.xtriggers.values())

def prerequisites_are_all_satisfied(self):
Expand Down Expand Up @@ -528,3 +516,16 @@ def _add_prerequisites(self, point, tdef):
p_prev < tdef.start_point)
cpre.set_condition(tdef.name)
self.prerequisites.append(cpre)

def _add_xtriggers(self, point, tdef):
"""Add task xtriggers valid for the current sequence.

Initialize each one to unsatisfied.
"""
# Triggers for sequence_i only used if my cycle point is a
# valid member of sequence_i's sequence of cycle points.
for sequence, xtrig_labels in tdef.xtrig_labels.items():
if not sequence.is_valid(point):
continue
for xtrig_label in xtrig_labels:
self.xtriggers[xtrig_label] = False
31 changes: 20 additions & 11 deletions lib/cylc/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class TaskDef(object):
"intercycle_offsets", "sequential", "is_coldstart",
"suite_polling_cfg", "clocktrigger_offset", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"external_triggers", "xtrig_labels", "xclock_label",
"name", "elapsed_times"]
"external_triggers", "xtrig_labels", "name", "elapsed_times"]

# Store the elapsed times for a maximum of 10 cycles
MAX_LEN_ELAPSED_TIMES = 10
Expand Down Expand Up @@ -78,12 +77,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, spawn_ahead):
self.outputs = set()
self.param_var = {}
self.external_triggers = []
self.xtrig_labels = set()
self.xclock_label = None
# Note a task can only have one clock xtrigger - if it depends on
# several we just keep the label of the one with the largest offset
# (this is determined and set during suite config parsing, to avoid
# storing the offset here in the taskdef).
self.xtrig_labels = {} # {sequence: [labels]}

self.name = name
self.elapsed_times = deque(maxlen=self.MAX_LEN_ELAPSED_TIMES)
Expand All @@ -97,9 +91,24 @@ def add_dependency(self, dependency, sequence):
dependency applies.

"""
if sequence not in self.dependencies:
self.dependencies[sequence] = []
self.dependencies[sequence].append(dependency)
try:
self.dependencies[sequence].append(dependency)
except KeyError:
self.dependencies[sequence] = [dependency]

def add_xtrig_label(self, xtrig_label, sequence):
"""Add an xtrigger to a named sequence.

Args:
xtrig_label: The xtrigger label to add.
sequence (cylc.cycling.SequenceBase): The sequence for which this
xtrigger applies.

"""
try:
self.xtrig_labels[sequence].append(xtrig_label)
except KeyError:
self.xtrig_labels[sequence] = [xtrig_label]

def add_sequence(self, sequence):
"""Add a sequence."""
Expand Down
8 changes: 4 additions & 4 deletions lib/cylc/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_xfunction_imports(self):
f.flush()
suite_config = SuiteConfig(suite="name_a_tree", fpath=f.name)
config = suite_config
self.assertTrue('tree' in config.xtriggers['qux'])
self.assertTrue('tree' in config.xtrigger_mgr.functx_map)
shutil.rmtree(temp_dir)

def test_xfunction_import_error(self):
Expand All @@ -76,7 +76,7 @@ def test_xfunction_import_error(self):
graph = '@oopsie => qux'
""")
f.flush()
with self.assertRaises(SuiteConfigError) as ex:
with self.assertRaises(ImportError) as ex:
SuiteConfig(suite="caiman_suite", fpath=f.name)
self.assertTrue("not found" in str(ex))
shutil.rmtree(temp_dir)
Expand Down Expand Up @@ -104,7 +104,7 @@ def test_xfunction_attribute_error(self):
graph = '@oopsie => qux'
""")
f.flush()
with self.assertRaises(SuiteConfigError) as ex:
with self.assertRaises(AttributeError) as ex:
SuiteConfig(suite="capybara_suite", fpath=f.name)
self.assertTrue("not found" in str(ex))
shutil.rmtree(temp_dir)
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_xfunction_not_callable(self):
graph = '@oopsie => qux'
""")
f.flush()
with self.assertRaises(SuiteConfigError) as ex:
with self.assertRaises(ValueError) as ex:
SuiteConfig(suite="suite_with_not_callable", fpath=f.name)
self.assertTrue("callable" in str(ex))
shutil.rmtree(temp_dir)
Expand Down
Loading