Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize decide_worker #4332

Merged
merged 11 commits into from
Dec 9, 2020
49 changes: 25 additions & 24 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4653,10 +4653,11 @@ def transition_no_worker_waiting(self, key):
pdb.set_trace()
raise

def decide_worker(self, ts: TaskState):
def decide_worker(self, ts: TaskState) -> WorkerState:
"""
Decide on a worker for task *ts*. Return a WorkerState.
"""
ws: WorkerState = None
valid_workers: set = self.valid_workers(ts)

if (
Expand All @@ -4667,36 +4668,31 @@ def decide_worker(self, ts: TaskState):
):
self.unrunnable.add(ts)
ts.state = "no-worker"
return None
return ws

if ts._dependencies or valid_workers is not None:
worker = decide_worker(
ws = decide_worker(
ts,
self.workers.values(),
valid_workers,
partial(self.worker_objective, ts),
)
elif self.idle:
if len(self.idle) < 20: # smart but linear in small case
worker = min(self.idle, key=operator.attrgetter("occupancy"))
else: # dumb but fast in large case
worker = self.idle[self.n_tasks % len(self.idle)]
else:
if len(self.workers) < 20: # smart but linear in small case
worker = min(
self.workers.values(), key=operator.attrgetter("occupancy")
)
worker_pool = self.idle or self.workers.values()
n_workers = len(worker_pool)
if n_workers < 20: # smart but linear in small case
ws = min(worker_pool, key=operator.attrgetter("occupancy"))
else: # dumb but fast in large case
worker = self.workers.values()[self.n_tasks % len(self.workers)]
ws = worker_pool[self.n_tasks % n_workers]

if self.validate:
assert worker is None or isinstance(worker, WorkerState), (
type(worker),
worker,
assert ws is None or isinstance(ws, WorkerState), (
type(ws),
ws,
)
assert worker.address in self.workers
assert ws.address in self.workers

return worker
return ws
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for standardizing pronouns along the way

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's certainly made it a lot easier to understand where things can be annotated 🙂


def transition_waiting_processing(self, key):
try:
Expand Down Expand Up @@ -6086,7 +6082,9 @@ def adaptive_target(self, comm=None, target_duration=None):
return len(self.workers) - len(to_close)


def decide_worker(ts: TaskState, all_workers, valid_workers: set, objective):
def decide_worker(
ts: TaskState, all_workers, valid_workers: set, objective
) -> WorkerState:
"""
Decide which worker should take task *ts*.

Expand All @@ -6102,13 +6100,14 @@ def decide_worker(ts: TaskState, all_workers, valid_workers: set, objective):
of bytes sent between workers. This is determined by calling the
*objective* function.
"""
ws: WorkerState
dts: TaskState
deps = ts._dependencies
deps: set = ts._dependencies
candidates: set
assert all([dts._who_has for dts in deps])
if ts._actor:
candidates = set(all_workers)
else:
ws: WorkerState
candidates = {ws for dts in deps for ws in dts._who_has}
if valid_workers is None:
if not candidates:
Expand All @@ -6126,9 +6125,11 @@ def decide_worker(ts: TaskState, all_workers, valid_workers: set, objective):
return None

if len(candidates) == 1:
return first(candidates)

return min(candidates, key=objective)
for ws in candidates:
break
else:
ws = min(candidates, key=objective)
return ws


def validate_task_state(ts: TaskState):
Expand Down