Skip to content

Commit

Permalink
opt outputs: submitted is implicitly required (#5755)
Browse files Browse the repository at this point in the history
* This fixes a bug where submit-failed tasks were incorrectly identified
  as complete.
* E.g. `foo:finish => bar`, where `foo` fails to submit.
* A task is incomplete if:
  * it finished executing without completing all required outputs
  * or if job submission failed and the :submit output was not optional
* See:
  https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax
  • Loading branch information
oliver-sanders authored Oct 5, 2023
1 parent 23d450a commit a023aee
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 7 deletions.
1 change: 1 addition & 0 deletions changes.d/5755.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes an issue where submit-failed tasks could be incorrectly considered as completed rather than causing the workflow to stall.
25 changes: 24 additions & 1 deletion cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,24 @@ def __init__(self, tdef):
self._by_message = {}
self._by_trigger = {}
self._required = set()

# Add outputs from task def.
for trigger, (message, required) in tdef.outputs.items():
self._add(message, trigger, required=required)

# Handle implicit submit requirement
if (
# "submitted" is not declared as optional/required
tdef.outputs[TASK_OUTPUT_SUBMITTED][1] is None
# and "submit-failed" is not declared as optional/required
and tdef.outputs[TASK_OUTPUT_SUBMIT_FAILED][1] is None
):
self._add(
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMITTED,
required=True,
)

def _add(self, message, trigger, is_completed=False, required=False):
"""Add a new output message"""
self._by_message[message] = [trigger, message, is_completed]
Expand Down Expand Up @@ -197,7 +211,16 @@ def is_incomplete(self):
)

