Skip to content

Commit

Permalink
Merge pull request #1044 from benfitzpatrick/976.advanced-dependencies
Browse files Browse the repository at this point in the history
#976: advanced dependencies
  • Loading branch information
hjoliver committed Aug 4, 2014
2 parents 2c69133 + 73c1965 commit 363f76c
Show file tree
Hide file tree
Showing 17 changed files with 493 additions and 155 deletions.
15 changes: 14 additions & 1 deletion bin/cylc-restart
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ from cylc.task_state import task_state
from cylc.run import main
from cylc.command_prep import prep_file
import cylc.TaskID
from cylc.cycling.loader import get_point
from cylc.cycling.loader import get_point, DefaultCycler, ISO8601_CYCLING_TYPE
from cylc.wallclock import get_current_time_string


Expand Down Expand Up @@ -124,6 +124,11 @@ where they got to while the suite was down."""
The state dump file format is:
run mode : <mode>
time : <time> (<unix time>)
initial cycle : 2014050100
final cycle : (none)
(dp1 # (Broadcast pickle string)
. # (Broadcast pickle string)
Begin task states
class <classname>: item1=value1, item2=value2, ...
<task_id> : <state>
<task_id> : <state>
Expand Down Expand Up @@ -325,6 +330,14 @@ where they got to while the suite was down."""
print >> sys.stderr, "ERROR, Illegal line in suite state dump:"
print >> sys.stderr, " ", line
raise Exception( "ERROR: corrupted state dump" )
if (point_string == "1" and
DefaultCycler.TYPE == ISO8601_CYCLING_TYPE):
# A state file from a pre-cylc-6 with mixed-async graphing.
point_string = str(self.start_point)
new_id = cylc.TaskID.get(name, point_string)
print >> sys.stderr, (
"WARNING: converting %s to %s" % (id, new_id))
id = new_id
tasknames[name] = True
if 'status=submitting,' in state:
# backward compabitility for state dumps generated prior to #787
Expand Down
88 changes: 36 additions & 52 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, sys
import taskdef
from cylc.cfgspec.suite import get_suitecfg
from cylc.cycling.loader import (get_point,
from cylc.cycling.loader import (get_point, get_point_relative,
get_interval, get_interval_cls,
get_sequence, get_sequence_cls,
init_cyclers, INTEGER_CYCLING_TYPE,
Expand Down Expand Up @@ -333,8 +333,7 @@ def __init__( self, suite, fpath, template_vars=[],
self.cfg['visualization']['initial cycle point'] = vict

vict_rh = None
v_runahead_limit = (
self.custom_runahead_limit or self.minimum_runahead_limit)
v_runahead_limit = self.custom_runahead_limit
if vict and v_runahead_limit:
vict_rh = str( get_point( vict ) + v_runahead_limit )

Expand Down Expand Up @@ -576,7 +575,7 @@ def print_inheritance(self):
print ' ', ' ', item, val

def compute_runahead_limits( self ):
"""Extract the custom and the minimum runahead limits."""
"""Extract the runahead limits information."""

self.max_num_active_cycle_points = self.cfg['scheduling'][
'max active cycle points']
Expand All @@ -590,29 +589,6 @@ def compute_runahead_limits( self ):
# The custom runahead limit is None if not user-configured.
self.custom_runahead_limit = get_interval(limit)

# Find the minimum runahead limit necessary for any future triggers.
self.minimum_runahead_limit = None

offsets = set()
for name, taskdef in self.taskdefs.items():
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)

if offsets:
min_offset = min(offsets)
if min_offset < get_interval_cls().get_null():
# A negative offset comes from future triggering.
self.minimum_runahead_limit = abs(min_offset)
if (self.custom_runahead_limit is not None and
self.custom_runahead_limit <
self.minimum_runahead_limit):
print >> sys.stderr, (
' WARNING, custom runahead limit of %s is less than '
'future triggering offset %s: suite may stall.' %
(self.custom_runahead_limit,
self.minimum_runahead_limit)
)

def get_custom_runahead_limit( self ):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit
Expand All @@ -621,10 +597,6 @@ def get_max_num_active_cycle_points( self ):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_minimum_runahead_limit( self ):
"""Return the minimum runahead limit to apply."""
return self.minimum_runahead_limit

def get_config( self, args, sparse=False ):
return self.pcfg.get( args, sparse )

Expand Down Expand Up @@ -799,8 +771,9 @@ def process_directories(self):
os.environ['CYLC_SUITE_REG_PATH'] = RegPath( self.suite ).get_fpath()
os.environ['CYLC_SUITE_DEF_PATH'] = self.fdir

def set_trigger( self, task_name, right, output_name=None, offset=None,
cycle_point=None, suicide=False, base_interval=None ):
def set_trigger( self, task_name, right, output_name=None,
offset_string=None, cycle_point=None,
suicide=False, base_interval=None ):
trig = triggerx(task_name)
trig.set_suicide(suicide)
if output_name:
Expand Down Expand Up @@ -837,8 +810,9 @@ def set_trigger( self, task_name, right, output_name=None, offset=None,
# default: task succeeded
trig.set_type( 'succeeded' )

if offset:
trig.set_offset( str(offset) ) # TODO ISO - CONSISTENT SET_OFFSET INPUT
if offset_string:
# TODO ISO - CONSISTENT SET_OFFSET INPUT
trig.set_offset_string( offset_string )

if cycle_point:
trig.set_cycle_point( cycle_point )
Expand Down Expand Up @@ -1268,7 +1242,7 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
raise SuiteConfigError, str(x)

name = my_taskdef_node.name
offset = my_taskdef_node.offset
offset_string = my_taskdef_node.offset_string

if name not in self.cfg['runtime']:
# naked dummy task, implicit inheritance from root
Expand Down Expand Up @@ -1308,19 +1282,20 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
'status' : self.suite_polling_tasks[name][2] }

if not my_taskdef_node.is_absolute:
if offset:
if offset_string:
if flags.backwards_compat_cycling:
# Implicit cycling means foo[T+6] generates a +6 sequence.
if str(offset) in offset_seq_map:
seq_offset = offset_seq_map[str(offset)]
if offset_string in offset_seq_map:
seq_offset = offset_seq_map[offset_string]
else:
seq_offset = get_sequence(
section,
self.cfg['scheduling']['initial cycle point'],
self.cfg['scheduling']['final cycle point']
)
seq_offset.set_offset(offset)
offset_seq_map[str(offset)] = seq_offset
seq_offset.set_offset(
get_interval(offset_string))
offset_seq_map[offset_string] = seq_offset
self.taskdefs[name].add_sequence(
seq_offset, is_implicit=True)
if seq_offset not in self.sequences:
Expand Down Expand Up @@ -1360,21 +1335,30 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ):

if lnode.intercycle:
ltaskdef.intercycle = True
if (ltaskdef.max_intercycle_offset is None or
lnode.offset > ltaskdef.max_intercycle_offset):
ltaskdef.max_intercycle_offset = lnode.offset
if (ltaskdef.min_intercycle_offset is None or
lnode.offset < ltaskdef.min_intercycle_offset):
ltaskdef.min_intercycle_offset = lnode.offset

if lnode.offset_is_from_ict:
first_point = get_point_relative(
lnode.offset_string, ltaskdef.ict)
last_point = seq.get_stop_point()
first_point = ltaskdef.ict - lnode.offset
if first_point and last_point is not None:
offset = (last_point - first_point)
ltaskdef.max_intercycle_offset = offset
if last_point is None:
# This dependency persists for the whole suite run.
ltaskdef.intercycle_offsets.append(
(None, seq))
else:
ltaskdef.intercycle_offsets.append(
(str(-(last_point - first_point)), seq))
cycle_point = first_point
trigger = self.set_trigger( lnode.name, right, lnode.output, lnode.offset, cycle_point, suicide, seq.get_interval() )
elif lnode.intercycle:
ltaskdef.intercycle = True
if lnode.offset_is_irregular:
offset_tuple = (lnode.offset_string, seq)
else:
offset_tuple = (lnode.offset_string, None)
ltaskdef.intercycle_offsets.append(offset_tuple)
trigger = self.set_trigger(
lnode.name, right, lnode.output, lnode.offset_string,
cycle_point, suicide, seq.get_interval()
)
if not trigger:
continue
if not conditional:
Expand Down
9 changes: 9 additions & 0 deletions lib/cylc/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ def get_first_point(self, point):
point = self.get_next_point(point)
return point

def get_start_point( self ):
"""Return the first point in this sequence, or None."""
return self.p_start

def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
return self.p_stop
Expand All @@ -430,6 +434,11 @@ def init_from_cfg(cfg):
"""Placeholder function required by all cycling modules."""
pass

def get_point_relative(offset_string, base_point):
"""Create a point from offset_string applied to base_point."""
# TODO ISO: needs to be done somehow? Is it meaningful?
return base_point + IntegerInterval(offset_string)


def test():
"""Run some simple tests for integer cycling."""
Expand Down
45 changes: 40 additions & 5 deletions lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SuiteSpecifics(object):
ASSUMED_TIME_ZONE = None
DUMP_FORMAT = None
NUM_EXPANDED_YEAR_DIGITS = None
abbrev_util = None
interval_parser = None
point_parser = None

Expand Down Expand Up @@ -320,15 +321,14 @@ def __init__(self, dep_section, context_start_point=None,
self.custom_point_parse_function = None
if SuiteSpecifics.DUMP_FORMAT == PREV_DATE_TIME_FORMAT:
self.custom_point_parse_function = point_parse

self.time_parser = CylcTimeParser(
self.abbrev_util = CylcTimeParser(
self.context_start_point, self.context_end_point,
num_expanded_year_digits=SuiteSpecifics.NUM_EXPANDED_YEAR_DIGITS,
dump_format=SuiteSpecifics.DUMP_FORMAT,
custom_point_parse_function=self.custom_point_parse_function,
assumed_time_zone=SuiteSpecifics.ASSUMED_TIME_ZONE
)
self.recurrence = self.time_parser.parse_recurrence(recurrence_syntax)
self.recurrence = self.abbrev_util.parse_recurrence(recurrence_syntax)
self.step = ISO8601Interval(str(self.recurrence.interval))
self.value = str(self.recurrence)

Expand All @@ -343,9 +343,9 @@ def get_offset(self):
def set_offset(self, offset):
"""Deprecated: alter state to offset the entire sequence."""
if self.recurrence.start_point is not None:
self.recurrence.start_point -= interval_parse(str(offset))
self.recurrence.start_point += interval_parse(str(offset))
if self.recurrence.end_point is not None:
self.recurrence.end_point -= interval_parse(str(offset))
self.recurrence.end_point += interval_parse(str(offset))
self._cached_first_point_values = {}
self._cached_next_point_values = {}
self._cached_valid_point_booleans = {}
Expand Down Expand Up @@ -435,6 +435,12 @@ def get_first_point(self, point):
return ISO8601Point(first_point_value)
return None

def get_start_point( self ):
"""Return the first point in this sequence, or None."""
for recurrence_iso_point in self.recurrence:
return ISO8601Point(str(recurrence_iso_point))
return None

def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
if (self.recurrence.repetitions is not None or (
Expand All @@ -455,6 +461,9 @@ def __eq__(self, other):
return True
return False

def __str__(self):
return self.value


def convert_old_cycler_syntax(dep_section, only_detect_old=False,
start_point=None):
Expand Down Expand Up @@ -578,6 +587,32 @@ def init(num_expanded_year_digits=0, custom_dump_format=None, time_zone=None,
dump_format=SuiteSpecifics.DUMP_FORMAT,
assumed_time_zone=time_zone_hours_minutes
)
custom_point_parse_function = None
if SuiteSpecifics.DUMP_FORMAT == PREV_DATE_TIME_FORMAT:
custom_point_parse_function = point_parse
SuiteSpecifics.abbrev_util = CylcTimeParser(
None, None,
num_expanded_year_digits=SuiteSpecifics.NUM_EXPANDED_YEAR_DIGITS,
dump_format=SuiteSpecifics.DUMP_FORMAT,
custom_point_parse_function=custom_point_parse_function,
assumed_time_zone=SuiteSpecifics.ASSUMED_TIME_ZONE
)


def get_point_relative(offset_string, base_point):
"""Create a point from offset_string applied to base_point."""
try:
interval = ISO8601Interval(
str(interval_parse(offset_string)))
except Exception:
pass
else:
return base_point + interval
return ISO8601Point(str(
SuiteSpecifics.abbrev_util.parse_timepoint(
offset_string, context_point=_point_parse(base_point.value)
)
))


def interval_parse(interval_string):
Expand Down
11 changes: 11 additions & 0 deletions lib/cylc/cycling/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
POINTS = {INTEGER_CYCLING_TYPE: integer.IntegerPoint,
ISO8601_CYCLING_TYPE: iso8601.ISO8601Point}

POINT_RELATIVE_GETTERS = {
INTEGER_CYCLING_TYPE: integer.get_point_relative,
ISO8601_CYCLING_TYPE: iso8601.get_point_relative
}

INTERVALS = {INTEGER_CYCLING_TYPE: integer.IntegerInterval,
ISO8601_CYCLING_TYPE: iso8601.ISO8601Interval}

Expand Down Expand Up @@ -76,6 +81,12 @@ def get_point_cls(cycling_type=None):
return POINTS[cycling_type]


def get_point_relative(*args, **kwargs):
"""Return a point from an offset expression and a base point."""
cycling_type = kwargs.pop("cycling_type", DefaultCycler.TYPE)
return POINT_RELATIVE_GETTERS[cycling_type](*args, **kwargs)


def get_interval(*args, **kwargs):
"""Return a cylc.cycling.IntervalBase-derived object from a string."""
if args[0] is None:
Expand Down
9 changes: 5 additions & 4 deletions lib/cylc/graphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import re
import pygraphviz
import TaskID
from cycling.loader import get_point, get_interval
from cycling.loader import get_point, get_point_relative, get_interval
from graphnode import graphnode

# TODO: Do we still need autoURL below?
Expand Down Expand Up @@ -263,9 +263,10 @@ def get_left( self, inpoint, start_point, not_first_cycle, raw,

left_graphnode = graphnode(left, base_interval=base_interval)
if left_graphnode.offset_is_from_ict:
point = start_point - left_graphnode.offset
elif left_graphnode.offset:
point = inpoint - left_graphnode.offset
point = get_point_relative(left_graphnode.offset_string,
start_point)
elif left_graphnode.offset_string:
point = get_point_relative(left_graphnode.offset_string, inpoint)
else:
point = inpoint
name = left_graphnode.name
Expand Down
Loading

0 comments on commit 363f76c

Please sign in to comment.