Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xtrigger sequence fix - master #3287

Merged
merged 8 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ Changed the `suite.rc` schema:
causing suites to stall some time after reloading a suite definition that
removed tasks from the graph.

[#3287](https://github.com/cylc/cylc-flow/pull/3287) - fix xtrigger
cycle-sequence specificity.

[#3258](https://github.com/cylc/cylc-flow/pull/3258) - leave '%'-escaped string
templates alone in xtrigger arguments.

Expand Down
68 changes: 19 additions & 49 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
)
from cylc.flow.print_tree import print_tree
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_func
from cylc.flow.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_id import TaskID
Expand Down Expand Up @@ -149,7 +148,6 @@ def __init__(
self.xtrigger_mgr = XtriggerManager(self.suite)
else:
self.xtrigger_mgr = xtrigger_mgr
self.xtriggers = {}
self.suite_polling_tasks = {}
self._last_graph_raw_id = None
self._last_graph_raw_edges = []
Expand Down Expand Up @@ -1681,11 +1679,25 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
dependency = Dependency(expr_list, set(triggers.values()), suicide)
self.taskdefs[right].add_dependency(dependency, seq)

# Record xtrigger labels for each task name.
if right not in self.xtriggers:
self.xtriggers[right] = xtrig_labels
else:
self.xtriggers[right] = self.xtriggers[right].union(xtrig_labels)
for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
if label == 'wall_clock':
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(f"xtrigger not defined: {label}")
if (xtrig.func_name == 'wall_clock' and
self.cfg['scheduling']['cycling mode'] == (
INTEGER_CYCLING_TYPE)):
sig = xtrig.get_signature()
raise SuiteConfigError(
f"clock xtriggers need date-time cycling: {label} = {sig}")
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
"""Get actual first cycle point for the suite
Expand Down Expand Up @@ -2115,48 +2127,6 @@ def load_graph(self):
self._proc_triggers(
parser.triggers, parser.original, seq, task_triggers)

xtcfg = self.cfg['scheduling']['xtriggers']
# Taskdefs just know xtrigger labels.
for task_name, xt_labels in self.xtriggers.items():
for label in xt_labels:
try:
xtrig = xtcfg[label]
except KeyError:
if label == 'wall_clock':
# Allow predefined zero-offset wall clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"undefined xtrigger label: %s" % label)
if xtrig.func_name.startswith('wall_clock'):
self.xtrigger_mgr.add_clock(label, xtrig)
# Replace existing xclock if the new offset is larger.
try:
offset = get_interval(xtrig.func_kwargs['offset'])
except KeyError:
offset = 0
old_label = self.taskdefs[task_name].xclock_label
if old_label is None:
self.taskdefs[task_name].xclock_label = label
else:
old_xtrig = self.xtrigger_mgr.clockx_map[old_label]
old_offset = get_interval(
old_xtrig.func_kwargs['offset'])
if offset > old_offset:
self.taskdefs[task_name].xclock_label = label
else:
try:
if not callable(get_func(xtrig.func_name, self.fdir)):
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
raise SuiteConfigError(
f"xtrigger function not callable: "
f"{xtrig.func_name}")
except (ModuleNotFoundError, AttributeError):
raise SuiteConfigError(
f"xtrigger function not found: {xtrig.func_name}")
self.xtrigger_mgr.add_trig(label, xtrig)
self.taskdefs[task_name].xtrig_labels.add(label)

# Detect use of xtrigger names with '@' prefix (creates a task).
overlap = set(self.taskdefs.keys()).intersection(
list(self.cfg['scheduling']['xtriggers']))
Expand Down
54 changes: 52 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,58 @@ def info_get_graph_raw(self, cto, ctn, group_nodes=None,
self.config.feet)

def info_get_task_requisites(self, items, list_prereqs=False):
"""Return prerequisites of a task."""
return self.pool.get_task_requisites(items, list_prereqs=list_prereqs)
"""Return prerequisites and outputs etc. of a task.

Result in a dict of a dict:
{
"task_id": {
"meta": {key: value, ...},
"prerequisites": {key: value, ...},
"outputs": {key: value, ...},
"extras": {key: value, ...},
},
...
}
"""
itasks, bad_items = self.pool.filter_task_proxies(items)
results = {}
now = time()
for itask in itasks:
if list_prereqs:
results[itask.identity] = {
'prerequisites': itask.state.prerequisites_dump(
list_prereqs=True)}
continue
extras = {}
if itask.tdef.clocktrigger_offset is not None:
extras['Clock trigger time reached'] = (
itask.is_waiting_clock_done(now))
extras['Triggers at'] = time2str(
itask.clock_trigger_time)
for trig, satisfied in itask.state.external_triggers.items():
key = f'External trigger "{trig}"'
if satisfied:
extras[key] = 'satisfied'
else:
extras[key] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
sig = self.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature()
extra = f'xtrigger "{label} = {sig}"'
if satisfied:
extras[extra] = 'satisfied'
else:
extras[extra] = 'NOT satisfied'
outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
outputs.append(
[f"{itask.identity} {msg}", is_completed])
results[itask.identity] = {
"meta": itask.tdef.describe(),
"prerequisites": itask.state.prerequisites_dump(),
"outputs": outputs,
"extras": extras}
return results, bad_items

def info_ping_task(self, task_id, exists_only=False):
"""Return True if task exists and running."""
Expand Down
60 changes: 1 addition & 59 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING)
from cylc.flow.wallclock import (
get_current_time_string, get_time_string_from_unix_time)
from cylc.flow.wallclock import get_current_time_string


class TaskPool(object):
Expand Down Expand Up @@ -1300,63 +1299,6 @@ def ping_task(self, id_, exists_only=False):
else:
return False, "task not found"

def get_task_requisites(self, items, list_prereqs=False):
"""Return task prerequisites.

Result in a dict of a dict:
{
"task_id": {
"meta": {key: value, ...},
"prerequisites": {key: value, ...},
"outputs": {key: value, ...},
"extras": {key: value, ...},
},
...
}
"""
itasks, bad_items = self.filter_task_proxies(items)
results = {}
now = time()
for itask in itasks:
if list_prereqs:
results[itask.identity] = {
'prerequisites': itask.state.prerequisites_dump(
list_prereqs=True)}
continue

extras = {}
if itask.tdef.clocktrigger_offset is not None:
extras['Clock trigger time reached'] = (
itask.is_waiting_clock_done(now))
extras['Triggers at'] = get_time_string_from_unix_time(
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
itask.clock_trigger_time)
for trig, satisfied in itask.state.external_triggers.items():
if satisfied:
extras['External trigger "%s"' % trig] = 'satisfied'
else:
extras['External trigger "%s"' % trig] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
if satisfied:
extras['xtrigger "%s"' % label] = 'satisfied'
else:
extras['xtrigger "%s"' % label] = 'NOT satisfied'
if itask.state.xclock is not None:
label, satisfied = itask.state.xclock
if satisfied:
extras['xclock "%s"' % label] = 'satisfied'
else:
extras['xclock "%s"' % label] = 'NOT satisfied'

outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
outputs.append(["%s %s" % (itask.identity, msg), is_completed])
results[itask.identity] = {
"meta": itask.tdef.describe(),
"prerequisites": itask.state.prerequisites_dump(),
"outputs": outputs,
"extras": extras}
return results, bad_items

def filter_task_proxies(self, items):
"""Return task proxies that match names, points, states in items.

Expand Down
29 changes: 15 additions & 14 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ class TaskState(object):
List of prerequisites that will cause the task to suicide.
.time_updated (str):
Time string of latest update time.
.xclock (tuple):
A tuple (clock_label (str), is_done (boolean)) to indicate if a
clock trigger is satisfied or not. Set to `None` if the task has no
clock trigger.
.xtriggers (dict):
xtriggers as {trigger (str): satisfied (boolean), ...}.
._is_satisfied (boolean):
Expand All @@ -216,7 +212,6 @@ class TaskState(object):
"status",
"suicide_prerequisites",
"time_updated",
"xclock",
"xtriggers",
"_is_satisfied",
"_suicide_is_satisfied",
Expand Down Expand Up @@ -248,12 +243,7 @@ def __init__(self, tdef, point, status, is_held):

# xtriggers (represented by labels) satisfied or not
self.xtriggers = {}
for label in tdef.xtrig_labels:
self.xtriggers[label] = False
if tdef.xclock_label:
self.xclock = (tdef.xclock_label, False)
else:
self.xclock = None
self._add_xtriggers(point, tdef)

# Message outputs.
self.outputs = TaskOutputs(tdef)
Expand Down Expand Up @@ -303,9 +293,7 @@ def satisfy_me(self, all_task_outputs):
self._suicide_is_satisfied = None

def xtriggers_all_satisfied(self):
"""Return True if xclock and all xtriggers are satisfied."""
if self.xclock is not None and not self.xclock[1]:
return False
"""Return True if all xtriggers are satisfied."""
return all(self.xtriggers.values())

def prerequisites_are_all_satisfied(self):
Expand Down Expand Up @@ -486,3 +474,16 @@ def _add_prerequisites(self, point, tdef):
p_prev < tdef.start_point)
cpre.set_condition(tdef.name)
self.prerequisites.append(cpre)

def _add_xtriggers(self, point, tdef):
"""Add task xtriggers valid for the current sequence.

Initialize each one to unsatisfied.
"""
# Triggers for sequence_i only used if my cycle point is a
# valid member of sequence_i's sequence of cycle points.
for sequence, xtrig_labels in tdef.xtrig_labels.items():
if not sequence.is_valid(point):
continue
for xtrig_label in xtrig_labels:
self.xtriggers[xtrig_label] = False
25 changes: 14 additions & 11 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ class TaskDef(object):
"intercycle_offsets", "sequential", "is_coldstart",
"suite_polling_cfg", "clocktrigger_offset", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"external_triggers", "xtrig_labels", "xclock_label",
"name", "elapsed_times"]
"external_triggers", "xtrig_labels", "name", "elapsed_times"]

