diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 5342d693e9..87ffce4e13 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1511,10 +1511,15 @@ class SchedulerState:
     #: All tasks currently known to the scheduler
     tasks: dict[str, TaskState]
 
-    #: Tasks in the "queued" state, ordered by priority
+    #: Tasks in the "queued" state, ordered by priority.
+    #: They should generally be root-ish, but in certain cases may not be.
+    #: They must not have restrictions.
+    #: Always empty if `worker-saturation` is set to `inf`.
     queued: HeapSet[TaskState]
 
-    #: Tasks in the "no-worker" state
+    #: Tasks in the "no-worker" state.
+    #: They may or may not have restrictions.
+    #: Could contain root-ish tasks even when `worker-saturation` is a finite value.
     unrunnable: set[TaskState]
 
     #: Subset of tasks that exist in memory on more than one worker
@@ -2014,11 +2019,29 @@ def transition_no_worker_processing(self, key, stimulus_id):
                 assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
                 assert ts in self.unrunnable
 
-            if ws := self.decide_worker_non_rootish(ts):
+            decide_worker = (
+                self.decide_worker_rootish_queuing_disabled
+                if self.is_rootish(ts)
+                else self.decide_worker_non_rootish
+            )
+            # NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
+            # meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
+            # would be the most appropriate function to use. But if, at submission time,
+            # it didn't look root-ish (`TaskGroup` too small, or cluster too big) and there were
+            # no running workers, it would have gone to `no-worker` instead.
+            # Rather than implementing some `no-worker->queued` transition, we
+            # just live with our original assessment and treat it as though queuing were disabled.
+            # If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
+            # are idle, which would leave it in `unrunnable` and cause a deadlock.
+
+            if ws := decide_worker(ts):
                 self.unrunnable.discard(ts)
                 worker_msgs = _add_to_processing(self, ts, ws)
             # If no worker, task just stays in `no-worker`
 
+            if self.validate and self.is_rootish(ts):
+                assert ws is not None
+
             return recommendations, client_msgs, worker_msgs
         except Exception as e:
             logger.exception(e)
@@ -2051,9 +2074,10 @@ def decide_worker_rootish_queuing_disabled(
             returns None, in which case the task should be transitioned to
             ``no-worker``.
         """
-        if self.validate:
-            # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
-            assert math.isinf(self.WORKER_SATURATION)
+        # NOTE: in rare cases, it's possible queuing is actually enabled here (see
+        # `transition_no_worker_processing`).
+        # It's also possible that `is_rootish(ts)` is False (see note below in
+        # `decide_worker_rootish_queuing_enabled`)
 
         pool = self.idle.values() if self.idle else self.running
         if not pool:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index f3e21e9d0a..c899809ba0 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -8,6 +8,7 @@
 import pickle
 import re
 import sys
+from contextlib import AsyncExitStack
 from itertools import product
 from textwrap import dedent
 from time import sleep
@@ -481,6 +482,84 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)
 
 
+@gen_cluster(
+    client=True,
+    nthreads=[("", 2)] * 2,
+    config={
+        "distributed.worker.memory.pause": False,
+        "distributed.worker.memory.target": False,
+        "distributed.worker.memory.spill": False,
+        "distributed.scheduler.work-stealing": False,
+    },
+)
+async def test_queued_rootish_changes_while_paused(c, s, a, b):
+    "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks."
+
+    root = c.submit(inc, 1, key="root")
+    await root
+
+    # manually pause the workers
+    a.status = Status.paused
+    b.status = Status.paused
+
+    await async_wait_for(lambda: not s.running, 5)
+
+    fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
+    # ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
+    # small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
+    # to `queued`. `is_rootish` is just messed up like that.
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    # un-pause
+    a.status = Status.running
+    b.status = Status.running
+    await async_wait_for(lambda: len(s.running) == len(s.workers), 5)
+
+    await c.gather(fs)
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing": False},
+)
+async def test_queued_rootish_changes_scale_up(c, s, a):
+    "Tasks are initially root-ish. After cluster scales, they aren't."
+
+    root = c.submit(inc, 1, key="root")
+
+    event = Event()
+    clog = c.submit(event.wait, key="clog")
+    await wait_for_state(clog.key, "processing", s)
+
+    fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    if not s.is_rootish(s.tasks[fs[0].key]):
+        pytest.fail(
+            "Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
+        )
+    if math.isfinite(s.WORKER_SATURATION):
+        assert s.queued
+
+    async with AsyncExitStack() as stack:
+        for _ in range(3):
+            await stack.enter_async_context(Worker(s.address, nthreads=2))
+
+        if s.is_rootish(s.tasks[fs[0].key]):
+            pytest.fail(
+                "Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
+            )
+
+        await event.set()
+        await clog
+
+        # Just verify it doesn't deadlock
+        await c.gather(fs)
+
+
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_secede_opens_slot(c, s, a):
     first = Event()