Skip to content

Commit

Permalink
Optimize valid_workers (#4329)
Browse files Browse the repository at this point in the history
* Drop extra blank line

* Use different variables for `list` and `set`

* Distinguish iterated variable from result

* Annotate intermediate variables

* Assign `s` `None` instead of `True`

This works better if we want to type `s` as `set`. Also this is a more
typical default value when initializing a variable unlike `True`.

* Always return `set` from `valid_workers`
  • Loading branch information
jakirkham authored Dec 9, 2020
1 parent 47876b6 commit 881f532
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2225,8 +2225,8 @@ async def add_worker(

recommendations = {}
for ts in list(self.unrunnable):
valid = self.valid_workers(ts)
if valid is True or ws in valid:
valid: set = self.valid_workers(ts)
if valid is None or ws in valid:
recommendations[ts._key] = "waiting"

if recommendations:
Expand Down Expand Up @@ -4655,14 +4655,19 @@ def decide_worker(self, ts: TaskState):
"""
Decide on a worker for task *ts*. Return a WorkerState.
"""
valid_workers = self.valid_workers(ts)

if not valid_workers and not ts._loose_restrictions and self.workers:
valid_workers: set = self.valid_workers(ts)

if (
valid_workers is not None
and not valid_workers
and not ts._loose_restrictions
and self.workers
):
self.unrunnable.add(ts)
ts.state = "no-worker"
return None

if ts._dependencies or valid_workers is not True:
if ts._dependencies or valid_workers is not None:
worker = decide_worker(
ts,
self.workers.values(),
Expand Down Expand Up @@ -5516,35 +5521,37 @@ def check_idle_saturated(self, ws: WorkerState, occ: double = -1.0):
else:
saturated.discard(ws)

def valid_workers(self, ts: TaskState):
def valid_workers(self, ts: TaskState) -> set:
"""Return set of currently valid workers for key
If all workers are valid then this returns ``True``.
If all workers are valid then this returns ``None``.
This checks tracks the following state:
* worker_restrictions
* host_restrictions
* resource_restrictions
"""
s = True
s: set = None

if ts._worker_restrictions:
s = {w for w in ts._worker_restrictions if w in self.workers}

if ts._host_restrictions:
# Resolve the alias here rather than early, for the worker
# may not be connected when host_restrictions is populated
hr = [self.coerce_hostname(h) for h in ts._host_restrictions]
hr: list = [self.coerce_hostname(h) for h in ts._host_restrictions]
# XXX need HostState?
ss = [self.host_info[h]["addresses"] for h in hr if h in self.host_info]
ss = set.union(*ss) if ss else set()
if s is True:
sl: list = [
self.host_info[h]["addresses"] for h in hr if h in self.host_info
]
ss: set = set.union(*sl) if sl else set()
if s is None:
s = ss
else:
s |= ss

if ts._resource_restrictions:
w = {
dw: dict = {
resource: {
w
for w, supplied in self.resources[resource].items()
Expand All @@ -5553,14 +5560,13 @@ def valid_workers(self, ts: TaskState):
for resource, required in ts._resource_restrictions.items()
}

ww = set.intersection(*w.values())

if s is True:
ww: set = set.intersection(*dw.values())
if s is None:
s = ww
else:
s &= ww

if s is True:
if s is None:
return s
else:
return {self.workers[w] for w in s}
Expand Down Expand Up @@ -6078,7 +6084,7 @@ def adaptive_target(self, comm=None, target_duration=None):
return len(self.workers) - len(to_close)


def decide_worker(ts: TaskState, all_workers, valid_workers, objective):
def decide_worker(ts: TaskState, all_workers, valid_workers: set, objective):
"""
Decide which worker should take task *ts*.
Expand All @@ -6087,7 +6093,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective):
If several workers have dependencies then we choose the less-busy worker.
Optionally provide *valid_workers* of where jobs are allowed to occur
(if all workers are allowed to take the task, pass True instead).
(if all workers are allowed to take the task, pass None instead).
If the task requires data communication because no eligible worker has
all the dependencies already, then we choose to minimize the number
Expand All @@ -6102,7 +6108,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective):
else:
ws: WorkerState
candidates = {ws for dts in deps for ws in dts._who_has}
if valid_workers is True:
if valid_workers is None:
if not candidates:
candidates = set(all_workers)
else:
Expand All @@ -6111,7 +6117,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective):
candidates = valid_workers
if not candidates:
if ts._loose_restrictions:
return decide_worker(ts, all_workers, True, objective)
return decide_worker(ts, all_workers, None, objective)
else:
return None
if not candidates:
Expand Down

0 comments on commit 881f532

Please sign in to comment.