Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#976: advanced dependencies #1044

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion bin/cylc-restart
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ from cylc.task_state import task_state
from cylc.run import main
from cylc.command_prep import prep_file
import cylc.TaskID
from cylc.cycling.loader import get_point
from cylc.cycling.loader import get_point, DefaultCycler, ISO8601_CYCLING_TYPE
from cylc.wallclock import get_current_time_string


Expand Down Expand Up @@ -124,6 +124,11 @@ where they got to while the suite was down."""
The state dump file format is:
run mode : <mode>
time : <time> (<unix time>)
initial cycle : 2014050100
final cycle : (none)
(dp1 # (Broadcast pickle string)
. # (Broadcast pickle string)
Begin task states
class <classname>: item1=value1, item2=value2, ...
<task_id> : <state>
<task_id> : <state>
Expand Down Expand Up @@ -325,6 +330,14 @@ where they got to while the suite was down."""
print >> sys.stderr, "ERROR, Illegal line in suite state dump:"
print >> sys.stderr, " ", line
raise Exception( "ERROR: corrupted state dump" )
if (point_string == "1" and
DefaultCycler.TYPE == ISO8601_CYCLING_TYPE):
# A state file from a pre-cylc-6 with mixed-async graphing.
point_string = str(self.start_point)
new_id = cylc.TaskID.get(name, point_string)
print >> sys.stderr, (
"WARNING: converting %s to %s" % (id, new_id))
id = new_id
tasknames[name] = True
if 'status=submitting,' in state:
# backward compabitility for state dumps generated prior to #787
Expand Down
88 changes: 36 additions & 52 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, sys
import taskdef
from cylc.cfgspec.suite import get_suitecfg
from cylc.cycling.loader import (get_point,
from cylc.cycling.loader import (get_point, get_point_relative,
get_interval, get_interval_cls,
get_sequence, get_sequence_cls,
init_cyclers, INTEGER_CYCLING_TYPE,
Expand Down Expand Up @@ -333,8 +333,7 @@ def __init__( self, suite, fpath, template_vars=[],
self.cfg['visualization']['initial cycle point'] = vict

vict_rh = None
v_runahead_limit = (
self.custom_runahead_limit or self.minimum_runahead_limit)
v_runahead_limit = self.custom_runahead_limit
if vict and v_runahead_limit:
vict_rh = str( get_point( vict ) + v_runahead_limit )

Expand Down Expand Up @@ -576,7 +575,7 @@ def print_inheritance(self):
print ' ', ' ', item, val

def compute_runahead_limits( self ):
"""Extract the custom and the minimum runahead limits."""
"""Extract the runahead limits information."""

self.max_num_active_cycle_points = self.cfg['scheduling'][
'max active cycle points']
Expand All @@ -590,29 +589,6 @@ def compute_runahead_limits( self ):
# The custom runahead limit is None if not user-configured.
self.custom_runahead_limit = get_interval(limit)

# Find the minimum runahead limit necessary for any future triggers.
self.minimum_runahead_limit = None

offsets = set()
for name, taskdef in self.taskdefs.items():
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)

if offsets:
min_offset = min(offsets)
if min_offset < get_interval_cls().get_null():
# A negative offset comes from future triggering.
self.minimum_runahead_limit = abs(min_offset)
if (self.custom_runahead_limit is not None and
self.custom_runahead_limit <
self.minimum_runahead_limit):
print >> sys.stderr, (
' WARNING, custom runahead limit of %s is less than '
'future triggering offset %s: suite may stall.' %
(self.custom_runahead_limit,
self.minimum_runahead_limit)
)

def get_custom_runahead_limit( self ):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit
Expand All @@ -621,10 +597,6 @@ def get_max_num_active_cycle_points( self ):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_minimum_runahead_limit( self ):
"""Return the minimum runahead limit to apply."""
return self.minimum_runahead_limit

def get_config( self, args, sparse=False ):
return self.pcfg.get( args, sparse )

Expand Down Expand Up @@ -799,8 +771,9 @@ def process_directories(self):
os.environ['CYLC_SUITE_REG_PATH'] = RegPath( self.suite ).get_fpath()
os.environ['CYLC_SUITE_DEF_PATH'] = self.fdir

def set_trigger( self, task_name, right, output_name=None, offset=None,
cycle_point=None, suicide=False, base_interval=None ):
def set_trigger( self, task_name, right, output_name=None,
offset_string=None, cycle_point=None,
suicide=False, base_interval=None ):
trig = triggerx(task_name)
trig.set_suicide(suicide)
if output_name:
Expand Down Expand Up @@ -837,8 +810,9 @@ def set_trigger( self, task_name, right, output_name=None, offset=None,
# default: task succeeded
trig.set_type( 'succeeded' )

if offset:
trig.set_offset( str(offset) ) # TODO ISO - CONSISTENT SET_OFFSET INPUT
if offset_string:
# TODO ISO - CONSISTENT SET_OFFSET INPUT
trig.set_offset_string( offset_string )

if cycle_point:
trig.set_cycle_point( cycle_point )
Expand Down Expand Up @@ -1268,7 +1242,7 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
raise SuiteConfigError, str(x)

name = my_taskdef_node.name
offset = my_taskdef_node.offset
offset_string = my_taskdef_node.offset_string

if name not in self.cfg['runtime']:
# naked dummy task, implicit inheritance from root
Expand Down Expand Up @@ -1308,19 +1282,20 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
'status' : self.suite_polling_tasks[name][2] }

if not my_taskdef_node.is_absolute:
if offset:
if offset_string:
if flags.backwards_compat_cycling:
# Implicit cycling means foo[T+6] generates a +6 sequence.
if str(offset) in offset_seq_map:
seq_offset = offset_seq_map[str(offset)]
if offset_string in offset_seq_map:
seq_offset = offset_seq_map[offset_string]
else:
seq_offset = get_sequence(
section,
self.cfg['scheduling']['initial cycle point'],
self.cfg['scheduling']['final cycle point']
)
seq_offset.set_offset(offset)
offset_seq_map[str(offset)] = seq_offset
seq_offset.set_offset(
get_interval(offset_string))
offset_seq_map[offset_string] = seq_offset
self.taskdefs[name].add_sequence(
seq_offset, is_implicit=True)
if seq_offset not in self.sequences:
Expand Down Expand Up @@ -1360,21 +1335,30 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ):

if lnode.intercycle:
ltaskdef.intercycle = True
if (ltaskdef.max_intercycle_offset is None or
lnode.offset > ltaskdef.max_intercycle_offset):
ltaskdef.max_intercycle_offset = lnode.offset
if (ltaskdef.min_intercycle_offset is None or
lnode.offset < ltaskdef.min_intercycle_offset):
ltaskdef.min_intercycle_offset = lnode.offset

if lnode.offset_is_from_ict:
first_point = get_point_relative(
lnode.offset_string, ltaskdef.ict)
last_point = seq.get_stop_point()
first_point = ltaskdef.ict - lnode.offset
if first_point and last_point is not None:
offset = (last_point - first_point)
ltaskdef.max_intercycle_offset = offset
if last_point is None:
# This dependency persists for the whole suite run.
ltaskdef.intercycle_offsets.append(
(None, seq))
else:
ltaskdef.intercycle_offsets.append(
(str(-(last_point - first_point)), seq))
cycle_point = first_point
trigger = self.set_trigger( lnode.name, right, lnode.output, lnode.offset, cycle_point, suicide, seq.get_interval() )
elif lnode.intercycle:
ltaskdef.intercycle = True
if lnode.offset_is_irregular:
offset_tuple = (lnode.offset_string, seq)
else:
offset_tuple = (lnode.offset_string, None)
ltaskdef.intercycle_offsets.append(offset_tuple)
trigger = self.set_trigger(
lnode.name, right, lnode.output, lnode.offset_string,
cycle_point, suicide, seq.get_interval()
)
if not trigger:
continue
if not conditional:
Expand Down
9 changes: 9 additions & 0 deletions lib/cylc/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ def get_first_point(self, point):
point = self.get_next_point(point)
return point

def get_start_point( self ):
"""Return the first point in this sequence, or None."""
return self.p_start

def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
return self.p_stop
Expand All @@ -430,6 +434,11 @@ def init_from_cfg(cfg):
"""Placeholder function required by all cycling modules."""
pass

def get_point_relative(offset_string, base_point):
"""Create a point from offset_string applied to base_point."""
# TODO ISO: needs to be done somehow? Is it meaningful?
return base_point + IntegerInterval(offset_string)


def test():
"""Run some simple tests for integer cycling."""
Expand Down
45 changes: 40 additions & 5 deletions lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SuiteSpecifics(object):
ASSUMED_TIME_ZONE = None
DUMP_FORMAT = None
NUM_EXPANDED_YEAR_DIGITS = None
abbrev_util = None
interval_parser = None
point_parser = None

Expand Down Expand Up @@ -320,15 +321,14 @@ def __init__(self, dep_section, context_start_point=None,
self.custom_point_parse_function = None
if SuiteSpecifics.DUMP_FORMAT == PREV_DATE_TIME_FORMAT:
self.custom_point_parse_function = point_parse

self.time_parser = CylcTimeParser(
self.abbrev_util = CylcTimeParser(
self.context_start_point, self.context_end_point,
num_expanded_year_digits=SuiteSpecifics.NUM_EXPANDED_YEAR_DIGITS,
dump_format=SuiteSpecifics.DUMP_FORMAT,
custom_point_parse_function=self.custom_point_parse_function,
assumed_time_zone=SuiteSpecifics.ASSUMED_TIME_ZONE
)
self.recurrence = self.time_parser.parse_recurrence(recurrence_syntax)
self.recurrence = self.abbrev_util.parse_recurrence(recurrence_syntax)
self.step = ISO8601Interval(str(self.recurrence.interval))
self.value = str(self.recurrence)

Expand All @@ -343,9 +343,9 @@ def get_offset(self):
def set_offset(self, offset):
"""Deprecated: alter state to offset the entire sequence."""
if self.recurrence.start_point is not None:
self.recurrence.start_point -= interval_parse(str(offset))
self.recurrence.start_point += interval_parse(str(offset))
if self.recurrence.end_point is not None:
self.recurrence.end_point -= interval_parse(str(offset))
self.recurrence.end_point += interval_parse(str(offset))
self._cached_first_point_values = {}
self._cached_next_point_values = {}
self._cached_valid_point_booleans = {}
Expand Down Expand Up @@ -435,6 +435,12 @@ def get_first_point(self, point):
return ISO8601Point(first_point_value)
return None

def get_start_point( self ):
"""Return the first point in this sequence, or None."""
for recurrence_iso_point in self.recurrence:
return ISO8601Point(str(recurrence_iso_point))
return None

def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
if (self.recurrence.repetitions is not None or (
Expand All @@ -455,6 +461,9 @@ def __eq__(self, other):
return True
return False

def __str__(self):
return self.value


def convert_old_cycler_syntax(dep_section, only_detect_old=False,
start_point=None):
Expand Down Expand Up @@ -578,6 +587,32 @@ def init(num_expanded_year_digits=0, custom_dump_format=None, time_zone=None,
dump_format=SuiteSpecifics.DUMP_FORMAT,
assumed_time_zone=time_zone_hours_minutes
)
custom_point_parse_function = None
if SuiteSpecifics.DUMP_FORMAT == PREV_DATE_TIME_FORMAT:
custom_point_parse_function = point_parse
SuiteSpecifics.abbrev_util = CylcTimeParser(
None, None,
num_expanded_year_digits=SuiteSpecifics.NUM_EXPANDED_YEAR_DIGITS,
dump_format=SuiteSpecifics.DUMP_FORMAT,
custom_point_parse_function=custom_point_parse_function,
assumed_time_zone=SuiteSpecifics.ASSUMED_TIME_ZONE
)


def get_point_relative(offset_string, base_point):
"""Create a point from offset_string applied to base_point."""
try:
interval = ISO8601Interval(
str(interval_parse(offset_string)))
except Exception:
pass
else:
return base_point + interval
return ISO8601Point(str(
SuiteSpecifics.abbrev_util.parse_timepoint(
offset_string, context_point=_point_parse(base_point.value)
)
))


def interval_parse(interval_string):
Expand Down
11 changes: 11 additions & 0 deletions lib/cylc/cycling/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
POINTS = {INTEGER_CYCLING_TYPE: integer.IntegerPoint,
ISO8601_CYCLING_TYPE: iso8601.ISO8601Point}

POINT_RELATIVE_GETTERS = {
INTEGER_CYCLING_TYPE: integer.get_point_relative,
ISO8601_CYCLING_TYPE: iso8601.get_point_relative
}

INTERVALS = {INTEGER_CYCLING_TYPE: integer.IntegerInterval,
ISO8601_CYCLING_TYPE: iso8601.ISO8601Interval}

Expand Down Expand Up @@ -76,6 +81,12 @@ def get_point_cls(cycling_type=None):
return POINTS[cycling_type]


def get_point_relative(*args, **kwargs):
"""Return a point from an offset expression and a base point."""
cycling_type = kwargs.pop("cycling_type", DefaultCycler.TYPE)
return POINT_RELATIVE_GETTERS[cycling_type](*args, **kwargs)


def get_interval(*args, **kwargs):
"""Return a cylc.cycling.IntervalBase-derived object from a string."""
if args[0] is None:
Expand Down
9 changes: 5 additions & 4 deletions lib/cylc/graphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import re
import pygraphviz
import TaskID
from cycling.loader import get_point, get_interval
from cycling.loader import get_point, get_point_relative, get_interval
from graphnode import graphnode

# TODO: Do we still need autoURL below?
Expand Down Expand Up @@ -215,9 +215,10 @@ def get_left( self, inpoint, start_point, not_first_cycle, raw,

left_graphnode = graphnode(left, base_interval=base_interval)
if left_graphnode.offset_is_from_ict:
point = start_point - left_graphnode.offset
elif left_graphnode.offset:
point = inpoint - left_graphnode.offset
point = get_point_relative(left_graphnode.offset_string,
start_point)
elif left_graphnode.offset_string:
point = get_point_relative(left_graphnode.offset_string, inpoint)
else:
point = inpoint
name = left_graphnode.name
Expand Down
Loading