Skip to content

Commit

Permalink
Fix execution time limit polling intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Oct 4, 2023
1 parent 23d450a commit e7f932d
Showing 1 changed file with 82 additions and 17 deletions.
99 changes: 82 additions & 17 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from shlex import quote
import shlex
from time import time
from typing import TYPE_CHECKING, Optional, Union, cast
from typing import TYPE_CHECKING, List, Optional, Union, cast

from cylc.flow import LOG, LOG_LEVELS
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
Expand Down Expand Up @@ -1557,24 +1557,23 @@ def _reset_job_timers(self, itask):
if itask.state(TASK_STATUS_RUNNING):
timeref = itask.summary['started_time']
timeout_key = 'execution timeout'
# Actual timeout after all polling.
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self._get_workflow_platforms_conf(
itask, 'execution polling intervals'))
if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT]
time_limit_delays = itask.platform.get(
execution_polling_intervals = list(
self._get_workflow_platforms_conf(
itask, 'execution polling intervals'))
if itask.summary[TaskEventsManager.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[
TaskEventsManager.KEY_EXECUTE_TIME_LIMIT]
time_limit_polling_intervals = itask.platform.get(
'execution time limit polling intervals')
if sum(delays) > time_limit:
# Remove excessive polling before time limit
while sum(delays) > time_limit:
del delays[-1]
else:
# Fill up the gap before time limit
if delays:
size = int((time_limit - sum(delays)) / delays[-1])
delays.extend([delays[-1]] * size)
time_limit_delays[0] += time_limit - sum(delays)
delays += time_limit_delays
delays = self.process_execution_polling_intervals(
execution_polling_intervals,
time_limit,
time_limit_polling_intervals
)
else:
delays = execution_polling_intervals
else: # if itask.state.status == TASK_STATUS_SUBMITTED:
timeref = itask.summary['submitted_time']
timeout_key = 'submission timeout'
Expand Down Expand Up @@ -1608,6 +1607,72 @@ def _reset_job_timers(self, itask):
# Set next poll time
self.check_poll_time(itask)

@staticmethod
def process_execution_polling_intervals(
polling_intervals: List[float],
time_limit: float,
time_limit_polling_intervals: List[float]
) -> List[float]:
"""Create a list of polling times.
Args:
(execution) polling_intervals
(execution) time_limit
(execution) time_limit_polling_intervals
Examples:
>>> this = TaskEventsManager.process_execution_polling_intervals
# Basic example:
>>> this([40, 35], 100, [10])
[40, 35, 35, 10]
# Second 40 second delay gets lopped off the list because it's after
# the execution time limit:
>>> this([40, 40], 60, [10])
[40, 30, 10]
# Expand last item in exection polling intervals to fill the
# execution time limit:
>>> this([5, 20], 100, [10])
[5, 20, 20, 20, 20, 25, 10]
# There are no execution polling intervals set - polling starts
# at execution time limit:
>>> this([], 10, [5])
[15, 5]
# We have a list of execution time limit polling intervals,
>>> this([10], 25, [5, 6, 7, 8])
[10, 10, 10, 6, 7, 8]
"""
delays = polling_intervals
if sum(delays) > time_limit:
# Remove execution polling which would overshoot the
# execution time limit:
while sum(delays) > time_limit:
del delays[-1]
else:
# Repeat the last execution polling interval up to the execution
# time limit:
if delays:
size = int((time_limit - sum(delays)) / delays[-1])
delays.extend([delays[-1]] * size)

# After the last delay before the execution time limit add the
# delay to get to the execution_time_limit
if len(time_limit_polling_intervals) > 1:
time_limit_polling_intervals[0] += time_limit - sum(delays)
else:
delays.append(
time_limit_polling_intervals[0] + time_limit - sum(delays))

# After the execution time limit poll at execution time limit polling
# intervals.
delays += time_limit_polling_intervals
return delays

def add_event_timer(self, id_key, event_timer):
"""Add a new event timer.
Expand Down

0 comments on commit e7f932d

Please sign in to comment.