Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc.network.suite_state: speed up updating #1811

Merged
merged 2 commits into from
Apr 27, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 52 additions & 40 deletions lib/cylc/network/suite_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,45 +86,45 @@ def __init__(self, run_mode):
def update(self, tasks, tasks_rh, min_point, max_point, max_point_rh,
paused, will_pause_at, stopping, will_stop_at, ns_defn_order,
reloading):
task_summary = {}
global_summary = {}
family_summary = {}
task_states = {}

fs = None
for tlist in [tasks, tasks_rh]:
for task in tlist:
ts = task.get_state_summary()
if fs:
ts['state'] = fs
task_summary[task.identity] = ts
name, point_string = TaskID.split(task.identity)
point_string = str(point_string)
task_states.setdefault(point_string, {})
task_states[point_string][name] = (
task_summary[task.identity]['state'])
fs = 'runahead'
task_summary, task_states = self._get_tasks_info(tasks, tasks_rh)

fam_states = {}
all_states = []
for point_string, c_task_states in task_states.items():
config = SuiteConfig.get_inst()
ancestors_dict = config.get_first_parent_ancestors()

# Compute state_counts (total, and per cycle).
state_count_totals = {}
state_count_cycles = {}

for point_string, c_task_states in task_states:
# For each cycle point, construct a family state tree
# based on the first-parent single-inheritance tree

c_fam_task_states = {}
config = SuiteConfig.get_inst()

for key, parent_list in (
config.get_first_parent_ancestors().items()):
state = c_task_states.get(key)
count = {}

for key in c_task_states:
state = c_task_states[key]
if state is None:
continue
try:
count[state] += 1
except KeyError:
count[state] = 1

all_states.append(state)
for parent in parent_list:
for parent in ancestors_dict[key]:
if parent == key:
continue
c_fam_task_states.setdefault(parent, [])
c_fam_task_states[parent].append(state)
c_fam_task_states.setdefault(parent, set([]))
c_fam_task_states[parent].add(state)

state_count_cycles[point_string] = count

for fam, child_states in c_fam_task_states.items():
f_id = TaskID.get(fam, point_string)
Expand All @@ -143,23 +143,13 @@ def update(self, tasks, tasks_rh, min_point, max_point, max_point_rh,
'label': point_string,
'state': state}

all_states.sort()

# Compute state_counts (total, and per cycle).
state_count_totals = {}
state_count_cycles = {}
for point_string, name_states in task_states.items():
count = {}
for name, state in name_states.items():
try:
count[state] += 1
except KeyError:
count[state] = 1
try:
state_count_totals[state] += 1
except KeyError:
state_count_totals[state] = 1
state_count_cycles[point_string] = count
for point_string, count in state_count_cycles.items():
for state, state_count in count.items():
state_count_totals.setdefault(state, 0)
state_count_totals[state] += state_count

all_states.sort()

global_summary['oldest cycle point string'] = (
self.str_or_None(min_point))
Expand Down Expand Up @@ -196,11 +186,33 @@ def update(self, tasks, tasks_rh, min_point, max_point, max_point_rh,
self.task_summary = task_summary
self.global_summary = global_summary
self.family_summary = family_summary
task_states = {}
self.first_update_completed = True
self.state_count_totals = state_count_totals
self.state_count_cycles = state_count_cycles

def _get_tasks_info(self, tasks, tasks_rh):
"""Retrieve task summary info and states."""

task_summary = {}
task_states = {}

for task in tasks:
ts = task.get_state_summary()
task_summary[task.identity] = ts
name, point_string = TaskID.split(task.identity)
task_states.setdefault(point_string, {})
task_states[point_string][name] = ts['state']

for task in tasks_rh:
ts = task.get_state_summary()
ts['state'] = 'runahead'
task_summary[task.identity] = ts
name, point_string = TaskID.split(task.identity)
task_states.setdefault(point_string, {})
task_states[point_string][name] = 'runahead'

return task_summary, task_states.items()

def str_or_None(self, s):
if s:
return str(s)
Expand Down