10000 [3.12] gh-128479: fix asyncio staggered race leaking tasks, and loggi… · python/cpython@e94939c · GitHub
[go: up one dir, main page]

Skip to content

Commit e94939c

Browse files
miss-islingtongraingertZeroIntensity
authored
[3.12] gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception (GH-128475) (#129228)
gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception (GH-128475) (cherry picked from commit ec91e1c) Co-authored-by: Thomas Grainger <tagrain@gmail.com> Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
1 parent bb7c54d commit e94939c

File tree

3 files changed

+76
-24
lines changed

3 files changed

+76
-24
lines changed

Lib/asyncio/staggered.py

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,27 @@ async def staggered_race(coro_fns, delay, *, loop=None):
6666
enum_coro_fns = enumerate(coro_fns)
6767
winner_result = None
6868
winner_index = None
69+
unhandled_exceptions = []
6970
exceptions = []
70-
running_tasks = []
71+
running_tasks = set()
72+
on_completed_fut = None
73+
74+
def task_done(task):
75+
running_tasks.discard(task)
76+
if (
77+
on_completed_fut is not None
78+
and not on_completed_fut.done()
79+
and not running_tasks
80+
):
81+
on_completed_fut.set_result(None)
82+
83+
if task.cancelled():
84+
return
85+
86+
exc = task.exception()
87+
if exc is None:
88+
return
89+
unhandled_exceptions.append(exc)
7190

7291
async def run_one_coro(ok_to_start, previous_failed) -> None:
7392
# in eager tasks this waits for the calling task to append this task
@@ -91,11 +110,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
91110
this_failed = locks.Event()
92111
next_ok_to_start = locks.Event()
93112
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
94-
running_tasks.append(next_task)
113+
running_tasks.add(next_task)
114+
next_task.add_done_callback(task_done)
95115
# next_task has been appended to running_tasks so next_task is ok to
96116
# start.
97117
next_ok_to_start.set()
98-
assert len(running_tasks) == this_index + 2
99118
# Prepare place to put this coroutine's exceptions if not won
100119
exceptions.append(None)
101120
assert len(exceptions) == this_index + 1
@@ -120,31 +139,36 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
120139
# up as done() == True, cancelled() == False, exception() ==
121140
# asyncio.CancelledError. This behavior is specified in
122141
# https://bugs.python.org/issue30048
123-
for i, t in enumerate(running_tasks):
124-
if i != this_index:
142+
current_task = tasks.current_task(loop)
143+
for t in running_tasks:
144+
if t is not current_task:
125145
t.cancel()
126146

127-
ok_to_start = locks.Event()
128-
first_task = loop.create_task(run_one_coro(ok_to_start, None))
129-
running_tasks.append(first_task)
130-
# first_task has been appended to running_tasks so first_task is ok to start.
131-
ok_to_start.set()
147+
propagate_cancellation_error = None
132148
try:
133-
# Wait for a growing list of tasks to all finish: poor man's version of
134-
# curio's TaskGroup or trio's nursery
135-
done_count = 0
136-
while done_count != len(running_tasks):
137-
done, _ = await tasks.wait(running_tasks)
138-
done_count = len(done)
149+
ok_to_start = locks.Event()
150+
first_task = loop.create_task(run_one_coro(ok_to_start, None))
151+
running_tasks.add(first_task)
152+
first_task.add_done_callback(task_done)
153+
# first_task has been appended to running_tasks so first_task is ok to start.
154+
ok_to_start.set()
155+
propagate_cancellation_error = None
156+
# Make sure no tasks are left running if we leave this function
157+
while running_tasks:
158+
on_completed_fut = loop.create_future()
159+
try:
160+
await on_completed_fut
161+
except exceptions_mod.CancelledError as ex:
162+
propagate_cancellation_error = ex
163+
for task in running_tasks:
164+
task.cancel(*ex.args)
165+
on_completed_fut = None
166+
if __debug__ and unhandled_exceptions:
139167
# If run_one_coro raises an unhandled exception, it's probably a
140168
# programming error, and I want to see it.
141-
if __debug__:
142-
for d in done:
143-
if d.done() and not d.cancelled() and d.exception():
144-
raise d.exception()
169+
raise ExceptionGroup("staggered race failed", unhandled_exceptions)
170+
if propagate_cancellation_error is not None:
171+
raise propagate_cancellation_error
145172
return winner_result, winner_index, exceptions
146173
finally:
147-
del exceptions
148-
# Make sure no tasks are left running if we leave this function
149-
for t in running_tasks:
150-
t.cancel()
174+
del exceptions, propagate_cancellation_error, unhandled_exceptions

Lib/test/test_asyncio/test_staggered.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,30 @@ async def do_set():
122122
self.assertIsNone(excs[0], None)
123123
self.assertIsInstance(excs[1], asyncio.CancelledError)
124124
self.assertIsInstance(excs[2], asyncio.CancelledError)
125+
126+
127+
async def test_cancelled(self):
128+
log = []
129+
with self.assertRaises(TimeoutError):
130+
async with asyncio.timeout(None) as cs_outer, asyncio.timeout(None) as cs_inner:
131+
async def coro_fn():
132+
cs_inner.reschedule(-1)
133+
await asyncio.sleep(0)
134+
try:
135+
await asyncio.sleep(0)
136+
except asyncio.CancelledError:
137+
log.append("cancelled 1")
138+
139+
cs_outer.reschedule(-1)
140+
await asyncio.sleep(0)
141+
try:
142+
await asyncio.sleep(0)
143+
except asyncio.CancelledError:
144+
log.append("cancelled 2")
145+
try:
146+
await staggered_race([coro_fn], delay=None)
147+
except asyncio.CancelledError:
148+
log.append("cancelled 3")
149+
raise
150+
151+
self.assertListEqual(log, ["cancelled 1", "cancelled 2", "cancelled 3"])
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix :func:`!asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception.

0 commit comments

Comments
 (0)
0