Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OrderedDict for queues to make them FIFO #2584

Merged
merged 6 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion doc/src/cylc-user-guide/cug.tex
Original file line number Diff line number Diff line change
Expand Up @@ -6857,7 +6857,10 @@ \subsection{Limiting Activity With Internal Queues}
Large suites can potentially overwhelm task hosts by submitting too many
tasks at once. You can prevent this with {\em internal queues}, which
limit the number of tasks that can be active (submitted or running)
at the some time.
at the same time.

Internal queues behave in the first-in-first-out (FIFO) manner, i.e.\ tasks are
released from a queue in the same order that they were queued.

A queue is defined by a {\em name}; a {\em limit}, which is the maximum
number of active tasks allowed for the queue; and a list of {\em members},
Expand Down
37 changes: 22 additions & 15 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
TASK_STATUS_RETRYING)
from cylc.wallclock import (
get_current_time_string, get_time_string_from_unix_time)
from parsec.OrderedDict import OrderedDict


class TaskPool(object):
Expand Down Expand Up @@ -208,7 +209,7 @@ def add_to_runahead_pool(self, itask, is_restart=False):
itask.state.set_held()

# add to the runahead pool
self.runahead_pool.setdefault(itask.point, {})
self.runahead_pool.setdefault(itask.point, OrderedDict())
self.runahead_pool[itask.point][itask.identity] = itask
self.rhpool_changed = True

Expand Down Expand Up @@ -460,7 +461,7 @@ def release_runahead_task(self, itask):
queue = self.myq[itask.tdef.name]
except KeyError:
queue = self.config.Q_DEFAULT
self.queues.setdefault(queue, {})
self.queues.setdefault(queue, OrderedDict())
self.queues[queue][itask.identity] = itask
self.pool.setdefault(itask.point, {})
self.pool[itask.point][itask.identity] = itask
Expand Down Expand Up @@ -568,25 +569,32 @@ def get_ready_tasks(self):
Return the tasks that are dequeued.
"""

# 1) queue unqueued tasks that are ready to run or manually forced
now = time()
for itask in self.get_tasks():
if itask.state.status != TASK_STATUS_QUEUED:
# only need to check that unqueued tasks are ready
if itask.manual_trigger or itask.ready_to_run(now):
# queue the task
itask.state.reset_state(TASK_STATUS_QUEUED)
itask.reset_manual_trigger()

# 2) submit queued tasks if manually forced or not queue-limited
ready_tasks = []
qconfig = self.config.cfg['scheduling']['queues']

for queue in self.queues:
# 2.1) count active tasks and compare to queue limit
tasks = self.queues[queue].values()

# 1) queue unqueued tasks that are ready to run or manually forced
for itask in tasks:
if itask.state.status != TASK_STATUS_QUEUED:
# only need to check that unqueued tasks are ready
if itask.manual_trigger or itask.ready_to_run(now):
# queue the task
itask.state.reset_state(TASK_STATUS_QUEUED)
itask.reset_manual_trigger()
# move the task to the back of the queue
self.queues[queue][itask.identity] = \
self.queues[queue].pop(itask.identity)

# 2) submit queued tasks if manually forced or not queue-limited
n_active = 0
n_release = 0
n_limit = qconfig[queue]['limit']
tasks = self.queues[queue].values()

# 2.1) count active tasks and compare to queue limit
if n_limit:
for itask in tasks:
if itask.state.status in [TASK_STATUS_READY,
Expand Down Expand Up @@ -674,8 +682,7 @@ def set_do_reload(self, config, stop_point):
if itask.tdef.name not in self.myq:
continue
key = self.myq[itask.tdef.name]
if key not in new_queues:
new_queues[key] = {}
new_queues.setdefault(key, OrderedDict())
new_queues[key][id_] = itask
self.queues = new_queues

Expand Down
32 changes: 32 additions & 0 deletions tests/queues/02-queueorder.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test job script sets dependencies in evironment.
. "$(dirname "${0}")/test_header"
set_test_number 3

install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
run_ok "${TEST_NAME_BASE}-run" \
cylc run "${SUITE_NAME}" --reference-test --debug --no-detach
run_ok "${TEST_NAME_BASE}-test" bash -o pipefail -c "
cylc cat-log '${SUITE_NAME}' |
grep 'proc_n.*submitted at' |
sort --key=4,4 --check"

purge_suite "${SUITE_NAME}"
exit
17 changes: 17 additions & 0 deletions tests/queues/02-queueorder/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
2018-03-13T14:22:38Z INFO - Initial point: 1
2018-03-13T14:22:38Z INFO - Final point: 1
2018-03-13T14:22:38Z INFO - [delay_n1.1] -triggered off []
2018-03-13T14:22:38Z INFO - [hold.1] -triggered off []
2018-03-13T14:22:40Z INFO - [delay_n2.1] -triggered off ['delay_n1.1']
2018-03-13T14:22:42Z INFO - [delay_n3.1] -triggered off ['delay_n2.1']
2018-03-13T14:22:44Z INFO - [delay_n4.1] -triggered off ['delay_n3.1']
2018-03-13T14:22:47Z INFO - [delay_n5.1] -triggered off ['delay_n4.1']
2018-03-13T14:22:48Z INFO - [proc_n1.1] -triggered off ['delay_n1.1']
2018-03-13T14:22:49Z INFO - [delay_n6.1] -triggered off ['delay_n5.1']
2018-03-13T14:22:50Z INFO - [proc_n2.1] -triggered off ['delay_n2.1']
2018-03-13T14:22:52Z INFO - [delay_n7.1] -triggered off ['delay_n6.1']
2018-03-13T14:22:52Z INFO - [proc_n3.1] -triggered off ['delay_n3.1']
2018-03-13T14:22:54Z INFO - [proc_n4.1] -triggered off ['delay_n4.1']
2018-03-13T14:22:56Z INFO - [proc_n5.1] -triggered off ['delay_n5.1']
2018-03-13T14:22:58Z INFO - [proc_n6.1] -triggered off ['delay_n6.1']
2018-03-13T14:23:00Z INFO - [proc_n7.1] -triggered off ['delay_n7.1']
19 changes: 19 additions & 0 deletions tests/queues/02-queueorder/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[cylc]
[[parameters]]
n = 1..7
[scheduling]
[[queues]]
[[[q1]]]
limit = 1
members = proc<n>, hold
[[dependencies]]
graph = """
delay<n-1> => delay<n>
delay<n> => proc<n>
hold
"""
[runtime]
[[delay<n>]]
[[proc<n>]]
[[hold]]
script = sleep 7