diff --git a/distributed/scheduler.py b/distributed/scheduler.py index db1d1c89cd..3bce832c7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7443,7 +7443,7 @@ async def retire_workers( close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None, - ) -> dict[str, Any]: ... + ) -> list[str]: ... @overload async def retire_workers( @@ -7453,7 +7453,7 @@ async def retire_workers( close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None, - ) -> dict[str, Any]: ... + ) -> list[str]: ... @overload async def retire_workers( @@ -7469,7 +7469,7 @@ async def retire_workers( minimum: int | None = None, target: int | None = None, attribute: str = "address", - ) -> dict[str, Any]: ... + ) -> list[str]: ... @log_errors async def retire_workers( @@ -7481,7 +7481,7 @@ async def retire_workers( remove: bool = True, stimulus_id: str | None = None, **kwargs: Any, - ) -> dict[str, Any]: + ) -> list[str]: """Gracefully retire workers from cluster. Any key that is in memory exclusively on the retired workers is replicated somewhere else. @@ -7559,7 +7559,7 @@ async def retire_workers( self.workers[address] for address in self.workers_to_close(**kwargs) } if not wss: - return {} + return [] stop_amm = False amm: ActiveMemoryManagerExtension | None = self.extensions.get("amm") @@ -7609,13 +7609,13 @@ async def retire_workers( # time (depending on interval settings) amm.run_once() - workers_info_ok = {} - workers_info_abort = {} - for addr, result, info in await asyncio.gather(*coros): + workers_info_ok = [] + workers_info_abort = [] + for addr, result in await asyncio.gather(*coros): if result == "OK": - workers_info_ok[addr] = info + workers_info_ok.append(addr) else: - workers_info_abort[addr] = info + workers_info_abort.append(addr) finally: if stop_amm: @@ -7625,8 +7625,8 @@ async def retire_workers( "all", { "action": "retire-workers", - "retired": workers_info_ok, - "could-not-retire": workers_info_abort, + "retired": list(workers_info_ok), + "could-not-retire": list(workers_info_abort), "stimulus_id": stimulus_id, }, ) @@ -7649,7 +7649,7 @@ async def _track_retire_worker( close: bool, remove: bool, stimulus_id: str, - ) -> tuple[str, Literal["OK", "no-recipients"], dict]: + ) -> tuple[str, Literal["OK", "no-recipients"]]: while not policy.done(): # Sleep 0.01s when there are 4 tasks or less # Sleep 0.5s when there are 200 or more @@ -7671,7 +7671,7 @@ async def _track_retire_worker( f"Could not retire worker {ws.address!r}: unique data could not be " f"moved to any other worker ({stimulus_id=!r})" ) - return ws.address, "no-recipients", ws.identity() + return ws.address, "no-recipients" logger.debug( f"All unique keys on worker {ws.address!r} have been replicated elsewhere" @@ -7685,7 +7685,7 @@ async def _track_retire_worker( self.close_worker(ws.address) logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})") - return ws.address, "OK", ws.identity() + return ws.address, "OK" def add_keys( self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 4f55158622..bcc0d48d71 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -977,7 +977,7 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b): x = await c.scatter("x", workers=[a.address]) out = await c.retire_workers([a.address]) - assert out == {} + assert not out assert not s.extensions["amm"].policies assert set(s.workers) == {a.address, b.address} @@ -1230,7 +1230,7 @@ async def test_RetireWorker_with_actor(c, s, a, b, has_proxy): with captured_logger("distributed.active_memory_manager", logging.WARNING) as log: out = await c.retire_workers([a.address]) - assert out == {} + assert not out assert "it holds actor(s)" in log.getvalue() assert "x" in a.state.actors @@ -1250,7 +1250,7 @@ async def test_RetireWorker_with_actor_proxy(c, s, a, b): assert "y" in b.data out = await c.retire_workers([b.address]) - assert out.keys() == {b.address} + assert out == (b.address,) assert "x" in a.state.actors assert "y" in a.data @@ -1301,6 +1301,7 @@ async def tensordot_stress(c, s): assert sum(t.start == "memory" for t in s.transition_log) == expected_tasks +@pytest.mark.slow @gen_cluster( client=True, nthreads=[("", 1)] * 4, @@ -1356,6 +1357,7 @@ async def test_ReduceReplicas_stress(c, s, *workers): await tensordot_stress(c, s) +@pytest.mark.slow @pytest.mark.parametrize("use_ReduceReplicas", [False, True]) @gen_cluster( client=True, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 443ce70520..92a83c3563 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1499,7 +1499,6 @@ async def test_retire_workers(c, s, a, b): workers = await s.retire_workers() assert list(workers) == [a.address] - assert workers[a.address]["nthreads"] == a.state.nthreads assert list(s.workers) == [b.address] assert s.workers_to_close() == [] diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 041fa1364a..9e2f235e88 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3553,7 +3553,7 @@ async def test_execute_preamble_abort_retirement(c, s): # b has shut down. There's nowhere to replicate x to anymore, so retire_workers # will give up and reinstate a to running status. - assert await retire_fut == {} + assert not await retire_fut while a.status != Status.running: await asyncio.sleep(0.01)