diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bbeb7175b5..0b0586698a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1316,11 +1316,7 @@ class TaskState: #: responses. #: Only the most recently assigned worker is trusted. All other results will #: be rejected. - run_id: int - - #: Whether the task has already been attempted. This value is used to refresh - #: refresh the run ID on following attempts. - _attempted: bool + run_id: int | None #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -1374,8 +1370,7 @@ def __init__( self.metadata = {} self.annotations = {} self.erred_on = set() - self.run_id = next(TaskState._run_id_iterator) - self._attempted = False + self.run_id = None TaskState._instances.add(self) def __hash__(self) -> int: @@ -1450,12 +1445,6 @@ def validate(self) -> None: def get_nbytes_deps(self) -> int: return sum(ts.get_nbytes() for ts in self.dependencies) - def next_run(self) -> None: - """Update the task state for a new run""" - if self._attempted: - self.run_id = next(TaskState._run_id_iterator) - self._attempted = True - def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]: """Dictionary representation for debugging purposes. Not type stable and not intended for roundtrips. @@ -3285,7 +3274,7 @@ def _task_to_msg(self, ts: TaskState, duration: float = -1) -> dict[str, Any]: # time to compute and submit this if duration < 0: duration = self.get_task_duration(ts) - ts.next_run() + ts.run_id = next(TaskState._run_id_iterator) msg: dict[str, Any] = { "op": "compute-task",