# Store the elapsed times for a maximum of 10 cycles
MAX_LEN_ELAPSED_TIMES = 10
Expand Down Expand Up @@ -68,12 +67,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, spawn_ahead):
self.outputs = []
self.param_var = {}
self.external_triggers = []
self.xtrig_labels = set()
self.xclock_label = None
# Note a task can only have one clock xtrigger - if it depends on
# several we just keep the label of the one with the largest offset
# (this is determined and set during suite config parsing, to avoid
# storing the offset here in the taskdef).
self.xtrig_labels = {} # {sequence: [labels]}

self.name = name
self.elapsed_times = deque(maxlen=self.MAX_LEN_ELAPSED_TIMES)
Expand All @@ -88,9 +82,18 @@ def add_dependency(self, dependency, sequence):
this dependency applies.

"""
if sequence not in self.dependencies:
self.dependencies[sequence] = []
self.dependencies[sequence].append(dependency)
self.dependencies.setdefault(sequence, []).append(dependency)

def add_xtrig_label(self, xtrig_label, sequence):
"""Add an xtrigger to a named sequence.

Args:
xtrig_label: The xtrigger label to add.
sequence (cylc.cycling.SequenceBase): The sequence for which this
xtrigger applies.

"""
self.xtrig_labels.setdefault(sequence, []).append(xtrig_label)

def add_sequence(self, sequence):
"""Add a sequence."""
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import pytest
from tempfile import TemporaryDirectory, NamedTemporaryFile
from pathlib import Path
from cylc.flow.config import SuiteConfig, SuiteConfigError
from cylc.flow.config import SuiteConfig


def get_test_inheritance_quotes():
Expand Down Expand Up @@ -110,7 +110,7 @@ def test_xfunction_imports(self):
f.flush()
suite_config = SuiteConfig(suite="name_a_tree", fpath=f.name)
config = suite_config
assert 'tree' in config.xtriggers['qux']
assert 'tree' in config.xtrigger_mgr.functx_map

def test_xfunction_import_error(self):
"""Test for error when a xtrigger function cannot be imported."""
Expand All @@ -133,7 +133,7 @@ def test_xfunction_import_error(self):
R1 = '@oopsie => qux'
""")
f.flush()
with pytest.raises(SuiteConfigError) as excinfo:
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
with pytest.raises(ImportError) as excinfo:
SuiteConfig(suite="caiman_suite", fpath=f.name)
assert "not found" in str(excinfo.value)

Expand All @@ -158,7 +158,7 @@ def test_xfunction_attribute_error(self):
R1 = '@oopsie => qux'
""")
f.flush()
with pytest.raises(SuiteConfigError) as excinfo:
with pytest.raises(AttributeError) as excinfo:
SuiteConfig(suite="capybara_suite", fpath=f.name)
assert "not found" in str(excinfo.value)

Expand All @@ -183,7 +183,7 @@ def test_xfunction_not_callable(self):
R1 = '@oopsie => qux'
""")
f.flush()
with pytest.raises(SuiteConfigError) as excinfo:
with pytest.raises(ValueError) as excinfo:
SuiteConfig(suite="suite_with_not_callable", fpath=f.name)
assert "callable" in str(excinfo.value)

Expand Down
Loading