Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve work stealing for scaling situations #4920

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,7 @@ def __init__(self, scheduler, **kwargs):
self.root = figure(
title="Idle and Saturated Workers Over Time",
x_axis_type="datetime",
height=150,
tools="",
x_range=x_range,
**kwargs,
Expand Down Expand Up @@ -1743,6 +1744,7 @@ def __init__(self, scheduler, **kwargs):
self.root = figure(
title="Stealing Events",
x_axis_type="datetime",
height=250,
tools="",
x_range=x_range,
**kwargs,
Expand Down
50 changes: 41 additions & 9 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import asyncio
import logging
import random
import uuid
from collections import defaultdict, deque
from collections.abc import Container
from functools import partial
from math import log2
from time import time

Expand Down Expand Up @@ -190,27 +192,24 @@ def steal_time_ratio(self, ts):
For example a result of zero implies a task without dependencies.
level: The location within a stealable list to place this value
"""
if not ts.dependencies: # no dependencies fast path
return 0, 0

split = ts.prefix.name
if split in fast_tasks:
return None, None

if not ts.dependencies: # no dependencies fast path
return 0, 0

ws = ts.processing_on
compute_time = ws.processing[ts]
if compute_time < 0.005: # 5ms, just give up
return None, None

nbytes = ts.get_nbytes_deps()
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
cost_multiplier = transfer_time / compute_time
if cost_multiplier > 100:
return None, None

level = int(round(log2(cost_multiplier) + 6))
if level < 1:
level = 1
level = min(len(self.cost_multipliers) - 1, level)

return cost_multiplier, level

Expand Down Expand Up @@ -361,6 +360,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
if occ_idl + cost_multiplier * duration <= occ_sat - duration / 2:
self.move_task_request(ts, sat, idl)
log.append(
# The format of this message is tightly coupled to the dashboard
(
start,
level,
Expand Down Expand Up @@ -417,7 +417,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
thieves = _potential_thieves_for(ts, idle)
if not thieves:
break
thief = thieves[i % len(thieves)]

thief = self._maybe_pick_thief(ts, thieves)
if not thief:
thief = thieves[i % len(thieves)]

duration = sat.processing.get(ts)
if duration is None:
Expand Down Expand Up @@ -450,7 +453,10 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
thieves = _potential_thieves_for(ts, idle)
if not thieves:
continue
thief = thieves[i % len(thieves)]
thief = self._maybe_pick_thief(ts, thieves)
if not thief:
thief = thieves[i % len(thieves)]
Comment on lines +457 to +458
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might be easier to read if this was moved into _maybe_pick_thief (and it was no longer maybe).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I figured this is nicer because it felt weird passing i to _maybe_pick_thief.


duration = sat.processing[ts]

maybe_move_task(
Expand All @@ -464,6 +470,32 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
if s.digests:
s.digests["steal-duration"].add(stop - start)

def _maybe_pick_thief(self, ts, thieves):
"""Try to be smart about picking a thief given some options. We're
trying to pick a thief which has dependencies for a given task, if
possible and will pick the one which works best for us given the
Scheduler.worker_objective

If no idle worker with dependencies is found, this returns None.
"""
if ts._dependencies:
who_has = set()
for dep in ts._dependencies:
who_has.update(dep.who_has)

thieves_with_data = list(who_has & set(thieves))

# If there are potential thieves with dependencies we
# should prefer them and pick the one which works best.
# Otherwise just random/round robin
if thieves_with_data:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If, say, just 1 idle worker holds dependencies, is it possible for that worker to get slammed with new tasks because of this? What has to happen between picking a worker as a thief and it getting removed from the idle set?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If, say, just 1 idle worker holds dependencies, is it possible for that worker to get slammed with new tasks because of this?

Yes, this may hurt, particularly for sub-topologies like

flowchart BT
A --> B1
A --> B2
A --> B3
Loading

where A is very small and each B* is very compute-heavy, so steal_ratio will always be small and tasks may even be stolen from "public" (i.e. non-saturated) workers. I believe that in other cases, i.e. when tasks are only stolen from saturated workers, this should be OK since decide_worker is much smarter about initial task placement.

What has to happen between picking a worker as a thief and it getting removed from the idle set?

The steal request must be confirmed, which updates occupancies and recalculates the idle set. Somewhere on this PR, a comment of mine suggests that _maybe_pick_thief should incorporate in_flight_occupancy to account for the time until this happens.

if len(thieves_with_data) > 10:
thieves_with_data = random.choices(thieves_with_data, k=10)
return min(
thieves_with_data,
key=partial(self.scheduler.worker_objective, ts),
)

def restart(self, scheduler):
for stealable in self.stealable.values():
for s in stealable:
Expand Down