
Merge pull request #974 from benfitzpatrick/iso.speedup
#119: speed up
arjclark committed Jun 17, 2014
2 parents 6c08fde + 6fbf02a · commit 4f8763f
Showing 7 changed files with 139 additions and 66 deletions.
65 changes: 48 additions & 17 deletions lib/cylc/config.py
@@ -957,7 +957,7 @@ def replace_family_triggers( self, line_in, fam, members, orig='' ):
line = re.sub( exclam + r"\b" + fam + r"\b" + re.escape(foffset) + orig, mems, line )
return line

def process_graph_line( self, line, section, ttype, seq,
def process_graph_line( self, line, section, ttype, seq, offset_seq_map,
tasks_to_prune=None,
return_all_dependencies=False ):
"""Extract dependent pairs from the suite.rc dependency text.
@@ -976,6 +976,8 @@ def process_graph_line( self, line, section, ttype, seq,
ttype is either 'cycling' or an async indicator.
seq is the sequence generated from 'section' given the initial
and final cycle time.
offset_seq_map is a cache of seq with various offsets for
speeding up backwards-compatible cycling.
tasks_to_prune, if not None, is a list of tasks to remove
from dependency expressions (backwards compatibility for
start-up tasks and async graph tasks).
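The offset cache described above is plain string-keyed memoisation: each distinct offset (e.g. the one in foo[T+6]) pays for one get_sequence() call, and every later use of the same offset reuses the already-shifted sequence. A minimal sketch of the idea, with hypothetical helper names that are not part of the Cylc API:

    # Minimal sketch of the offset_seq_map caching (hypothetical helper).
    # make_sequence stands in for the expensive get_sequence() call over
    # the initial-to-final cycle range.
    def get_offset_sequence(offset, offset_seq_map, make_sequence):
        """Return a cached offset sequence, building it on first use."""
        key = str(offset)
        if key not in offset_seq_map:
            seq = make_sequence()    # expensive recurrence parse
            seq.set_offset(offset)   # shift the whole sequence once
            offset_seq_map[key] = seq
        return offset_seq_map[key]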
@@ -1208,7 +1210,8 @@ def process_graph_line( self, line, section, ttype, seq,
seq, suicide )
self.generate_taskdefs( orig_line, pruned_left_nodes,
right_name, ttype,
section, asyncid_pattern,
section, seq, offset_seq_map,
asyncid_pattern,
seq.get_interval() )
self.generate_triggers( lexpression, pruned_left_nodes,
right_name, seq,
@@ -1232,8 +1235,9 @@ def generate_edges( self, lexpression, left_nodes, right, ttype, seq, suicide=Fa
else:
self.edges.append(e)

def generate_taskdefs( self, line, left_nodes, right, ttype, section, asyncid_pattern,
base_interval ):
def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
offset_seq_map, asyncid_pattern, base_interval ):
"""Generate task definitions for nodes on a given line."""
for node in left_nodes + [right]:
if not node:
# if right is None, lefts are lone nodes
@@ -1291,17 +1295,22 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, asyncid_pa
'task' : self.suite_polling_tasks[name][1],
'status' : self.suite_polling_tasks[name][2] }

seq = get_sequence( section,
self.cfg['scheduling']['initial cycle time'],
self.cfg['scheduling']['final cycle time'] )

if not my_taskdef_node.is_absolute:
if offset:
if flags.back_comp_cycling:
# Implicit cycling means foo[T+6] generates a +6 sequence.
seq.set_offset(offset)
if str(offset) in offset_seq_map:
seq_offset = offset_seq_map[str(offset)]
else:
seq_offset = get_sequence(
section,
self.cfg['scheduling']['initial cycle time'],
self.cfg['scheduling']['final cycle time']
)
seq_offset.set_offset(offset)
offset_seq_map[str(offset)] = seq_offset
self.taskdefs[name].add_sequence(
seq, is_implicit=True)
seq_offset, is_implicit=True)
# We don't handle implicit cycling in new-style cycling.
else:
self.taskdefs[ name ].add_sequence(seq)
@@ -1578,12 +1587,14 @@ def load_graph( self ):
self.graph_found = False
has_non_async_graphs = False

section_seq_map = {}

# Set up our backwards-compatibility handling of async graphs.
async_graph = self.cfg['scheduling']['dependencies']['graph']
if async_graph:
section = get_sequence_cls().get_async_expr()
async_dependencies = self.parse_graph(
section, async_graph,
section, async_graph, section_seq_map=section_seq_map,
return_all_dependencies=True
)
for left, left_output, right in async_dependencies:
@@ -1625,7 +1636,7 @@ def load_graph( self ):
print "[[[" + section + "]]]"
print " " + 'graph = """' + graph + '"""'
special_dependencies = self.parse_graph(
section, graph,
section, graph, section_seq_map=section_seq_map,
tasks_to_prune=tasks_to_prune
)
if special_dependencies and tasks_to_prune:
@@ -1664,8 +1675,21 @@ def load_graph( self ):
"new-style cycling"
)

def parse_graph( self, section, graph, tasks_to_prune=None,
return_all_dependencies=False ):
def parse_graph( self, section, graph, section_seq_map=None,
tasks_to_prune=None, return_all_dependencies=False ):
"""Parse a multi-line graph string for section.
section should be a string like "R1" or "T00".
graph should be a single or multi-line string like "foo => bar".
section_seq_map should be a dictionary that indexes cycling
sequences by their section string.
tasks_to_prune is a list of task names that should be
automatically removed when processing graph.
return_all_dependencies is a boolean that, if True, returns a
list of task dependencies - e.g. [('foo', 'start', 'bar')] for
a graph of 'foo:start => bar'.
"""
self.graph_found = True

if re.match( '^ASYNCID:', section ):
@@ -1675,9 +1699,16 @@ def parse_graph( self, section, graph, tasks_to_prune=None,
ttype = 'cycling'
sec = section

seq = get_sequence( section,
if section in section_seq_map:
seq = section_seq_map[section]
else:
seq = get_sequence(
section,
self.cfg['scheduling']['initial cycle time'],
self.cfg['scheduling']['final cycle time'] )
self.cfg['scheduling']['final cycle time']
)
section_seq_map[section] = seq
offset_seq_map = {}

if seq not in self.sequences:
self.sequences.append(seq)
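load_graph applies the same get-or-create idiom one level up: a single section_seq_map travels through every parse_graph call, so graph sections that repeat the same recurrence string share one parsed sequence. Roughly, as a hedged sketch (config and graph_items are illustrative stand-ins, not real names):

    # Sketch only: one map shared across all parse_graph calls in a load.
    section_seq_map = {}
    for section, graph in graph_items:   # e.g. ("T00", 'foo => bar')
        config.parse_graph(section, graph,
                           section_seq_map=section_seq_map)
    # A repeated section string now hits the cache instead of paying
    # for another get_sequence() parse.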
@@ -1696,7 +1727,7 @@ def parse_graph( self, section, graph, tasks_to_prune=None,
line = re.sub( '\s*$', '', line )
# generate pygraphviz graph nodes and edges, and task definitions
special_dependencies.extend(self.process_graph_line(
line, section, ttype, seq,
line, section, ttype, seq, offset_seq_map,
tasks_to_prune=tasks_to_prune,
return_all_dependencies=return_all_dependencies
))
4 changes: 4 additions & 0 deletions lib/cylc/cycling/__init__.py
@@ -85,6 +85,10 @@ class IntervalBase(object):
They should also provide self.cmp_, self.sub, self.add,
self.__mul__, self.__abs__, self.__nonzero__ methods, which should
behave like the standard __cmp__, __sub__, etc. methods.
They can also just override the provided comparison methods (such
as __cmp__) instead.
Note: "cmp_" not "cmp", etc. They should also provide:
* self.get_null, which is a method to extract the null interval of
this type.
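Put concretely, a subclass now has two equally valid routes. A toy Python 2 illustration (it ignores IntervalBase's other abstract methods, so this is a sketch rather than a working interval type):

    class MinuteInterval(IntervalBase):
        """Toy interval counted in whole minutes."""
        def __init__(self, minutes):
            self.minutes = minutes
        def cmp_(self, other):      # route 1: the hook the base class wraps
            return cmp(self.minutes, other.minutes)
        def __cmp__(self, other):   # route 2: override the standard method
            return cmp(self.minutes, other.minutes)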
48 changes: 44 additions & 4 deletions lib/cylc/cycling/iso8601.py
@@ -95,7 +95,11 @@ def from_nonstandard_string(cls, point_string):
def add(self, other):
return ISO8601Point(self._iso_point_add(self.value, other.value))

def cmp_(self, other):
def __cmp__(self, other):
if self.TYPE != other.TYPE:
return cmp(self.TYPE_SORT_KEY, other.TYPE_SORT_KEY)
if self.value == other.value:
return 0
return self._iso_point_cmp(self.value, other.value)

def standardise(self):
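Switching from cmp_ to a real __cmp__ lets points participate directly in comparisons and sorting, with TYPE_SORT_KEY giving a stable order across mixed point types and the equal-string check short-circuiting the relatively costly ISO 8601 parse. A hedged sketch of the intended behaviour (Python 2; the point strings are illustrative):

    p1 = ISO8601Point("20140101T0000Z")
    p2 = ISO8601Point("20140102T0000Z")
    assert p1 < p2                    # __cmp__ drives <, ==, sorted(), ...
    assert sorted([p2, p1])[0] == p1
    # Identical value strings return 0 without parsing either point.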
@@ -241,6 +245,7 @@ class ISO8601Sequence(object):

TYPE = CYCLER_TYPE_ISO8601
TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_ISO8601
_MAX_CACHED_POINTS = 100

@classmethod
def get_async_expr(cls, start_point=None):
@@ -271,6 +276,10 @@ def __init__(self, dep_section, context_start_point=None,
if not i:
raise "ERROR: iso8601 cycling init!"

self._cached_first_point_values = {}
self._cached_next_point_values = {}
self._cached_valid_point_booleans = {}

self.spec = i
self.custom_point_parse_function = None
if DUMP_FORMAT == PREV_DATE_TIME_FORMAT:
@@ -299,6 +308,9 @@ def set_offset(self, offset):
self.recurrence.start_point -= interval_parse(str(offset))
if self.recurrence.end_point is not None:
self.recurrence.end_point -= interval_parse(str(offset))
self._cached_first_point_values = {}
self._cached_next_point_values = {}
self._cached_valid_point_booleans = {}
self.value = str(self.recurrence)

def is_on_sequence(self, point):
@@ -307,7 +319,15 @@ def is_on_sequence(self, point):

def is_valid(self, point):
"""Return True if point is on-sequence and in-bounds."""
return self.is_on_sequence(point)
try:
return self._cached_valid_point_booleans[point.value]
except KeyError:
is_valid = self.is_on_sequence(point)
if (len(self._cached_valid_point_booleans) >
self._MAX_CACHED_POINTS):
self._cached_valid_point_booleans.popitem()
self._cached_valid_point_booleans[point.value] = is_valid
return is_valid

def get_prev_point(self, point):
"""Return the previous point < point, or None if out of bounds."""
@@ -335,10 +355,19 @@ def get_nearest_prev_point(self, point):

def get_next_point(self, point):
"""Return the next point > p, or None if out of bounds."""
try:
return ISO8601Point(self._cached_next_point_values[point.value])
except KeyError:
pass
p_iso_point = point_parse(point.value)
for recurrence_iso_point in self.recurrence:
if recurrence_iso_point > p_iso_point:
return ISO8601Point(str(recurrence_iso_point))
next_point_value = str(recurrence_iso_point)
if (len(self._cached_next_point_values) >
self._MAX_CACHED_POINTS):
self._cached_next_point_values.popitem()
self._cached_next_point_values[point.value] = next_point_value
return ISO8601Point(next_point_value)
return None

def get_next_point_on_sequence(self, point):
@@ -352,10 +381,20 @@ def get_next_point_on_sequence(self, point):

def get_first_point( self, point):
"""Return the first point >= to poing, or None if out of bounds."""
try:
return ISO8601Point(self._cached_first_point_values[point.value])
except KeyError:
pass
p_iso_point = point_parse(point.value)
for recurrence_iso_point in self.recurrence:
if recurrence_iso_point >= p_iso_point:
return ISO8601Point(str(recurrence_iso_point))
first_point_value = str(recurrence_iso_point)
if (len(self._cached_first_point_values) >
self._MAX_CACHED_POINTS):
self._cached_first_point_values.popitem()
self._cached_first_point_values[point.value] = (
first_point_value)
return ISO8601Point(first_point_value)
return None

def get_stop_point( self ):
Expand Down Expand Up @@ -524,6 +563,7 @@ def _interval_parse(interval_string):
def point_parse(point_string):
return _point_parse(point_string).copy()


@memoize
def _point_parse(point_string):
if "%" in DUMP_FORMAT:
1 change: 0 additions & 1 deletion lib/cylc/state_summary.py
@@ -67,7 +67,6 @@ def update( self, tasks, oldest, newest, paused, will_pause_at,
# based on the first-parent single-inheritance tree

c_fam_task_states = {}
c_task_states = task_states.get(ctime, {})

for key, parent_list in self.config.get_first_parent_ancestors().items():
state = c_task_states.get(key)
46 changes: 28 additions & 18 deletions lib/cylc/task_pool.py
@@ -146,6 +146,10 @@ def release_runahead_tasks( self ):
# compute runahead base: the oldest task not succeeded or failed
# (excludes finished and includes runahead-limited tasks so a
# low limit cannot stall the suite).

if not self.runahead_pool:
return

runahead_base = None
for itask in self.get_tasks(all=True):
if itask.state.is_currently('failed', 'succeeded'):
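The new guard returns early when the runahead pool is empty, skipping the full task scan. The scan itself is just a minimum over non-finished tasks, roughly (a simplified sketch using the attribute names seen elsewhere in this diff):

    def get_runahead_base(tasks):
        """Oldest cycle time among tasks not succeeded or failed."""
        base = None
        for itask in tasks:
            if itask.state.is_currently('failed', 'succeeded'):
                continue
            if base is None or itask.c_time < base:
                base = itask.c_time
        return base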
@@ -206,7 +210,6 @@ def remove( self, itask, reason=None ):
del itask



def get_tasks( self, all=False ):
""" Return the current list of task proxies."""

@@ -669,6 +672,23 @@ def remove_suiciding_tasks( self ):
self.remove( itask, 'suicide' )


def _get_earliest_unsatisfied_cycle_point( self ):
cutoff = None
for itask in self.get_tasks(all=True):
# this has to consider tasks in the runahead pool too, e.g.
# ones that have just spawned and not been released yet.
if not itask.is_cycling:
continue
if itask.state.is_currently('waiting', 'held' ):
if cutoff is None or itask.c_time < cutoff:
cutoff = itask.c_time
elif not itask.has_spawned():
# (e.g. 'ready')
nxt = itask.next_tag()
if nxt is not None and ( cutoff is None or nxt < cutoff ):
cutoff = nxt
return cutoff

def remove_spent_cycling_tasks( self ):
"""
Remove cycling tasks no longer needed to satisfy others' prerequisites.
@@ -682,30 +702,20 @@ def remove_spent_cycling_tasks( self ):
"""

# first find the cycle time of the earliest unsatisfied task
cutoff = None
for itask in self.get_tasks(all=True):
# this has to consider tasks in the runahead pool too, e.g.
# ones that have just spawned and not been released yet.
if not itask.is_cycling:
continue
if itask.state.is_currently('waiting', 'held' ):
if not cutoff or itask.c_time < cutoff:
cutoff = itask.c_time
elif not itask.has_spawned():
# (e.g. 'ready')
nxt = itask.next_tag()
if nxt and ( not cutoff or nxt < cutoff ):
cutoff = nxt
cutoff = self._get_earliest_unsatisfied_cycle_point()

if not cutoff:
return

# now check each succeeded task against the cutoff
spent = []
for itask in self.get_tasks():
if not itask.state.is_currently('succeeded') or \
not itask.is_cycling or \
not itask.state.has_spawned():
not itask.state.has_spawned() or \
itask.cleanup_cutoff is None:
continue
if (cutoff and itask.cleanup_cutoff is not None and
cutoff > itask.cleanup_cutoff):
if cutoff > itask.cleanup_cutoff:
spent.append(itask)
for itask in spent:
self.remove( itask )
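With the cutoff computed once by the extracted helper, the cleanup reduces to a single filter over succeeded tasks; for illustration, the loop above is equivalent to this comprehension (sketch only):

    spent = [itask for itask in self.get_tasks()
             if itask.state.is_currently('succeeded')
             and itask.is_cycling
             and itask.state.has_spawned()
             and itask.cleanup_cutoff is not None
             and cutoff > itask.cleanup_cutoff]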
8 changes: 0 additions & 8 deletions lib/cylc/task_types/cycling.py
@@ -66,11 +66,3 @@ def next_tag( self ):
if adjusted:
p_next = min( adjusted )
return p_next

def get_state_summary( self ):
summary = task.get_state_summary( self )
# derived classes can call this method and then
# add more information to the summary if necessary.
summary[ 'cycle_time' ] = self.c_time # (equiv to self.tag)
return summary

(diff for 1 more changed file not shown)
