Handle edge cases between queued and no-worker #7259

Closed · wants to merge 2 commits
36 changes: 30 additions & 6 deletions distributed/scheduler.py
@@ -1511,10 +1511,15 @@ class SchedulerState:
#: All tasks currently known to the scheduler
tasks: dict[str, TaskState]

#: Tasks in the "queued" state, ordered by priority
#: Tasks in the "queued" state, ordered by priority.
#: They should generally be root-ish, but in certain cases may not be.
#: They must not have restrictions.
#: Always empty if `worker-saturation` is set to `inf`.
queued: HeapSet[TaskState]

#: Tasks in the "no-worker" state
#: Tasks in the "no-worker" state.
#: They may or may not have restrictions.
#: Could contain root-ish tasks even when `worker-saturation` is a finite value.
unrunnable: set[TaskState]

#: Subset of tasks that exist in memory on more than one worker
@@ -2014,11 +2019,29 @@ def transition_no_worker_processing(self, key, stimulus_id):
assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
assert ts in self.unrunnable

if ws := self.decide_worker_non_rootish(ts):
decide_worker = (
    self.decide_worker_rootish_queuing_disabled
    if self.is_rootish(ts)
    else self.decide_worker_non_rootish
)
Comment on lines +2022 to +2026
Member:
One of the biggest problems I have right now with the queuing/rootish scheduling is that we have three different decide_* functions. From an (internal) API perspective, I don't want to have the burden of making the correct decision about which one of these APIs to call in which circumstance. I just want to call a single decide_worker, provide it with sufficient context and it should return the proper worker. Wouldn't this already avoid the problem?

Naively I would expect that this new decide_worker would look approximately like the block in transition_waiting_processing

if self.is_rootish(ts):
    # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
    # there should only be one, which combines co-assignment and queuing.
    # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
    if math.isinf(self.WORKER_SATURATION):
        if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
            return {ts.key: "no-worker"}, {}, {}
    else:
        if not (ws := self.decide_worker_rootish_queuing_enabled()):
            return {ts.key: "queued"}, {}, {}
else:
    if not (ws := self.decide_worker_non_rootish(ts)):
        return {ts.key: "no-worker"}, {}, {}

Isn't this always the correct logic when deciding on a worker?

Collaborator Author:

Yes, and this is how I'd originally implemented it in #6614. But based on your feedback (which I agree with) we split it into multiple decide_worker_* functions for different cases.

The reason we didn't wrap the three decide_worker_* cases into one overall decide_worker function, which always "does the right thing", is that the recommendation you make—no-worker vs queued—changes depending on which function you use.

So then this decide_worker function would have to take and mutate a recommendations dict, or at least somehow return which recommendation to make. I thought we'd decided this was a pattern we wanted to avoid.

Moreover, we'd then have to implement no-worker->queued and queued->no-worker transitions. That's not hard, just more complexity. If we don't do #7262, that may be the right thing to do instead of this PR.
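For illustration only, a combined decide_worker along the lines described above would have to return the fallback state ("queued" vs "no-worker") alongside the worker, which is the pattern being avoided here. This is a hypothetical sketch, not code from this PR: the name decide_worker_unified and the tuple return are invented, while is_rootish, WORKER_SATURATION, and the decide_worker_* helpers are the ones discussed in this thread.

# Hypothetical sketch, not part of this PR. Shows why a single decide_worker
# would also need to report which state to recommend when no worker is found.
import math

def decide_worker_unified(self, ts):
    """Return (worker_or_None, state_to_recommend_if_no_worker)."""
    if self.is_rootish(ts):
        if math.isinf(self.WORKER_SATURATION):  # queuing disabled
            return self.decide_worker_rootish_queuing_disabled(ts), "no-worker"
        return self.decide_worker_rootish_queuing_enabled(), "queued"
    return self.decide_worker_non_rootish(ts), "no-worker"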

# NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
# meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
# would be the most appropriate function to use. But if, at submission time,
# it didn't look root-ish (`TaskGroup` too small, or cluster too big) and there were
# no running workers, it would have gone to `no-worker` instead.
Comment on lines +2029 to +2031
Collaborator (@crusaderky, Nov 7, 2022):
To my understanding this should be impossible.
A non-rootish task will go to no-worker if there are no workers available that satisfy its worker/host/resource restrictions, or if there are no workers at all.
Tasks with restrictions cannot be rootish, so this leaves us only with the second option: 0 total threads in the cluster, in which case a TaskGroup with a single task can qualify as rootish, and neither adding tasks to the group nor removing them will change that.

If you add workers you can flip a task from rootish to non-rootish, but a rootish task would not be in no-worker to begin with.

Collaborator Author:

Please see the tests I've added covering these cases.

> or if there are no workers at all

If there are no running workers. Paused or retiring workers will contribute to total_nthreads, allowing a task to look non-rootish (TaskGroup smaller than total_nthreads * 2), go into no-worker, then look root-ish when it comes out, either by adding tasks or removing workers.

client.submit in a for loop is probably the most common way to make a task look non-rootish when it enters and root-ish when it leaves, because the TaskGroup grows larger each iteration. The first nthreads * 2 tasks are non-rootish; the rest are root-ish.
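For context, the root-ish heuristic being discussed is roughly as follows. This is a simplified sketch from the discussion above, not the exact scheduler source; the restriction check and the total_nthreads * 2 threshold are the parts relied on in this thread.

# Rough sketch of the `is_rootish` heuristic (simplified, not the exact source).
# Tasks with worker/host/resource restrictions are never root-ish; otherwise the
# task's TaskGroup must be larger than total_nthreads * 2 and have few, small
# dependency groups. Because the group size is re-evaluated on each call, tasks
# submitted one by one in a loop can look non-rootish when they enter `no-worker`
# and root-ish by the time they leave it.
def is_rootish_sketch(self, ts):
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    tg = ts.group
    return (
        len(tg) > self.total_nthreads * 2
        and len(tg.dependencies) < 5
        and sum(map(len, tg.dependencies)) < 5
    )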

# Rather than implementing some `no-worker->queued` transition, we
# just live with our original assessment and treat it as though queuing were disabled.
# If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
# are idle, which would leave it in `unrunnable` and cause a deadlock.
Collaborator Author:

Note that this deadlock case is covered by test_queued_rootish_changes_while_paused.

If you apply this diff (using decide_worker_rootish_queuing_enabled when 'appropriate'), that test will deadlock:

diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 105a45e9..f1b966b5 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -22,7 +22,7 @@ distributed:
     events-log-length: 100000
     work-stealing: True     # workers should steal tasks from each other
     work-stealing-interval: 100ms  # Callback time for work stealing
-    worker-saturation: .inf # Send this fraction of nthreads root tasks to workers
+    worker-saturation: 1.0  # Send this fraction of nthreads root tasks to workers
     worker-ttl: "5 minutes" # like '60s'. Time to live for workers.  They must heartbeat faster than this
     pickle: True            # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []             # Run custom modules with Scheduler
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 87ffce4e..5400c50a 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2020,9 +2020,13 @@ class SchedulerState:
                 assert ts in self.unrunnable
 
             decide_worker = (
-                self.decide_worker_rootish_queuing_disabled
+                (
+                    partial(self.decide_worker_rootish_queuing_disabled, ts)
+                    if math.isinf(self.WORKER_SATURATION)
+                    else self.decide_worker_rootish_queuing_enabled
+                )
                 if self.is_rootish(ts)
-                else self.decide_worker_non_rootish
+                else partial(self.decide_worker_non_rootish, ts)
             )
             # NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
             # meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
@@ -2034,13 +2038,13 @@ class SchedulerState:
             # If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
             # are idle, which would leave it in `unrunnable` and cause a deadlock.
 
-            if ws := decide_worker(ts):
+            if ws := decide_worker():
                 self.unrunnable.discard(ts)
                 worker_msgs = _add_to_processing(self, ts, ws)
             # If no worker, task just stays in `no-worker`
 
-            if self.validate and self.is_rootish(ts):
-                assert ws is not None
+            # if self.validate and self.is_rootish(ts):
+            #     assert ws is not None
 
             return recommendations, client_msgs, worker_msgs
         except Exception as e:


if ws := decide_worker(ts):
    self.unrunnable.discard(ts)
    worker_msgs = _add_to_processing(self, ts, ws)
# If no worker, task just stays in `no-worker`

if self.validate and self.is_rootish(ts):
    assert ws is not None

return recommendations, client_msgs, worker_msgs
except Exception as e:
logger.exception(e)
@@ -2051,9 +2074,10 @@ def decide_worker_rootish_queuing_disabled(
returns None, in which case the task should be transitioned to
``no-worker``.
"""
if self.validate:
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
assert math.isinf(self.WORKER_SATURATION)
# NOTE: in rare cases, it's possible queuing is actually enabled here (see
# `transition_no_worker_processing`).
# It's also possible that `is_rootish(ts)` is False (see note below in
# `decide_worker_rootish_queuing_enabled`)

pool = self.idle.values() if self.idle else self.running
if not pool:
79 changes: 79 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -8,6 +8,7 @@
import pickle
import re
import sys
from contextlib import AsyncExitStack
from itertools import product
from textwrap import dedent
from time import sleep
@@ -481,6 +482,84 @@ async def test_queued_remove_add_worker(c, s, a, b):
await wait(fs)


@gen_cluster(
    client=True,
    nthreads=[("", 2)] * 2,
    config={
        "distributed.worker.memory.pause": False,
        "distributed.worker.memory.target": False,
        "distributed.worker.memory.spill": False,
        "distributed.scheduler.work-stealing": False,
    },
)
async def test_queued_rootish_changes_while_paused(c, s, a, b):
    "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks."

    root = c.submit(inc, 1, key="root")
    await root

    # manually pause the workers
    a.status = Status.paused
    b.status = Status.paused

    await async_wait_for(lambda: not s.running, 5)

    fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
    # ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
    # small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
    # to `queued`. `is_rootish` is just messed up like that.

    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)

    # un-pause
    a.status = Status.running
    b.status = Status.running
    await async_wait_for(lambda: len(s.running) == len(s.workers), 5)

    await c.gather(fs)


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.work-stealing": False},
)
async def test_queued_rootish_changes_scale_up(c, s, a):
    "Tasks are initially root-ish. After cluster scales, they aren't."

    root = c.submit(inc, 1, key="root")

    event = Event()
    clog = c.submit(event.wait, key="clog")
    await wait_for_state(clog.key, "processing", s)

    fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])

    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)

    if not s.is_rootish(s.tasks[fs[0].key]):
        pytest.fail(
            "Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
        )
    if math.isfinite(s.WORKER_SATURATION):
        assert s.queued

    async with AsyncExitStack() as stack:
        for _ in range(3):
            await stack.enter_async_context(Worker(s.address, nthreads=2))

        if s.is_rootish(s.tasks[fs[0].key]):
            pytest.fail(
                "Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
            )

        await event.set()
        await clog

        # Just verify it doesn't deadlock
        await c.gather(fs)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_secede_opens_slot(c, s, a):
first = Event()