8000 gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception by graingert · Pull Request #128475 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception #128475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from 1 commit
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix asyncio staggered leaking tasks, and logging unhandled exceptions…
….append exception
  • Loading branch information
graingert committed Jan 4, 2025
commit 2518c76b534298a06a030bec7090b074a1d04498
75 changes: 49 additions & 26 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,25 @@ async def staggered_race(coro_fns, delay, *, loop=None):
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
exceptions = []
running_tasks = []
running_tasks = set()
on_completed_fut = None

def task_done(task):
running_tasks.discard(task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
and not running_tasks
):
on_completed_fut.set_result(True)

if task.cancelled():
return

exc = task.exception()
if exc is None:
return
unhandled_exceptions.append(exc)

async def run_one_coro(ok_to_start, previous_failed) -> None:
# in eager tasks this waits for the calling task to append this task
Expand All @@ -91,11 +108,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
running_tasks.append(next_task)
# next_task has been appended to running_tasks so next_task is ok to
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
# start.
next_ok_to_start.set()
assert len(running_tasks) == this_index + 2
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1
Expand All @@ -120,31 +137,37 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
# up as done() == True, cancelled() == False, exception() ==
# asyncio.CancelledError. This behavior is specified in
# https://bugs.python.org/issue30048
for i, t in enumerate(running_tasks):
if i != this_index:
current_task = tasks.current_task(loop)
for t in running_tasks:
if t is not current_task:
t.cancel()

ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.append(first_task)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
unhandled_exceptions = []
exceptions = []
try:
# Wait for a growing list of tasks to all finish: poor man's version of
# curio's TaskGroup or trio's nursery
done_count = 0
while done_count != len(running_tasks):
done, _ = await tasks.wait(running_tasks)
done_count = len(done)
ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
propagate_cancellation_error = None
while running_tasks:
if on_completed_fut is None:
on_completed_fut = loop.create_future()
try:
await on_completed_fut
except exceptions_mod.CancelledError as ex:
propagate_cancellation_error = ex
for task in running_tasks:
task.cancel(*ex.args)
on_completed_fut = None
if __debug__ and unhandled_exceptions:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably this should just always raise it, not sure why this was suppressed in the original code

# If run_one_coro raises an unhandled exception, it's probably a
# programming error, and I want to see it.
if __debug__:
for d in done:
if d.done() and not d.cancelled() and d.exception():
raise d.exception()
raise ExceptionGroup("multiple errors in staggered race", unhandled_exceptions)
if propagate_cancellation_error is not None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions
# Make sure no tasks are left running if we leave this function
for t in running_tasks:
t.cancel()
del exceptions, propagate_cancellation_error, unhandled_exceptions
Loading
0