diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 66a04536dc..f7ad5916e1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2225,8 +2225,8 @@ async def add_worker( recommendations = {} for ts in list(self.unrunnable): - valid = self.valid_workers(ts) - if valid is True or ws in valid: + valid: set = self.valid_workers(ts) + if valid is None or ws in valid: recommendations[ts._key] = "waiting" if recommendations: @@ -4655,14 +4655,19 @@ def decide_worker(self, ts: TaskState): """ Decide on a worker for task *ts*. Return a WorkerState. """ - valid_workers = self.valid_workers(ts) - - if not valid_workers and not ts._loose_restrictions and self.workers: + valid_workers: set = self.valid_workers(ts) + + if ( + valid_workers is not None + and not valid_workers + and not ts._loose_restrictions + and self.workers + ): self.unrunnable.add(ts) ts.state = "no-worker" return None - if ts._dependencies or valid_workers is not True: + if ts._dependencies or valid_workers is not None: worker = decide_worker( ts, self.workers.values(), @@ -5516,17 +5521,17 @@ def check_idle_saturated(self, ws: WorkerState, occ: double = -1.0): else: saturated.discard(ws) - def valid_workers(self, ts: TaskState): + def valid_workers(self, ts: TaskState) -> set: """Return set of currently valid workers for key - If all workers are valid then this returns ``True``. + If all workers are valid then this returns ``None``. This checks tracks the following state: * worker_restrictions * host_restrictions * resource_restrictions """ - s = True + s: set = None if ts._worker_restrictions: s = {w for w in ts._worker_restrictions if w in self.workers} @@ -5534,17 +5539,19 @@ def valid_workers(self, ts: TaskState): if ts._host_restrictions: # Resolve the alias here rather than early, for the worker # may not be connected when host_restrictions is populated - hr = [self.coerce_hostname(h) for h in ts._host_restrictions] + hr: list = [self.coerce_hostname(h) for h in ts._host_restrictions] # XXX need HostState? - ss = [self.host_info[h]["addresses"] for h in hr if h in self.host_info] - ss = set.union(*ss) if ss else set() - if s is True: + sl: list = [ + self.host_info[h]["addresses"] for h in hr if h in self.host_info + ] + ss: set = set.union(*sl) if sl else set() + if s is None: s = ss else: s |= ss if ts._resource_restrictions: - w = { + dw: dict = { resource: { w for w, supplied in self.resources[resource].items() @@ -5553,14 +5560,13 @@ def valid_workers(self, ts: TaskState): for resource, required in ts._resource_restrictions.items() } - ww = set.intersection(*w.values()) - - if s is True: + ww: set = set.intersection(*dw.values()) + if s is None: s = ww else: s &= ww - if s is True: + if s is None: return s else: return {self.workers[w] for w in s} @@ -6078,7 +6084,7 @@ def adaptive_target(self, comm=None, target_duration=None): return len(self.workers) - len(to_close) -def decide_worker(ts: TaskState, all_workers, valid_workers, objective): +def decide_worker(ts: TaskState, all_workers, valid_workers: set, objective): """ Decide which worker should take task *ts*. @@ -6087,7 +6093,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective): If several workers have dependencies then we choose the less-busy worker. Optionally provide *valid_workers* of where jobs are allowed to occur - (if all workers are allowed to take the task, pass True instead). + (if all workers are allowed to take the task, pass None instead). If the task requires data communication because no eligible worker has all the dependencies already, then we choose to minimize the number @@ -6102,7 +6108,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective): else: ws: WorkerState candidates = {ws for dts in deps for ws in dts._who_has} - if valid_workers is True: + if valid_workers is None: if not candidates: candidates = set(all_workers) else: @@ -6111,7 +6117,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective): candidates = valid_workers if not candidates: if ts._loose_restrictions: - return decide_worker(ts, all_workers, True, objective) + return decide_worker(ts, all_workers, None, objective) else: return None if not candidates: