Skip to content

Commit

Permalink
Handle edge cases between queued and no-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 committed Nov 3, 2022
1 parent 624add3 commit 63c649d
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,10 +1511,15 @@ class SchedulerState:
#: All tasks currently known to the scheduler
tasks: dict[str, TaskState]

#: Tasks in the "queued" state, ordered by priority
#: Tasks in the "queued" state, ordered by priority.
#: They should generally be root-ish, but in certain cases may not be.
#: They must not have restrictions.
#: Always empty if `worker-saturation` is set to `inf`.
queued: HeapSet[TaskState]

#: Tasks in the "no-worker" state
#: Tasks in the "no-worker" state.
#: They may or may not have restrictions.
#: Could contain root-ish tasks even when `worker-saturation` is a finite value.
unrunnable: set[TaskState]

#: Subset of tasks that exist in memory on more than one worker
Expand Down Expand Up @@ -2014,11 +2019,29 @@ def transition_no_worker_processing(self, key, stimulus_id):
assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
assert ts in self.unrunnable

if ws := self.decide_worker_non_rootish(ts):
decide_worker = (
self.decide_worker_rootish_queuing_disabled
if self.is_rootish(ts)
else self.decide_worker_non_rootish
)
# NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
# meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
# would be the most appropriate function to use. But if, at submission time,
# it didn't look root-ish (`TaskGroup` too small, or cluster too big) and there were
# no running workers, it would have gone to `no-worker` instead.
# Rather than implementing some `no-worker->queued` transition, we
# just live with our original assessment and treat it as though queuing were disabled.
# If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
# are idle, which would leave it in `unrunnable` and cause a deadlock.

if ws := decide_worker(ts):
self.unrunnable.discard(ts)
worker_msgs = _add_to_processing(self, ts, ws)
# If no worker, task just stays in `no-worker`

if self.validate and self.is_rootish(ts):
assert ws is not None

return recommendations, client_msgs, worker_msgs
except Exception as e:
logger.exception(e)
Expand Down Expand Up @@ -2051,9 +2074,10 @@ def decide_worker_rootish_queuing_disabled(
returns None, in which case the task should be transitioned to
``no-worker``.
"""
if self.validate:
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
assert math.isinf(self.WORKER_SATURATION)
# NOTE: in rare cases, it's possible queuing is actually enabled here (see
# `transition_no_worker_processing`).
# It's also possible that `is_rootish(ts)` is False (see note below in
# `decide_worker_rootish_queuing_enabled`)

pool = self.idle.values() if self.idle else self.running
if not pool:
Expand Down

0 comments on commit 63c649d

Please sign in to comment.