From 4002695554ad6c6ef73ef1380b569c611d0ba268 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Mon, 23 Sep 2024 16:20:32 -0400 Subject: [PATCH 01/16] Partial implementation. --- Lib/asyncio/staggered.py | 106 +++++++----------- .../test_asyncio/test_eager_task_factory.py | 20 ++++ Lib/test/test_asyncio/test_staggered.py | 4 + 3 files changed, 62 insertions(+), 68 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index c3a7441a7b091d..61c3dc50308fce 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -6,8 +6,8 @@ from . import events from . import exceptions as exceptions_mod -from . import locks from . import tasks +from . import taskgroups async def staggered_race(coro_fns, delay, *, loop=None): @@ -63,76 +63,46 @@ async def staggered_race(coro_fns, delay, *, loop=None): """ # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. loop = loop or events.get_running_loop() - enum_coro_fns = enumerate(coro_fns) winner_result = None winner_index = None exceptions = [] - running_tasks = [] - - async def run_one_coro(previous_failed) -> None: - # Wait for the previous task to finish, or for delay seconds - if previous_failed is not None: - with contextlib.suppress(exceptions_mod.TimeoutError): - # Use asyncio.wait_for() instead of asyncio.wait() here, so - # that if we get cancelled at this point, Event.wait() is also - # cancelled, otherwise there will be a "Task destroyed but it is - # pending" later. - await tasks.wait_for(previous_failed.wait(), delay) - # Get the next coroutine to run - try: - this_index, coro_fn = next(enum_coro_fns) - except StopIteration: - return - # Start task that will run the next coroutine - this_failed = locks.Event() - next_task = loop.create_task(run_one_coro(this_failed)) - running_tasks.append(next_task) - assert len(running_tasks) == this_index + 2 - # Prepare place to put this coroutine's exceptions if not won - exceptions.append(None) - assert len(exceptions) == this_index + 1 + + def future_callback(index, future, task_group): + assert future.done() try: - result = await coro_fn() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as e: - exceptions[this_index] = e - this_failed.set() # Kickstart the next coroutine + error = future.exception() + except exceptions_mod.CancelledError as cancelled_error: + exceptions[index] = cancelled_error else: - # Store winner's results - nonlocal winner_index, winner_result - assert winner_index is None - winner_index = this_index - winner_result = result - # Cancel all other tasks. We take care to not cancel the current - # task as well. If we do so, then since there is no `await` after - # here and CancelledError are usually thrown at one, we will - # encounter a curious corner case where the current task will end - # up as done() == True, cancelled() == False, exception() == - # asyncio.CancelledError. This behavior is specified in - # https://bugs.python.org/issue30048 - for i, t in enumerate(running_tasks): - if i != this_index: - t.cancel() - - first_task = loop.create_task(run_one_coro(None)) - running_tasks.append(first_task) - try: - # Wait for a growing list of tasks to all finish: poor man's version of - # curio's TaskGroup or trio's nursery - done_count = 0 - while done_count != len(running_tasks): - done, _ = await tasks.wait(running_tasks) - done_count = len(done) - # If run_one_coro raises an unhandled exception, it's probably a - # programming error, and I want to see it. - if __debug__: - for d in done: - if d.done() and not d.cancelled() and d.exception(): - raise d.exception() - return winner_result, winner_index, exceptions - finally: - # Make sure no tasks are left running if we leave this function - for t in running_tasks: - t.cancel() + exceptions[index] = error + task_group._errors.remove(error) + + nonlocal winner_result, winner_index + if (winner_result is None) and (not task_group._aborting): + # If this is in an eager task factory, it's possible + # for multiple tasks to get here. In that case, we want + # only the first one to win and the rest to no-op before + # cancellation. + winner_result = future.result() + winner_index = index + task_group._abort() + + async with taskgroups.TaskGroup() as task_group: + for index, coro in enumerate(coro_fns): + if task_group._aborting: + break + + def wrapper(idx): + return lambda future: future_callback(idx, future, task_group) + + exceptions.append(None) + task = task_group.create_task(coro()) + task.add_done_callback(wrapper(index)) + + if delay is not None: + await tasks.sleep(delay) + else: + await task + + return winner_result, winner_index, exceptions diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 0777f39b572486..0a0ceb9d16e6d0 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -213,6 +213,26 @@ async def run(): self.run_coro(run()) + def test_staggered_race_with_eager_tasks(self): + # See GH-124309 + async def coro(amount): + await asyncio.sleep(amount) + return amount + + async def run(): + winner, index, excs = await asyncio.staggered.staggered_race( + [ + lambda: coro(1), + lambda: coro(0), + lambda: coro(2) + ], + delay=None + ) + self.assertEqual(winner, 0) + self.assertEqual(index, 1) + + self.run_coro(run()) + class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): Task = tasks._PyTask diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py index e6e32f7dbbbcba..0f8acb63053fee 100644 --- a/Lib/test/test_asyncio/test_staggered.py +++ b/Lib/test/test_asyncio/test_staggered.py @@ -95,3 +95,7 @@ async def coro(index): self.assertEqual(len(excs), 2) self.assertIsInstance(excs[0], ValueError) self.assertIsInstance(excs[1], ValueError) + + +if __name__ == "__main__": + unittest.main() From 302d7e33af0990e58d4675139e0bb824f6a53a5f Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Mon, 23 Sep 2024 16:31:25 -0400 Subject: [PATCH 02/16] Add unit tests. --- .../test_asyncio/test_eager_task_factory.py | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 0a0ceb9d16e6d0..df9941b7c20e20 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -213,27 +213,51 @@ async def run(): self.run_coro(run()) + # See GH-124309 for both of these def test_staggered_race_with_eager_tasks(self): - # See GH-124309 - async def coro(amount): - await asyncio.sleep(amount) - return amount + async def fail(): + await asyncio.sleep(0) # Dummy coroutine + raise ValueError("no good") async def run(): winner, index, excs = await asyncio.staggered.staggered_race( [ - lambda: coro(1), - lambda: coro(0), - lambda: coro(2) + lambda: asyncio.sleep(2), + lambda: asyncio.sleep(1), + lambda: fail() + ], + delay=0.25 + ) + self.assertIsNone(winner) + self.assertEqual(index, 1) + self.assertIsNone(excs[index]) + self.assertIsInstance(excs[0], asyncio.CancelledError) + self.assertIsInstance(excs[2], ValueError) + + self.run_coro(run()) + + def test_staggered_race_with_eager_tasks_no_delay(self): + async def fail(): + raise ValueError("no good") + + async def run(): + winner, index, excs = await asyncio.staggered.staggered_race( + [ + lambda: asyncio.sleep(1), + lambda: asyncio.sleep(0), + lambda: fail() ], delay=None ) - self.assertEqual(winner, 0) + self.assertIsNone(winner) self.assertEqual(index, 1) + self.assertIsNone(excs[index]) + self.assertIsInstance(excs[2], ValueError) self.run_coro(run()) + class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): Task = tasks._PyTask From e2cf78a194d8ca8046ce52e223ddd6584c062689 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Mon, 23 Sep 2024 18:14:48 -0400 Subject: [PATCH 03/16] Fix tests with eager task factory. --- Lib/asyncio/staggered.py | 41 ++++++++++++++----- Lib/asyncio/taskgroups.py | 8 +++- .../test_asyncio/test_eager_task_factory.py | 5 ++- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 61c3dc50308fce..4fcb55ea2101ba 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -72,20 +72,26 @@ def future_callback(index, future, task_group): try: error = future.exception() + exceptions[index] = error except exceptions_mod.CancelledError as cancelled_error: + # If another task finishes first and cancels this task, it + # is propagated here. exceptions[index] = cancelled_error + return else: - exceptions[index] = error - task_group._errors.remove(error) + if error is not None: + return nonlocal winner_result, winner_index - if (winner_result is None) and (not task_group._aborting): - # If this is in an eager task factory, it's possible - # for multiple tasks to get here. In that case, we want - # only the first one to win and the rest to no-op before - # cancellation. + # If this is in an eager task factory, it's possible + # for multiple tasks to get here. In that case, we want + # only the first one to win and the rest to no-op before + # cancellation. + if winner_result is None and not task_group._aborting: winner_result = future.result() winner_index = index + + # Cancel all other tasks, we win! task_group._abort() async with taskgroups.TaskGroup() as task_group: @@ -93,16 +99,29 @@ def future_callback(index, future, task_group): if task_group._aborting: break + exceptions.append(None) + task = loop.create_task(coro()) + + # We don't want the task group to propagate the error. Instead, + # we want to put it in our special exceptions list, so we manually + # create the task. + task.add_done_callback(task_group._on_task_done_without_propagation) + task_group._tasks.add(task) + + # We need this extra wrapper here to stop the closure from having + # an incorrect index. def wrapper(idx): return lambda future: future_callback(idx, future, task_group) - exceptions.append(None) - task = task_group.create_task(coro()) task.add_done_callback(wrapper(index)) if delay is not None: - await tasks.sleep(delay) + await tasks.sleep(delay or 0) else: - await task + # We don't care about exceptions here, the callback will + # deal with it. + with contextlib.suppress(BaseException): + # If there's no delay, we just wait until completion. + await task return winner_result, winner_index, exceptions diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index f2ee9648c43876..ad033bc51a145c 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -199,7 +199,8 @@ def _abort(self): if not t.done(): t.cancel() - def _on_task_done(self, task): + def _on_task_done_without_propagation(self, task): + # For staggered_race() self._tasks.discard(task) if self._on_completed_fut is not None and not self._tasks: @@ -209,7 +210,10 @@ def _on_task_done(self, task): if task.cancelled(): return - exc = task.exception() + return task.exception() + + def _on_task_done(self, task): + exc = self._on_task_done_without_propagation(task) if exc is None: return diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index df9941b7c20e20..d4839eeb850c7f 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -243,16 +243,17 @@ async def fail(): async def run(): winner, index, excs = await asyncio.staggered.staggered_race( [ + lambda: fail(), lambda: asyncio.sleep(1), lambda: asyncio.sleep(0), - lambda: fail() ], delay=None ) self.assertIsNone(winner) self.assertEqual(index, 1) self.assertIsNone(excs[index]) - self.assertIsInstance(excs[2], ValueError) + self.assertIsInstance(excs[0], ValueError) + self.assertEqual(len(excs), 2) self.run_coro(run()) From 48a1798d339d834342e35d9278ff8be7227b32f2 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Mon, 23 Sep 2024 18:19:24 -0400 Subject: [PATCH 04/16] Add NEWS entry. --- Lib/asyncio/staggered.py | 2 +- .../next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 4fcb55ea2101ba..53af83e043055a 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -61,7 +61,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): coroutine's entry is ``None``. """ - # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. + # TODO: allow async iterables in coro_fns loop = loop or events.get_running_loop() winner_result = None winner_index = None diff --git a/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst new file mode 100644 index 00000000000000..a70630456a4e8e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst @@ -0,0 +1,2 @@ +Modernize the :func:`!asyncio.staggered.staggered_task` function to now +support :attr:`asyncio.eager_task_factory` and use :class:`asyncio.TaskGroup` internally. From e213f518b20ff398f32f44097a34a3f63f1f99b1 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 15:56:08 -0400 Subject: [PATCH 05/16] Refactor to use only public APIs Co-authored-by: Thomas Grainger --- Lib/asyncio/staggered.py | 87 ++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 58 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 53af83e043055a..29bed1bd395869 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -4,12 +4,14 @@ import contextlib -from . import events -from . import exceptions as exceptions_mod +from . import locks from . import tasks from . import taskgroups +class _Done(Exception): + pass + async def staggered_race(coro_fns, delay, *, loop=None): """Run coroutines with staggered start times and take the first to finish. @@ -61,67 +63,36 @@ async def staggered_race(coro_fns, delay, *, loop=None): coroutine's entry is ``None``. """ - # TODO: allow async iterables in coro_fns - loop = loop or events.get_running_loop() + # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. winner_result = None winner_index = None exceptions = [] - def future_callback(index, future, task_group): - assert future.done() - + async def run_one_coro(this_index, coro_fn, this_failed): try: - error = future.exception() - exceptions[index] = error - except exceptions_mod.CancelledError as cancelled_error: - # If another task finishes first and cancels this task, it - # is propagated here. - exceptions[index] = cancelled_error - return + result = await coro_fn() + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as e: + exceptions[this_index] = e + this_failed.set() # Kickstart the next coroutine else: - if error is not None: - return - - nonlocal winner_result, winner_index - # If this is in an eager task factory, it's possible - # for multiple tasks to get here. In that case, we want - # only the first one to win and the rest to no-op before - # cancellation. - if winner_result is None and not task_group._aborting: - winner_result = future.result() - winner_index = index - - # Cancel all other tasks, we win! - task_group._abort() - - async with taskgroups.TaskGroup() as task_group: - for index, coro in enumerate(coro_fns): - if task_group._aborting: - break - - exceptions.append(None) - task = loop.create_task(coro()) - - # We don't want the task group to propagate the error. Instead, - # we want to put it in our special exceptions list, so we manually - # create the task. - task.add_done_callback(task_group._on_task_done_without_propagation) - task_group._tasks.add(task) - - # We need this extra wrapper here to stop the closure from having - # an incorrect index. - def wrapper(idx): - return lambda future: future_callback(idx, future, task_group) - - task.add_done_callback(wrapper(index)) - - if delay is not None: - await tasks.sleep(delay or 0) - else: - # We don't care about exceptions here, the callback will - # deal with it. - with contextlib.suppress(BaseException): - # If there's no delay, we just wait until completion. - await task + # Store winner's results + nonlocal winner_index, winner_result + # There could be more than one winner + winner_index = this_index + winner_result = result + raise _Done + + try: + async with taskgroups.TaskGroup() as tg: + for this_index, coro_fn in enumerate(coro_fns): + this_failed = locks.Event() + exceptions.append(None) + tg.create_task(run_one_coro(this_index, coro_fn, this_failed)) + with contextlib.suppress(TimeoutError): + await tasks.wait_for(this_failed.wait(), delay) + except* _Done: + pass return winner_result, winner_index, exceptions From 0d27caf1d6c6037780b604ab9646b88eddd30391 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 15:58:31 -0400 Subject: [PATCH 06/16] Revert changes to TaskGroup. --- Lib/asyncio/taskgroups.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index ad033bc51a145c..f2ee9648c43876 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -199,8 +199,7 @@ def _abort(self): if not t.done(): t.cancel() - def _on_task_done_without_propagation(self, task): - # For staggered_race() + def _on_task_done(self, task): self._tasks.discard(task) if self._on_completed_fut is not None and not self._tasks: @@ -210,10 +209,7 @@ def _on_task_done_without_propagation(self, task): if task.cancelled(): return - return task.exception() - - def _on_task_done(self, task): - exc = self._on_task_done_without_propagation(task) + exc = task.exception() if exc is None: return From 229eaf8bc41f4ff4f5d33db438a8bf5444828dab Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 16:10:56 -0400 Subject: [PATCH 07/16] Add extra test. --- Lib/test/test_asyncio/test_staggered.py | 33 ++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py index 0f8acb63053fee..fe3538a95cfd07 100644 --- a/Lib/test/test_asyncio/test_staggered.py +++ b/Lib/test/test_asyncio/test_staggered.py @@ -82,19 +82,44 @@ async def test_none_successful(self): async def coro(index): raise ValueError(index) + for delay in [None, 0, 0.1, 1]: + with self.subTest(delay=delay): + winner, index, excs = await staggered_race( + [ + lambda: coro(0), + lambda: coro(1), + ], + delay=delay, + ) + + self.assertIs(winner, None) + self.assertIs(index, None) + self.assertEqual(len(excs), 2) + self.assertIsInstance(excs[0], ValueError) + self.assertIsInstance(excs[1], ValueError) + + async def test_long_delay_early_failure(self): + async def coro(index): + await asyncio.sleep(0) # Dummy coroutine for the 1 case + if index == 0: + await asyncio.sleep(0.1) # Dummy coroutine + raise ValueError(index) + + return f'Res: {index}' + winner, index, excs = await staggered_race( [ lambda: coro(0), lambda: coro(1), ], - delay=None, + delay=10, ) - self.assertIs(winner, None) - self.assertIs(index, None) + self.assertEqual(winner, 'Res: 1') + self.assertEqual(index, 1) self.assertEqual(len(excs), 2) self.assertIsInstance(excs[0], ValueError) - self.assertIsInstance(excs[1], ValueError) + self.assertIsNone(excs[1], None) if __name__ == "__main__": From aa00ebc60134f5e1b0ff09265f713b000982e902 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 16:13:37 -0400 Subject: [PATCH 08/16] Add sanity check. --- Lib/asyncio/staggered.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 29bed1bd395869..179beaa3796b80 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -79,7 +79,7 @@ async def run_one_coro(this_index, coro_fn, this_failed): else: # Store winner's results nonlocal winner_index, winner_result - # There could be more than one winner + assert winner_index is None winner_index = this_index winner_result = result raise _Done From 57bb25a6324252998146ea4e0dd4075dcfbf6929 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 16:18:43 -0400 Subject: [PATCH 09/16] Remove obsolete loop parameter. --- Lib/asyncio/base_events.py | 2 +- Lib/asyncio/staggered.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 000647f57dd9e3..ffcc0174e1e245 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1144,7 +1144,7 @@ async def create_connection( (functools.partial(self._connect_sock, exceptions, addrinfo, laddr_infos) for addrinfo in infos), - happy_eyeballs_delay, loop=self) + happy_eyeballs_delay) if sock is None: exceptions = [exc for sub in exceptions for exc in sub] diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 179beaa3796b80..dc5a17835300c9 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -12,7 +12,8 @@ class _Done(Exception): pass -async def staggered_race(coro_fns, delay, *, loop=None): + +async def staggered_race(coro_fns, delay): """Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is @@ -44,8 +45,6 @@ async def staggered_race(coro_fns, delay, *, loop=None): delay: amount of time, in seconds, between starting coroutines. If ``None``, the coroutines will run sequentially. - loop: the event loop to use. - Returns: tuple *(winner_result, winner_index, exceptions)* where From 69408894e5f86ecc29372dfcbead3c36f75c1a72 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 16:29:14 -0400 Subject: [PATCH 10/16] Deprecate the loop parameter. --- Lib/asyncio/staggered.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index dc5a17835300c9..18c1971ed716c6 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -8,12 +8,10 @@ from . import tasks from . import taskgroups - class _Done(Exception): pass - -async def staggered_race(coro_fns, delay): +async def staggered_race(coro_fns, delay, *, loop=None): """Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is @@ -67,6 +65,15 @@ async def staggered_race(coro_fns, delay): winner_index = None exceptions = [] + if loop is not None: + import warnings + warnings._deprecated( + 'loop', + 'the {name!r} parameter is deprecated and slated for removal in ' + 'Python {remove}; it does nothing since 3.14', + remove=(3, 16), + ) + async def run_one_coro(this_index, coro_fn, this_failed): try: result = await coro_fn() From d01ed7fcd9ca80fbff603995269d56ceb16cbb8a Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Tue, 24 Sep 2024 16:45:19 -0400 Subject: [PATCH 11/16] News: only mention the bugfix. --- .../Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst index a70630456a4e8e..1999b3b4de258b 100644 --- a/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst +++ b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst @@ -1,2 +1 @@ -Modernize the :func:`!asyncio.staggered.staggered_task` function to now -support :attr:`asyncio.eager_task_factory` and use :class:`asyncio.TaskGroup` internally. +Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_task` with :attr:`asyncio.eager_task_factory`. From 9c04271d2d14e4f177362933a70f6a612ce3df02 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Wed, 25 Sep 2024 20:44:39 -0400 Subject: [PATCH 12/16] Update Lib/test/test_asyncio/test_staggered.py Co-authored-by: Jelle Zijlstra --- Lib/test/test_asyncio/test_staggered.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py index fe3538a95cfd07..21a39b3f911747 100644 --- a/Lib/test/test_asyncio/test_staggered.py +++ b/Lib/test/test_asyncio/test_staggered.py @@ -119,7 +119,7 @@ async def coro(index): self.assertEqual(index, 1) self.assertEqual(len(excs), 2) self.assertIsInstance(excs[0], ValueError) - self.assertIsNone(excs[1], None) + self.assertIsNone(excs[1]) if __name__ == "__main__": From e11a52a6387f483b3cdfff0663e9e3f948e11987 Mon Sep 17 00:00:00 2001 From: Peter Bierma Date: Wed, 25 Sep 2024 20:53:10 -0400 Subject: [PATCH 13/16] Update Lib/asyncio/staggered.py Co-authored-by: Carol Willing --- Lib/asyncio/staggered.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 18c1971ed716c6..20045b35d16ecb 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -70,7 +70,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): warnings._deprecated( 'loop', 'the {name!r} parameter is deprecated and slated for removal in ' - 'Python {remove}; it does nothing since 3.14', + 'Python {remove}; it is ignored since 3.14', remove=(3, 16), ) From 3fbbedb0ad64eeba7284d1244158b892497d55c8 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 26 Sep 2024 10:04:35 +0530 Subject: [PATCH 14/16] code review --- Lib/asyncio/staggered.py | 11 +---------- Lib/test/test_asyncio/test_eager_task_factory.py | 6 ++++-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 20045b35d16ecb..4458d01dece0e6 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -11,7 +11,7 @@ class _Done(Exception): pass -async def staggered_race(coro_fns, delay, *, loop=None): +async def staggered_race(coro_fns, delay): """Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is @@ -65,15 +65,6 @@ async def staggered_race(coro_fns, delay, *, loop=None): winner_index = None exceptions = [] - if loop is not None: - import warnings - warnings._deprecated( - 'loop', - 'the {name!r} parameter is deprecated and slated for removal in ' - 'Python {remove}; it is ignored since 3.14', - remove=(3, 16), - ) - async def run_one_coro(this_index, coro_fn, this_failed): try: result = await coro_fn() diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index d4839eeb850c7f..25ec39c9deafe3 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -213,10 +213,11 @@ async def run(): self.run_coro(run()) - # See GH-124309 for both of these def test_staggered_race_with_eager_tasks(self): + # See https://github.com/python/cpython/issues/124309 + async def fail(): - await asyncio.sleep(0) # Dummy coroutine + await asyncio.sleep(0) raise ValueError("no good") async def run(): @@ -237,6 +238,7 @@ async def run(): self.run_coro(run()) def test_staggered_race_with_eager_tasks_no_delay(self): + # See https://github.com/python/cpython/issues/124309 async def fail(): raise ValueError("no good") From 1b2e217dde74740e74f9c6969670a6a9aa882921 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 26 Sep 2024 10:05:30 +0530 Subject: [PATCH 15/16] fix news --- .../next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst index 1999b3b4de258b..89610fa44bf743 100644 --- a/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst +++ b/Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst @@ -1 +1 @@ -Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_task` with :attr:`asyncio.eager_task_factory`. +Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_race` with :attr:`asyncio.eager_task_factory`. From e1047b96c014e7756d96b642b10318ebb2b4ed74 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 26 Sep 2024 10:15:51 +0530 Subject: [PATCH 16/16] better tests --- Lib/test/test_asyncio/test_eager_task_factory.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 25ec39c9deafe3..1579ad1188d725 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -223,13 +223,13 @@ async def fail(): async def run(): winner, index, excs = await asyncio.staggered.staggered_race( [ - lambda: asyncio.sleep(2), - lambda: asyncio.sleep(1), + lambda: asyncio.sleep(2, result="sleep2"), + lambda: asyncio.sleep(1, result="sleep1"), lambda: fail() ], delay=0.25 ) - self.assertIsNone(winner) + self.assertEqual(winner, 'sleep1') self.assertEqual(index, 1) self.assertIsNone(excs[index]) self.assertIsInstance(excs[0], asyncio.CancelledError) @@ -246,12 +246,12 @@ async def run(): winner, index, excs = await asyncio.staggered.staggered_race( [ lambda: fail(), - lambda: asyncio.sleep(1), - lambda: asyncio.sleep(0), + lambda: asyncio.sleep(1, result="sleep1"), + lambda: asyncio.sleep(0, result="sleep0"), ], delay=None ) - self.assertIsNone(winner) + self.assertEqual(winner, 'sleep1') self.assertEqual(index, 1) self.assertIsNone(excs[index]) self.assertIsInstance(excs[0], ValueError)