From e7f932d2ae0befc5994b6ede1264a32ab74766b3 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 4 Oct 2023 08:49:48 +0100 Subject: [PATCH] Fix execution time limit polling intervals --- cylc/flow/task_events_mgr.py | 99 +++++++++++++++++++++++++++++------- 1 file changed, 82 insertions(+), 17 deletions(-) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 852b28f848e..3152131d044 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -33,7 +33,7 @@ from shlex import quote import shlex from time import time -from typing import TYPE_CHECKING, Optional, Union, cast +from typing import TYPE_CHECKING, List, Optional, Union, cast from cylc.flow import LOG, LOG_LEVELS from cylc.flow.cfgspec.glbl_cfg import glbl_cfg @@ -1557,24 +1557,23 @@ def _reset_job_timers(self, itask): if itask.state(TASK_STATUS_RUNNING): timeref = itask.summary['started_time'] timeout_key = 'execution timeout' + # Actual timeout after all polling. 
timeout = self._get_events_conf(itask, timeout_key) - delays = list(self._get_workflow_platforms_conf( - itask, 'execution polling intervals')) - if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]: - time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT] - time_limit_delays = itask.platform.get( + execution_polling_intervals = list( + self._get_workflow_platforms_conf( + itask, 'execution polling intervals')) + if itask.summary[TaskEventsManager.KEY_EXECUTE_TIME_LIMIT]: + time_limit = itask.summary[ + TaskEventsManager.KEY_EXECUTE_TIME_LIMIT] + time_limit_polling_intervals = itask.platform.get( 'execution time limit polling intervals') - if sum(delays) > time_limit: - # Remove excessive polling before time limit - while sum(delays) > time_limit: - del delays[-1] - else: - # Fill up the gap before time limit - if delays: - size = int((time_limit - sum(delays)) / delays[-1]) - delays.extend([delays[-1]] * size) - time_limit_delays[0] += time_limit - sum(delays) - delays += time_limit_delays + delays = self.process_execution_polling_intervals( + execution_polling_intervals, + time_limit, + time_limit_polling_intervals + ) + else: + delays = execution_polling_intervals else: # if itask.state.status == TASK_STATUS_SUBMITTED: timeref = itask.summary['submitted_time'] timeout_key = 'submission timeout' @@ -1608,6 +1607,72 @@ def _reset_job_timers(self, itask): # Set next poll time self.check_poll_time(itask) + @staticmethod + def process_execution_polling_intervals( + polling_intervals: List[float], + time_limit: float, + time_limit_polling_intervals: List[float] + ) -> List[float]: + """Create a list of polling times. 
+ + Args: + (execution) polling_intervals + (execution) time_limit + (execution) time_limit_polling_intervals + + Examples: + + >>> this = TaskEventsManager.process_execution_polling_intervals + + # Basic example: + >>> this([40, 35], 100, [10]) + [40, 35, 35, 10] + + # Second 40 second delay gets lopped off the list because it's after + # the execution time limit: + >>> this([40, 40], 60, [10]) + [40, 30, 10] + + # Expand last item in execution polling intervals to fill the + # execution time limit: + >>> this([5, 20], 100, [10]) + [5, 20, 20, 20, 20, 25, 10] + + # There are no execution polling intervals set - polling starts + # at execution time limit: + >>> this([], 10, [5]) + [15, 5] + + # We have a list of execution time limit polling intervals: + >>> this([10], 25, [5, 6, 7, 8]) + [10, 10, 10, 6, 7, 8] + """ + delays = polling_intervals + if sum(delays) > time_limit: + # Remove execution polling which would overshoot the + # execution time limit: + while sum(delays) > time_limit: + del delays[-1] + else: + # Repeat the last execution polling interval up to the execution + # time limit: + if delays: + size = int((time_limit - sum(delays)) / delays[-1]) + delays.extend([delays[-1]] * size) + + # After the last delay before the execution time limit add the + # delay to get to the execution_time_limit + if len(time_limit_polling_intervals) > 1: + time_limit_polling_intervals[0] += time_limit - sum(delays) + else: + delays.append( + time_limit_polling_intervals[0] + time_limit - sum(delays)) + + # After the execution time limit poll at execution time limit polling + # intervals. + delays += time_limit_polling_intervals + return delays + def add_event_timer(self, id_key, event_timer): """Add a new event timer.