From c03e43fd05710df9971805c69cd1f0b3250971e2 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 1 Oct 2024 14:39:38 +0100 Subject: [PATCH 1/4] restore tests from reverted commits Co-Authored-By: Peter Bierma --- .../test_asyncio/test_eager_task_factory.py | 46 +++++++++++++++++++ Lib/test/test_asyncio/test_staggered.py | 27 +++++++++++ 2 files changed, 73 insertions(+) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 0777f39b572486..31d2a00dbb8c9c 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -213,6 +213,52 @@ async def run(): self.run_coro(run()) + def test_staggered_race_with_eager_tasks(self): + # See https://github.com/python/cpython/issues/124309 + + async def fail(): + await asyncio.sleep(0) + raise ValueError("no good") + + async def run(): + winner, index, excs = await asyncio.staggered.staggered_race( + [ + lambda: asyncio.sleep(2, result="sleep2"), + lambda: asyncio.sleep(1, result="sleep1"), + lambda: fail() + ], + delay=0.25 + ) + self.assertEqual(winner, 'sleep1') + self.assertEqual(index, 1) + self.assertIsNone(excs[index]) + self.assertIsInstance(excs[0], asyncio.CancelledError) + self.assertIsInstance(excs[2], ValueError) + + self.run_coro(run()) + + def test_staggered_race_with_eager_tasks_no_delay(self): + # See https://github.com/python/cpython/issues/124309 + async def fail(): + raise ValueError("no good") + + async def run(): + winner, index, excs = await asyncio.staggered.staggered_race( + [ + lambda: fail(), + lambda: asyncio.sleep(1, result="sleep1"), + lambda: asyncio.sleep(0, result="sleep0"), + ], + delay=None + ) + self.assertEqual(winner, 'sleep1') + self.assertEqual(index, 1) + self.assertIsNone(excs[index]) + self.assertIsInstance(excs[0], ValueError) + self.assertEqual(len(excs), 2) + + self.run_coro(run()) + class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): Task = tasks._PyTask diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py index e6e32f7dbbbcba..74941f704c4890 100644 --- a/Lib/test/test_asyncio/test_staggered.py +++ b/Lib/test/test_asyncio/test_staggered.py @@ -95,3 +95,30 @@ async def coro(index): self.assertEqual(len(excs), 2) self.assertIsInstance(excs[0], ValueError) self.assertIsInstance(excs[1], ValueError) + + + async def test_multiple_winners(self): + event = asyncio.Event() + + async def coro(index): + await event.wait() + return index + + async def do_set(): + event.set() + await asyncio.Event().wait() + + winner, index, excs = await staggered_race( + [ + lambda: coro(0), + lambda: coro(1), + do_set, + ], + delay=0.1, + ) + self.assertIs(winner, 0) + self.assertIs(index, 0) + self.assertEqual(len(excs), 3) + self.assertIsNone(excs[0], None) + self.assertIsInstance(excs[1], asyncio.CancelledError) + self.assertIsInstance(excs[2], asyncio.CancelledError) From c936c0f2a5ee45cc38cbf684b4ac12cf76994a0f Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 1 Oct 2024 14:41:10 +0100 Subject: [PATCH 2/4] in asyncio.staggered wait for the current task to resume before starting work on child tasks --- Lib/asyncio/staggered.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index c3a7441a7b091d..747cf4305cb224 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -69,7 +69,8 @@ async def staggered_race(coro_fns, delay, *, loop=None): exceptions = [] running_tasks = [] - async def run_one_coro(previous_failed) -> None: + async def run_one_coro(ok_to_start, previous_failed) -> None: + await ok_to_start.wait() # Wait for the previous task to finish, or for delay seconds if previous_failed is not None: with contextlib.suppress(exceptions_mod.TimeoutError): @@ -85,8 +86,10 @@ async def run_one_coro(previous_failed) -> None: return # Start task that will run the next coroutine this_failed = locks.Event() - next_task = loop.create_task(run_one_coro(this_failed)) + next_ok_to_start = locks.Event() + next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) running_tasks.append(next_task) + next_ok_to_start.set() assert len(running_tasks) == this_index + 2 # Prepare place to put this coroutine's exceptions if not won exceptions.append(None) @@ -116,8 +119,10 @@ async def run_one_coro(previous_failed) -> None: if i != this_index: t.cancel() - first_task = loop.create_task(run_one_coro(None)) + ok_to_start = locks.Event() + first_task = loop.create_task(run_one_coro(ok_to_start, None)) running_tasks.append(first_task) + ok_to_start.set() try: # Wait for a growing list of tasks to all finish: poor man's version of # curio's TaskGroup or trio's nursery From 2c928f63b3cbcf0d41de8822c53f9f2488112753 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Tue, 1 Oct 2024 13:47:05 +0000 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2024-10-01-13-46-58.gh-issue-124390.dK1Zcm.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2024-10-01-13-46-58.gh-issue-124390.dK1Zcm.rst diff --git a/Misc/NEWS.d/next/Library/2024-10-01-13-46-58.gh-issue-124390.dK1Zcm.rst b/Misc/NEWS.d/next/Library/2024-10-01-13-46-58.gh-issue-124390.dK1Zcm.rst new file mode 100644 index 00000000000000..89610fa44bf743 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-01-13-46-58.gh-issue-124390.dK1Zcm.rst @@ -0,0 +1 @@ +Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_race` with :attr:`asyncio.eager_task_factory`. From da797fc0c7cb3510ae627cd9973061c0620e048b Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 8 Oct 2024 09:50:22 +0100 Subject: [PATCH 4/4] add comments --- Lib/asyncio/staggered.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 27c3600a851289..0f4df8855a80b9 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -70,6 +70,9 @@ async def staggered_race(coro_fns, delay, *, loop=None): running_tasks = [] async def run_one_coro(ok_to_start, previous_failed) -> None: + # in eager tasks this waits for the calling task to append this task + # to running_tasks, in regular tasks this wait is a no-op that does + # not yield a future. See gh-124309. await ok_to_start.wait() # Wait for the previous task to finish, or for delay seconds if previous_failed is not None: @@ -89,6 +92,8 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: next_ok_to_start = locks.Event() next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) running_tasks.append(next_task) + # next_task has been appended to running_tasks so next_task is ok to + # start. next_ok_to_start.set() assert len(running_tasks) == this_index + 2 # Prepare place to put this coroutine's exceptions if not won @@ -122,6 +127,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: ok_to_start = locks.Event() first_task = loop.create_task(run_one_coro(ok_to_start, None)) running_tasks.append(first_task) + # first_task has been appended to running_tasks so first_task is ok to start. ok_to_start.set() try: # Wait for a growing list of tasks to all finish: poor man's version of