From 2518c76b534298a06a030bec7090b074a1d04498 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 08:36:28 +0000 Subject: [PATCH 1/8] fix asyncio staggered leaking tasks, and logging unhandled exceptions.append exception --- Lib/asyncio/staggered.py | 75 ++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 26 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 0f4df8855a80b9..0378fa44b73735 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -66,8 +66,25 @@ async def staggered_race(coro_fns, delay, *, loop=None): enum_coro_fns = enumerate(coro_fns) winner_result = None winner_index = None - exceptions = [] - running_tasks = [] + running_tasks = set() + on_completed_fut = None + + def task_done(task): + running_tasks.discard(task) + if ( + on_completed_fut is not None + and not on_completed_fut.done() + and not running_tasks + ): + on_completed_fut.set_result(True) + + if task.cancelled(): + return + + exc = task.exception() + if exc is None: + return + unhandled_exceptions.append(exc) async def run_one_coro(ok_to_start, previous_failed) -> None: # in eager tasks this waits for the calling task to append this task @@ -91,11 +108,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: this_failed = locks.Event() next_ok_to_start = locks.Event() next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) - running_tasks.append(next_task) - # next_task has been appended to running_tasks so next_task is ok to + running_tasks.add(next_task) + next_task.add_done_callback(task_done) +# next_task has been appended to running_tasks so next_task is ok to # start. next_ok_to_start.set() - assert len(running_tasks) == this_index + 2 # Prepare place to put this coroutine's exceptions if not won exceptions.append(None) assert len(exceptions) == this_index + 1 @@ -120,31 +137,37 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: # up as done() == True, cancelled() == False, exception() == # asyncio.CancelledError. This behavior is specified in # https://bugs.python.org/issue30048 - for i, t in enumerate(running_tasks): - if i != this_index: + current_task = tasks.current_task(loop) + for t in running_tasks: + if t is not current_task: t.cancel() - ok_to_start = locks.Event() - first_task = loop.create_task(run_one_coro(ok_to_start, None)) - running_tasks.append(first_task) - # first_task has been appended to running_tasks so first_task is ok to start. - ok_to_start.set() + unhandled_exceptions = [] + exceptions = [] try: - # Wait for a growing list of tasks to all finish: poor man's version of - # curio's TaskGroup or trio's nursery - done_count = 0 - while done_count != len(running_tasks): - done, _ = await tasks.wait(running_tasks) - done_count = len(done) + ok_to_start = locks.Event() + first_task = loop.create_task(run_one_coro(ok_to_start, None)) + running_tasks.add(first_task) + first_task.add_done_callback(task_done) + # first_task has been appended to running_tasks so first_task is ok to start. + ok_to_start.set() + propagate_cancellation_error = None + while running_tasks: + if on_completed_fut is None: + on_completed_fut = loop.create_future() + try: + await on_completed_fut + except exceptions_mod.CancelledError as ex: + propagate_cancellation_error = ex + for task in running_tasks: + task.cancel(*ex.args) + on_completed_fut = None + if __debug__ and unhandled_exceptions: # If run_one_coro raises an unhandled exception, it's probably a # programming error, and I want to see it. - if __debug__: - for d in done: - if d.done() and not d.cancelled() and d.exception(): - raise d.exception() + raise ExceptionGroup("multiple errors in staggered race", unhandled_exceptions) + if propagate_cancellation_error is not None: + raise propagate_cancellation_error return winner_result, winner_index, exceptions finally: - del exceptions - # Make sure no tasks are left running if we leave this function - for t in running_tasks: - t.cancel() + del exceptions, propagate_cancellation_error, unhandled_exceptions From 950187964fb6d29165f115a9b449a8f14d2e9494 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 09:08:09 +0000 Subject: [PATCH 2/8] add test --- Lib/test/test_asyncio/test_staggered.py | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py index 3c81b629693596..ad34aa6da01f54 100644 --- a/Lib/test/test_asyncio/test_staggered.py +++ b/Lib/test/test_asyncio/test_staggered.py @@ -122,3 +122,30 @@ async def do_set(): self.assertIsNone(excs[0], None) self.assertIsInstance(excs[1], asyncio.CancelledError) self.assertIsInstance(excs[2], asyncio.CancelledError) + + + async def test_cancelled(self): + log = [] + with self.assertRaises(TimeoutError): + async with asyncio.timeout(None) as cs_outer, asyncio.timeout(None) as cs_inner: + async def coro_fn(): + cs_inner.reschedule(-1) + await asyncio.sleep(0) + try: + await asyncio.sleep(0) + except asyncio.CancelledError: + log.append("cancelled 1") + + cs_outer.reschedule(-1) + await asyncio.sleep(0) + try: + await asyncio.sleep(0) + except asyncio.CancelledError: + log.append("cancelled 2") + try: + await staggered_race([coro_fn], delay=None) + except asyncio.CancelledError: + log.append("cancelled 3") + raise + + self.assertListEqual(log, ["cancelled 1", "cancelled 2", "cancelled 3"]) From dfe83841abbb316ae95eaf437d041c35e8e02085 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 09:08:46 +0000 Subject: [PATCH 3/8] Update Lib/asyncio/staggered.py --- Lib/asyncio/staggered.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 0378fa44b73735..ef9aaff47f36fe 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -152,6 +152,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: # first_task has been appended to running_tasks so first_task is ok to start. ok_to_start.set() propagate_cancellation_error = None + # Make sure no tasks are left running if we leave this function while running_tasks: if on_completed_fut is None: on_completed_fut = loop.create_future() From 1c34edc7608910b49e19b6d39181e899c9f537bf Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 4 Jan 2025 11:10:09 +0000 Subject: [PATCH 4/8] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst diff --git a/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst new file mode 100644 index 00000000000000..c8cef44ad037b4 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst @@ -0,0 +1 @@ +fix `asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception From 62c83a212e5ea24c8ce08d8a51a672d236d4a10d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 11:12:33 +0000 Subject: [PATCH 5/8] Update Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst --- .../next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst index c8cef44ad037b4..6adefc8d099d6c 100644 --- a/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst +++ b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst @@ -1 +1 @@ -fix `asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception +fix ``asyncio.staggered.staggered_race`` leaking tasks and issuing an unhandled exception From 4a4c027b672283b074653c6baf610cf5d94e415e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 14:20:59 +0000 Subject: [PATCH 6/8] Update Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst Co-authored-by: Peter Bierma --- .../next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst index 6adefc8d099d6c..fc3b4d5a5273a6 100644 --- a/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst +++ b/Misc/NEWS.d/next/Library/2025-01-04-11-10-04.gh-issue-128479.jvOrF-.rst @@ -1 +1 @@ -fix ``asyncio.staggered.staggered_race`` leaking tasks and issuing an unhandled exception +Fix :func:`!asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception. From 9fc105f0f4d25b9683eee1e14d95db26246726df Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 4 Jan 2025 16:59:51 +0000 Subject: [PATCH 7/8] Apply suggestions from code review --- Lib/asyncio/staggered.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index ef9aaff47f36fe..7abfe1d93b48b5 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -76,7 +76,7 @@ def task_done(task): and not on_completed_fut.done() and not running_tasks ): - on_completed_fut.set_result(True) + on_completed_fut.set_result(None) if task.cancelled(): return @@ -110,7 +110,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) running_tasks.add(next_task) next_task.add_done_callback(task_done) -# next_task has been appended to running_tasks so next_task is ok to + # next_task has been appended to running_tasks so next_task is ok to # start. next_ok_to_start.set() # Prepare place to put this coroutine's exceptions if not won @@ -166,7 +166,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: if __debug__ and unhandled_exceptions: # If run_one_coro raises an unhandled exception, it's probably a # programming error, and I want to see it. - raise ExceptionGroup("multiple errors in staggered race", unhandled_exceptions) + raise ExceptionGroup("staggered race failed", unhandled_exceptions) if propagate_cancellation_error is not None: raise propagate_cancellation_error return winner_result, winner_index, exceptions From 58f39d91e4a982e02a6033de99c47e5592faab37 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 6 Jan 2025 08:03:12 +0000 Subject: [PATCH 8/8] Apply suggestions from code review --- Lib/asyncio/staggered.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 7abfe1d93b48b5..0afed64fdf9c0f 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -66,6 +66,8 @@ async def staggered_race(coro_fns, delay, *, loop=None): enum_coro_fns = enumerate(coro_fns) winner_result = None winner_index = None + unhandled_exceptions = [] + exceptions = [] running_tasks = set() on_completed_fut = None @@ -142,8 +144,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: if t is not current_task: t.cancel() - unhandled_exceptions = [] - exceptions = [] + propagate_cancellation_error = None try: ok_to_start = locks.Event() first_task = loop.create_task(run_one_coro(ok_to_start, None)) @@ -154,8 +155,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None: propagate_cancellation_error = None # Make sure no tasks are left running if we leave this function while running_tasks: - if on_completed_fut is None: - on_completed_fut = loop.create_future() + on_completed_fut = loop.create_future() try: await on_completed_fut except exceptions_mod.CancelledError as ex: