diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0caa128c02b..9c0be0ab0f2 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1890,6 +1890,15 @@ def get_thread_name(): assert "Dask-Foo-Threads" in gpu_result +@gen_cluster(client=True) +async def test_bad_executor_annotation(c, s, a, b): + with dask.annotate(executor="bad"): + future = c.submit(inc, 1) + with pytest.raises(ValueError, match="Invalid executor 'bad'; expected one of: "): + await future + assert future.status == "error" + + @gen_cluster(client=True) async def test_process_executor(c, s, a, b): with ProcessPoolExecutor() as e: diff --git a/distributed/worker.py b/distributed/worker.py index 076eb1f0e8d..92ad1881b96 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3429,17 +3429,22 @@ async def execute(self, key: str, *, stimulus_id: str) -> tuple[Recs, Instructio args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs) - if ts.annotations is not None and "executor" in ts.annotations: - executor = ts.annotations["executor"] - else: + try: + executor = ts.annotations["executor"] # type: ignore + except (TypeError, KeyError): executor = "default" - assert executor in self.executors + try: + e = self.executors[executor] + except KeyError: + raise ValueError( + f"Invalid executor {executor!r}; " + f"expected one of: {sorted(self.executors)}" + ) self.active_keys.add(ts.key) result: dict try: - e = self.executors[executor] ts.start_time = time() if iscoroutinefunction(function): result = await apply_function_async(