def get_incomplete(self):
"""Return a list of required outputs that are not complete."""
"""Return a list of required outputs that are not complete.
A task is incomplete if:
* it finished executing without completing all required outputs
* or if job submission failed and the :submit output was not optional
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax
"""
return [
trigger
for trigger, (_, _, is_completed) in self._by_trigger.items()
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_SUBMIT_FAILED,
)
from cylc.flow.util import (
serialise,
Expand Down Expand Up @@ -1376,9 +1377,11 @@ def spawn_on_output(self, itask, output, forced=False):
self.remove(c_task, msg)

if not forced and output in [
# final task statuses
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMIT_FAILED,
]:
self.remove_if_complete(itask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
[meta]
title = "Try out scenarios for intelligent host selection."
description = """
Tasks
- goodhost: a control to check that everything works
- badhost is always going to fail
- mixedhost contains some hosts that will and won't fail
Tasks:
- goodhost: a control to check that everything works
- badhost is always going to fail
- mixedhost contains some hosts that will and won't fail
"""

[scheduler]
Expand All @@ -18,7 +18,10 @@ Tasks
initial cycle point = 1
[[graph]]
# Run good and mixed as controls
R1 = badhosttask:submit-fail? => goodhosttask & mixedhosttask
R1 = """
badhosttask:submit-fail? => goodhosttask & mixedhosttask
mixedhosttask:submit-fail? # permit mixedhosttask to submit-fail
"""

[runtime]
[[root]]
Expand Down
47 changes: 47 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test the submitted and submit-failed triggers work correctly.
#
# The :submitted output should be considered required unless explicitly stated
# otherwise.
# See:
# * https://github.com/cylc/cylc-flow/pull/5755
# * https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax

# source the shared test_header located in this script's directory
. "$(dirname "$0")/test_header"
# NOTE(review): presumably one test each for the validate and run steps plus
# the three log checks in the loop below - confirm against test_header
set_test_number 5

# define a broken platform which will always result in submission failures
create_test_global_config '' '
[platforms]
[[broken]]
hosts = no-such-host
'

install_and_validate
reftest_run

# tasks a1, a2 and a3 should each be logged as not having completed the
# required "submitted" output
for number in 1 2 3; do
grep_workflow_log_ok \
"${TEST_NAME_BASE}-a${number}" \
"${number}/a${number} .* did not complete required outputs: \['submitted'\]"
done

purge
exit
56 changes: 56 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[scheduler]
allow implicit tasks = True
[[events]]
# shut down once the workflow has stalled
# abort on stall timeout = True
# stall timeout = PT0S
stall handlers = cylc stop %(workflow)s
expected task failures = 1/a1, 2/a2, 3/a3

[scheduling]
initial cycle point = 1
cycling mode = integer
runahead limit = P10
[[graph]]
R/1 = """
# a1 should be incomplete (submission is implicitly required)
a1? => b
"""
R/2 = """
# a2 should be incomplete (submission is implicitly required)
a2:finished => b
"""
R/3 = """
# a3 should be incomplete (submission is explicitly required)
a3? => b
a3:submitted => s
"""
R/4 = """
# a4 should be complete (submission is explicitly optional)
a4? => b
a4:submitted? => s
"""
R/5 = """
# a5 should be complete (submission is explicitly optional)
a5? => b
a5:submitted? => s
a5:submit-failed? => f # branch should run
"""
R/6 = """
# a6 should be complete (submission is explicitly optional)
a6? => b
a6:submit-failed? => f # branch should not run (a6 is not on the broken platform)
"""
R/7 = """
# 7/a should be complete (submission is explicitly optional)
a:submit-failed? => f # branch should not run (a is not on the broken platform)
"""
R/8 = """
# 8/a should be complete (submission is explicitly optional)
a:submitted? => s # branch should run
"""

[runtime]
[[a1, a2, a3, a4, a5]]
# a task which will always submit-fail
platform = broken
11 changes: 11 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
7/a -triggered off [] in flow 1
6/a6 -triggered off [] in flow 1
8/a -triggered off [] in flow 1
3/a3 -triggered off [] in flow 1
2/a2 -triggered off [] in flow 1
4/a4 -triggered off [] in flow 1
1/a1 -triggered off [] in flow 1
5/a5 -triggered off [] in flow 1
5/f -triggered off ['5/a5'] in flow 1
8/s -triggered off ['8/a'] in flow 1
6/b -triggered off ['6/a6'] in flow 1
45 changes: 45 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
)

# NOTE: foo and bar have no parents so at start-up (even with the workflow
Expand Down Expand Up @@ -1201,3 +1204,45 @@ async def test_runahead_offset_start(
"""
task_pool = mod_example_flow_2.pool
assert task_pool.runahead_limit_point == ISO8601Point('2004')


async def test_detect_incomplete_tasks(
    flow,
    scheduler,
    start,
    log_filter,
):
    """Tasks which finish with required outputs missing are incomplete.

    One task is defined for each final status under test. After the output
    matching the task's name has been spawned, the task should be reported
    as incomplete, a message should be logged, and the task should still be
    present in the pool.
    """
    # the tasks are named after the final task statuses under test
    task_names = [
        TASK_STATUS_FAILED,
        TASK_STATUS_EXPIRED,
        TASK_STATUS_SUBMIT_FAILED,
    ]
    scheduler_cfg = {'allow implicit tasks': 'True'}
    # one graph line per task name
    graph_cfg = {'R1': '\n'.join(task_names)}
    id_ = flow({
        'scheduler': scheduler_cfg,
        'scheduling': {'graph': graph_cfg},
    })
    schd = scheduler(id_)
    async with start(schd) as log:
        for itask in schd.pool.get_tasks():
            # generate the output which shares its name with the task
            schd.pool.spawn_on_output(itask, itask.tdef.name)

            # the task must be flagged as incomplete...
            assert itask.state.outputs.get_incomplete()
            assert itask.state.outputs.is_incomplete()

            # ...this must have been logged...
            expected = f"[{itask}] did not complete required outputs:"
            assert log_filter(log, contains=expected)

            # ...and the task must still be in the pool
            assert itask in schd.pool.get_tasks()

0 comments on commit a023aee

Please sign in to comment.