From 57b8868eacf6477413ce58398b0004b900112086 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 19 Apr 2023 15:17:57 +0100 Subject: [PATCH] Don't let no-worker and long-running tasks break Computations --- distributed/scheduler.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 857c077cd4b..83c7a6541fc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1775,6 +1775,19 @@ def _clear_task_state(self) -> None: ): collection.clear() # type: ignore + @property + def fully_idle(self) -> bool: + """Return True iff there are no tasks that haven't finished computing. + + Unlike testing `self.total_occupancy`, this property returns False if there are + long-running tasks, no-worker, or queued tasks (due to not having any workers). + """ + return all( + count == 0 or state in {"memory", "error", "released", "forgotten"} + for tg in self.task_groups.values() + for state, count in tg.states.items() + ) + @property def total_occupancy(self) -> float: return self._calc_occupancy( @@ -4319,7 +4332,7 @@ def update_graph( keys=lost_keys, client=client, stimulus_id=stimulus_id ) - if self.total_occupancy > 1e-9 and self.computations: + if not self.fully_idle and self.computations: # Still working on something. Assign new tasks to same computation computation = self.computations[-1] else: