Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix execution time limit polling intervals [cylc 8] #5753

Merged
merged 5 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5753.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug where execution time limit polling intervals could end up incorrectly applied
99 changes: 82 additions & 17 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from shlex import quote
import shlex
from time import time
from typing import TYPE_CHECKING, Optional, Union, cast
from typing import TYPE_CHECKING, List, Optional, Union, cast

from cylc.flow import LOG, LOG_LEVELS
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
Expand Down Expand Up @@ -1557,24 +1557,23 @@ def _reset_job_timers(self, itask):
if itask.state(TASK_STATUS_RUNNING):
timeref = itask.summary['started_time']
timeout_key = 'execution timeout'
# Actual timeout after all polling.
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self._get_workflow_platforms_conf(
itask, 'execution polling intervals'))
if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT]
time_limit_delays = itask.platform.get(
execution_polling_intervals = list(
self._get_workflow_platforms_conf(
itask, 'execution polling intervals'))
if itask.summary[TaskEventsManager.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[
TaskEventsManager.KEY_EXECUTE_TIME_LIMIT]
time_limit_polling_intervals = itask.platform.get(
'execution time limit polling intervals')
if sum(delays) > time_limit:
# Remove excessive polling before time limit
while sum(delays) > time_limit:
del delays[-1]
else:
# Fill up the gap before time limit
if delays:
size = int((time_limit - sum(delays)) / delays[-1])
delays.extend([delays[-1]] * size)
time_limit_delays[0] += time_limit - sum(delays)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last member of the list is repeated.

If len(time_limit_delays) == 1, then every subsequent delay is not the configured execution time limit polling interval, but that value plus whatever remains between the end of the execution polling intervals and the time limit.

delays += time_limit_delays
delays = self.process_execution_polling_intervals(
execution_polling_intervals,
time_limit,
time_limit_polling_intervals
)
else:
delays = execution_polling_intervals
else: # if itask.state.status == TASK_STATUS_SUBMITTED:
timeref = itask.summary['submitted_time']
timeout_key = 'submission timeout'
Expand Down Expand Up @@ -1608,6 +1607,72 @@ def _reset_job_timers(self, itask):
# Set next poll time
self.check_poll_time(itask)

@staticmethod
def process_execution_polling_intervals(
    polling_intervals: List[float],
    time_limit: float,
    time_limit_polling_intervals: List[float]
) -> List[float]:
    """Create a list of polling delays around an execution time limit.

    The result is built in three parts:

    1. The execution polling intervals, either trimmed so that they do
       not overshoot the time limit, or padded by repeating the last
       interval for as long as it still fits under the time limit.
    2. One delay that covers the gap between the end of part 1 and the
       time limit, plus the first time limit polling interval.
    3. The remaining time limit polling intervals (a single configured
       interval is repeated once so that the enlarged first delay from
       part 2 is not the one that ends up last in the list).

    Neither input list is modified; a new list is returned. This matters
    because callers may pass a list that is owned by shared
    configuration.

    Args:
        polling_intervals:
            The (execution) polling intervals.
        time_limit:
            The (execution) time limit, in the same units as the
            intervals.
        time_limit_polling_intervals:
            The (execution) time limit polling intervals. If empty, a
            single delay reaching the time limit is appended instead.

    Returns:
        A new list of delays.

    Examples:

        >>> this = TaskEventsManager.process_execution_polling_intervals

        # Basic example:
        >>> this([40, 35], 100, [10])
        [40, 35, 35, 10]

        # Second 40 second delay gets lopped off the list because it's after
        # the execution time limit:
        >>> this([40, 40], 60, [10])
        [40, 30, 10]

        # Expand last item in execution polling intervals to fill the
        # execution time limit:
        >>> this([5, 20], 100, [10])
        [5, 20, 20, 20, 20, 25, 10]

        # There are no execution polling intervals set - polling starts
        # at execution time limit:
        >>> this([], 10, [5])
        [15, 5]

        # We have a list of execution time limit polling intervals,
        >>> this([10], 25, [5, 6, 7, 8])
        [10, 10, 10, 6, 7, 8]

        # The input lists are left untouched:
        >>> intervals, limit_intervals = [40, 40], [10]
        >>> this(intervals, 60, limit_intervals)
        [40, 30, 10]
        >>> intervals, limit_intervals
        ([40, 40], [10])
    """
    # Work on copies so the caller's lists (which may belong to shared
    # configuration) are never altered.
    delays = list(polling_intervals)
    post_limit_delays = list(time_limit_polling_intervals)

    if sum(delays) > time_limit:
        # Remove execution polling which would overshoot the
        # execution time limit:
        while delays and sum(delays) > time_limit:
            delays.pop()
    elif delays and delays[-1] > 0:
        # Repeat the last execution polling interval up to the execution
        # time limit (a zero interval cannot fill any time, so skip it):
        repeats = int((time_limit - sum(delays)) / delays[-1])
        delays.extend([delays[-1]] * repeats)

    # Time left between the last poll before the limit and the limit itself.
    gap = time_limit - sum(delays)

    if not post_limit_delays:
        # Nothing configured for after the limit: just reach the limit.
        return delays + [gap]

    # After the last delay before the execution time limit add the
    # delay to get to the execution time limit. A single configured
    # interval is duplicated first so the unmodified value stays last.
    if len(post_limit_delays) == 1:
        post_limit_delays.append(post_limit_delays[0])
    post_limit_delays[0] += gap

    # After the execution time limit poll at execution time limit polling
    # intervals.
    return delays + post_limit_delays

def add_event_timer(self, id_key, event_timer):
"""Add a new event timer.

Expand Down
23 changes: 23 additions & 0 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,26 @@ async def test_process_job_logs_retrieval_warns_no_platform(
warning = caplog.records[-1]
assert warning.levelname == 'WARNING'
assert 'Unable to retrieve' in warning.msg


async def test__reset_job_timers(
    one_conf: Fixture, flow: Fixture, scheduler: Fixture,
    start: Fixture, caplog: Fixture, mock_glbl_cfg: Fixture,
):
    """Exercise ``_reset_job_timers`` for a running task with a time limit.

    A task is put into the running state with one execution polling
    interval (25), one execution time limit polling interval (10) and an
    execution time limit of 30. After resetting the job timers, the first
    captured log record must report the resulting polling intervals.
    """
    expected_fragment = 'polling intervals=PT25S,PT15S,PT10S,...'
    workflow_scheduler = scheduler(flow(one_conf))

    async with start(workflow_scheduler):
        task = workflow_scheduler.pool.get_tasks()[0]
        task.state.status = 'running'
        task.platform['execution polling intervals'] = [25]
        task.platform['execution time limit polling intervals'] = [10]
        task.summary['execution_time_limit'] = 30
        # Drop anything logged during start-up so that the record under
        # test is the first one in the list.
        caplog.records.clear()
        workflow_scheduler.task_events_mgr._reset_job_timers(task)

    first_message = caplog.records[0].msg
    assert expected_fragment in first_message
Loading