Skip to content

Commit

Permalink
Refactor executor pick
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 25, 2022
1 parent 655f934 commit b96cb5a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
9 changes: 9 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,15 @@ def get_thread_name():
assert "Dask-Foo-Threads" in gpu_result


@gen_cluster(client=True)
async def test_bad_executor_annotation(c, s, a, b):
with dask.annotate(executor="bad"):
future = c.submit(inc, 1)
with pytest.raises(ValueError, match="Invalid executor 'bad'; expected one of: "):
await future
assert future.status == "error"


@gen_cluster(client=True)
async def test_process_executor(c, s, a, b):
with ProcessPoolExecutor() as e:
Expand Down
15 changes: 10 additions & 5 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3429,17 +3429,22 @@ async def execute(self, key: str, *, stimulus_id: str) -> tuple[Recs, Instructio

args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)

if ts.annotations is not None and "executor" in ts.annotations:
executor = ts.annotations["executor"]
else:
try:
executor = ts.annotations["executor"] # type: ignore
except (TypeError, KeyError):
executor = "default"
assert executor in self.executors
try:
e = self.executors[executor]
except KeyError:
raise ValueError(
f"Invalid executor {executor!r}; "
f"expected one of: {sorted(self.executors)}"
)

self.active_keys.add(ts.key)

result: dict
try:
e = self.executors[executor]
ts.start_time = time()
if iscoroutinefunction(function):
result = await apply_function_async(
Expand Down

0 comments on commit b96cb5a

Please sign in to comment.