From bbc9205a44e2ced0fcb46101e1a3d02267e95ed3 Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 19 Oct 2023 09:52:08 +0200 Subject: [PATCH] make sure futures can be cancelled implement TaskGroup-style cancellation of pending tasks, since gather doesn't cancel unfinished tasks unless gather _itself_ is cancelled. --- kubespawner/spawner.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/kubespawner/spawner.py b/kubespawner/spawner.py index 8a6d97a6..f2e9caa3 100644 --- a/kubespawner/spawner.py +++ b/kubespawner/spawner.py @@ -2467,11 +2467,18 @@ async def _stop_all_reflectors(cls): reflector = cls.reflectors.pop(key) tasks.append(reflector.stop()) + # make sure all tasks are Futures so we can cancel them later + # in case of error + futures = [asyncio.ensure_future(task) for task in tasks] try: - await asyncio.gather(*tasks) + await asyncio.gather(*futures) except Exception: - for task in tasks: - task.cancel() + # cancel any unfinished tasks before re-raising + # because gather doesn't cancel unfinished tasks. + # TaskGroup would do this cancel for us, but requires Python 3.11 + for future in futures: + if not future.done(): + future.cancel() raise def start(self): @@ -2659,14 +2666,21 @@ async def _start(self): # namespace can be changed via kubespawner_override, start watching pods only after # load_user_options() is called - start_futures = [self._start_watching_pods()] + start_tasks = [self._start_watching_pods()] if self.events_enabled: - start_futures.append(self._start_watching_events()) + start_tasks.append(self._start_watching_events()) + # create Futures for coroutines so we can cancel them + # in case of an error + start_futures = [asyncio.ensure_future(task) for task in start_tasks] try: await asyncio.gather(*start_futures) except Exception: + # cancel any unfinished tasks before re-raising + # because gather doesn't cancel unfinished tasks. + # TaskGroup would do this cancel for us, but requires Python 3.11 for future in start_futures: - future.cancel() + if not future.done(): + future.cancel() raise # record latest event so we don't include old