Handle edge cases between queued and no-worker #7259

Closed
wants to merge 2 commits

Conversation

gjoseph92
Collaborator

While working on #7221, I discovered some edge cases with queuing related to the queued vs no-worker states, and the fact that is_rootish can change. (These edge cases were not created by #7221; that just made it easier to find them, since it let us remove the almost-dead round-robin code in c901823.)

When queuing is enabled, and there are no running workers (0 workers, or all paused/retiring):

  • If a task is root-ish, it goes into Scheduler.queued. Cool.
  • Otherwise, it goes into Scheduler.unrunnable. Possible issue.

Once workers are running, we schedule the tasks sitting in unrunnable. By this time, is_rootish(ts) may have become True: the TaskGroup grew, or the cluster shrank, crossing the len(tg) > total_nthreads * 2 cutoff.
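
For concreteness, here is a minimal standalone sketch of that cutoff. The helper name is made up and the "few dependencies" condition is approximate; the real check is SchedulerState.is_rootish, which also excludes tasks with worker/host/resource restrictions.

def looks_rootish(group_size: int, n_group_dependencies: int, total_nthreads: int) -> bool:
    # Root-ish = a large task group, with few dependencies, relative to cluster size.
    # Both group_size and total_nthreads can change over time, which is the
    # problem described above.
    return n_group_dependencies < 5 and group_size > total_nthreads * 2

# A group can flip once it grows past the cutoff (or the cluster shrinks):
assert not looks_rootish(group_size=10, n_group_dependencies=0, total_nthreads=8)
assert looks_rootish(group_size=20, n_group_dependencies=0, total_nthreads=8)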

transition_no_worker_processing used to always assume that tasks in unrunnable were non-rootish. This is usually the case (most of the time, they're restricted tasks), but not always.

If we remove the round-robin code path, this case then fails an assertion. So we should use decide_worker_rootish_queuing_disabled versus decide_worker_non_rootish depending on whether the task is root-ish or not.


More broadly, it's awkward that is_rootish isn't static. #6922 will be very nice once we have it.

I thought about storing root-ish-ness per task (on the first call, is_rootish would cache it) so at least it can't change. But I don't think that's necessary, because it doesn't really matter that much which decide_worker function we use in this very rare case, so long as the task gets scheduled. I could certainly see going the other way, but I felt better about loosening the assertions in the decide_worker functions for now.

cc @fjetter @crusaderky

  • Tests added / passed
  • Passes pre-commit run --all-files

it's possible for tasks to not be rootish when they go into no-worker, but to be rootish when they come out.
@github-actions
Contributor

github-actions bot commented Nov 4, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files    15 suites    6h 30m 41s ⏱️
3 171 tests: 3 086 ✔️ passed, 84 💤 skipped, 1 failed
23 464 runs: 22 560 ✔️ passed, 901 💤 skipped, 3 failed

For more details on these failures, see this check.

Results for commit 63c649d.

♻️ This comment has been updated with latest results.

Collaborator Author

@gjoseph92 gjoseph92 left a comment


A simpler approach would be to just remove the assertion from c901823. That is, to say it's okay for decide_worker_non_rootish to occasionally be used on tasks with no dependencies and no restrictions.

I'd originally thought "but if you submit tasks to an empty cluster and then it scales up, you won't get co-assignment". But as soon as the first worker joins, all unrunnable root-ish tasks will be assigned to it. So you won't have co-assignment anyway.

I'm leaning towards that approach instead of this, since it feels a little more consistent.

After seeing what caching is_rootish would look like https://github.com/dask/distributed/pull/7262/files, I think this style is actually better, since it's closer to how this will look when root-ish-ness is static.

Again, another option would be to cache is_rootish on the TaskState, just so it can't change. I like the consistency, though it feels a bit heavy-handed. I also haven't thought about other consequences it could have.

We should maybe just cache it so we don't have to spend more time thinking about these consistency issues.
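
A hypothetical sketch of what that caching could look like (roughly the idea explored in #7262; the attribute and helper names here are made up, not the actual scheduler API):

from __future__ import annotations


class TaskState:
    def __init__(self) -> None:
        self._rootish: bool | None = None  # decided once, then frozen


class SchedulerState:
    total_nthreads = 8

    def _compute_rootish(self, group_size: int) -> bool:
        # Stand-in for the real heuristic discussed above.
        return group_size > self.total_nthreads * 2

    def is_rootish(self, ts: TaskState, group_size: int) -> bool:
        # Cache on the first call so later TaskGroup growth or worker loss
        # cannot flip the task between the queued and no-worker code paths.
        if ts._rootish is None:
            ts._rootish = self._compute_rootish(group_size)
        return ts._rootish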

# Rather than implementing some `no-worker->queued` transition, we
# just live with our original assessment and treat it as though queuing were disabled.
# If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
# are idle, which would leave it in `unrunnable` and cause a deadlock.
Collaborator Author


Note that this deadlock case is covered by test_queued_rootish_changes_while_paused.

If you apply this diff (using decide_worker_rootish_queuing_enabled when 'appropriate'), that test will deadlock:

diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 105a45e9..f1b966b5 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -22,7 +22,7 @@ distributed:
     events-log-length: 100000
     work-stealing: True     # workers should steal tasks from each other
     work-stealing-interval: 100ms  # Callback time for work stealing
-    worker-saturation: .inf # Send this fraction of nthreads root tasks to workers
+    worker-saturation: 1.0  # Send this fraction of nthreads root tasks to workers
     worker-ttl: "5 minutes" # like '60s'. Time to live for workers.  They must heartbeat faster than this
     pickle: True            # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []             # Run custom modules with Scheduler
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 87ffce4e..5400c50a 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2020,9 +2020,13 @@ class SchedulerState:
                 assert ts in self.unrunnable
 
             decide_worker = (
-                self.decide_worker_rootish_queuing_disabled
+                (
+                    partial(self.decide_worker_rootish_queuing_disabled, ts)
+                    if math.isinf(self.WORKER_SATURATION)
+                    else self.decide_worker_rootish_queuing_enabled
+                )
                 if self.is_rootish(ts)
-                else self.decide_worker_non_rootish
+                else partial(self.decide_worker_non_rootish, ts)
             )
             # NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
             # meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
@@ -2034,13 +2038,13 @@ class SchedulerState:
             # If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
             # are idle, which would leave it in `unrunnable` and cause a deadlock.
 
-            if ws := decide_worker(ts):
+            if ws := decide_worker():
                 self.unrunnable.discard(ts)
                 worker_msgs = _add_to_processing(self, ts, ws)
             # If no worker, task just stays in `no-worker`
 
-            if self.validate and self.is_rootish(ts):
-                assert ws is not None
+            # if self.validate and self.is_rootish(ts):
+            #     assert ws is not None
 
             return recommendations, client_msgs, worker_msgs
         except Exception as e:

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 5, 2022
Alternative to dask#7259. I'm quite torn about which is cleaner. I'm leaning towards this because I think it's even weirder to call `decide_worker_rootish_queuing_disabled` on a root-ish task when queuing is enabled than to call `decide_worker_non_rootish` on a root-ish task.

This also feels more consistent with the philosophy of "stick with the original decision". And if root-ish were a static property, this is what would happen.
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 5, 2022
Comment on lines +2022 to +2026
decide_worker = (
    self.decide_worker_rootish_queuing_disabled
    if self.is_rootish(ts)
    else self.decide_worker_non_rootish
)
Member


One of the biggest problems I have right now with the queuing/rootish scheduling is that we have three different decide_* functions. From an (internal) API perspective, I don't want to have the burden of making the correct decision about which one of these APIs to call in which circumstance. I just want to call a single decide_worker, provide it with sufficient context, and have it return the proper worker. Wouldn't that already avoid the problem?

Naively I would expect that this new decide_worker would look approximately like the block in transition_waiting_processing

if self.is_rootish(ts):
    # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
    # there should only be one, which combines co-assignment and queuing.
    # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
    if math.isinf(self.WORKER_SATURATION):
        if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
            return {ts.key: "no-worker"}, {}, {}
    else:
        if not (ws := self.decide_worker_rootish_queuing_enabled()):
            return {ts.key: "queued"}, {}, {}
else:
    if not (ws := self.decide_worker_non_rootish(ts)):
        return {ts.key: "no-worker"}, {}, {}

Isn't this always the correct logic when deciding on a worker?

Collaborator Author


Yes, and this is how I'd originally implemented it in #6614. But based on your feedback (which I agree with) we split it into multiple decide_worker_* functions for different cases.

The reason we didn't wrap the three decide_worker_* cases into one overall decide_worker function, which always "does the right thing", is that the recommendation you make—no-worker vs queued—changes depending on which function you use.

So then this decide_worker function would have to take and mutate a recommendations dict, or at least return somehow what recommendation to make. I thought we'd decided this was a pattern we wanted to avoid.

Moreover, we'd then have to implement no-worker->queued and queued->no-worker transitions. That's not hard, just more complexity. If we don't do #7262, that may be the right thing to do instead of this PR.
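
For illustration only, here is the shape such a combined decide_worker would need, returning the fallback recommendation alongside the worker (the tuple-return signature is hypothetical; the method names are the ones discussed in this thread):

import math

def decide_worker(self, ts):  # would be a SchedulerState method
    """Return (worker_or_None, fallback_state_if_no_worker)."""
    if self.is_rootish(ts):
        if math.isinf(self.WORKER_SATURATION):
            # Queuing disabled: an unschedulable root-ish task waits in no-worker.
            return self.decide_worker_rootish_queuing_disabled(ts), "no-worker"
        # Queuing enabled: an unschedulable root-ish task waits in queued.
        return self.decide_worker_rootish_queuing_enabled(), "queued"
    # Non-root-ish (e.g. restricted) tasks also fall back to no-worker.
    return self.decide_worker_non_rootish(ts), "no-worker"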

Comment on lines +2029 to +2031
# would be the most appropriate function to use. But if, at submission time,
# it didn't look root-ish (`TaskGroup` too small, or cluster too big) and there were
# no running workers, it would have gone to `no-worker` instead.
Collaborator

@crusaderky crusaderky Nov 7, 2022


To my understanding this should be impossible.
A non-rootish task will go to no-worker if there are no workers available that satisfy its worker/host/resource restrictions, or if there are no workers at all.
Tasks with restrictions cannot be rootish, so this leaves us only with the second option: 0 total threads in the cluster. In that case even a TaskGroup with a single task can qualify as rootish, and neither adding tasks to the group nor removing them will change that.

If you add workers you can flip a task from rootish to non-rootish, but a rootish task would not be in no-worker to begin with.

Collaborator Author


Please see the tests I've added covering these cases.

or if there are no workers at all

If there are no running workers. Paused or retiring workers will contribute to total_nthreads, allowing a task to look non-rootish (TaskGroup smaller than total_nthreads * 2), go into no-worker, then look root-ish when it comes out, either by adding tasks or removing workers.

client.submit in a for loop is probably the most common way to make a task look non-rootish when it enters and root-ish when it leaves, because the TaskGroup grows larger each iteration. The first nthreads * 2 tasks are non-rootish; the rest are root-ish (see the sketch below).
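
A hedged illustration of that scenario (the scheduler address and task names are made up; assume every worker is paused, so nothing runs but total_nthreads is still, say, 8):

from distributed import Client

client = Client("tcp://scheduler:8786")  # assumed address; all workers paused


def inc(x):
    return x + 1


futures = []
for i in range(100):
    # Each submit grows the same "inc" TaskGroup by one task. The first
    # nthreads * 2 = 16 submissions look non-root-ish and go to `no-worker`;
    # once the group crosses the cutoff, later tasks look root-ish.
    futures.append(client.submit(inc, i, key=f"inc-{i}"))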

@gjoseph92
Collaborator Author

Florian, Guido and I talked offline, and decided that we'll take this approach: #7259 (comment). That is, we'll explicitly implement queued<->no-worker transitions and fully respect the fact that is_rootish can change.

That is the opposite of #7262. It seems like it'll be the most maintainable, since it won't do anything stateful (no weirdness around "did is_rootish already run on this task?"), and explicitly handles the edge cases via transitions, rather than trying to implicitly disregard them like this PR does.

@gjoseph92 gjoseph92 closed this Nov 7, 2022
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 8, 2022