@@ -66,8 +66,27 @@ async def staggered_race(coro_fns, delay, *, loop=None):
66
66
enum_coro_fns = enumerate (coro_fns )
67
67
winner_result = None
68
68
winner_index = None
69
+ unhandled_exceptions = []
69
70
exceptions = []
70
- running_tasks = []
71
+ running_tasks = set ()
72
+ on_completed_fut = None
73
+
74
+ def task_done (task ):
75
+ running_tasks .discard (task )
76
+ if (
77
+ on_completed_fut is not None
78
+ and not on_completed_fut .done ()
79
+ and not running_tasks
80
+ ):
81
+ on_completed_fut .set_result (None )
82
+
83
+ if task .cancelled ():
84
+ return
85
+
86
+ exc = task .exception ()
87
+ if exc is None :
88
+ return
89
+ unhandled_exceptions .append (exc )
71
90
72
91
async def run_one_coro (ok_to_start , previous_failed ) -> None :
73
92
# in eager tasks this waits for the calling task to append this task
@@ -91,11 +110,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
91
110
this_failed = locks .Event ()
92
111
next_ok_to_start = locks .Event ()
93
112
next_task = loop .create_task (run_one_coro (next_ok_to_start , this_failed ))
94
- running_tasks .append (next_task )
113
+ running_tasks .add (next_task )
114
+ next_task .add_done_callback (task_done )
95
115
# next_task has been appended to running_tasks so next_task is ok to
96
116
# start.
97
117
next_ok_to_start .set ()
98
- assert len (running_tasks ) == this_index + 2
99
118
# Prepare place to put this coroutine's exceptions if not won
100
119
exceptions .append (None )
101
120
assert len (exceptions ) == this_index + 1
@@ -120,31 +139,36 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
120
139
# up as done() == True, cancelled() == False, exception() ==
121
140
# asyncio.CancelledError. This behavior is specified in
122
141
# https://bugs.python.org/issue30048
123
- for i , t in enumerate (running_tasks ):
124
- if i != this_index :
142
+ current_task = tasks .current_task (loop )
143
+ for t in running_tasks :
144
+ if t is not current_task :
125
145
t .cancel ()
126
146
127
- ok_to_start = locks .Event ()
128
- first_task = loop .create_task (run_one_coro (ok_to_start , None ))
129
- running_tasks .append (first_task )
130
- # first_task has been appended to running_tasks so first_task is ok to start.
131
- ok_to_start .set ()
147
+ propagate_cancellation_error = None
132
148
try :
133
- # Wait for a growing list of tasks to all finish: poor man's version of
134
- # curio's TaskGroup or trio's nursery
135
- done_count = 0
136
- while done_count != len (running_tasks ):
137
- done , _ = await tasks .wait (running_tasks )
138
- done_count = len (done )
149
+ ok_to_start = locks .Event ()
150
+ first_task = loop .create_task (run_one_coro (ok_to_start , None ))
151
+ running_tasks .add (first_task )
152
+ first_task .add_done_callback (task_done )
153
+ # first_task has been appended to running_tasks so first_task is ok to start.
154
+ ok_to_start .set ()
155
+ propagate_cancellation_error = None
156
+ # Make sure no tasks are left running if we leave this function
157
+ while running_tasks :
158
+ on_completed_fut = loop .create_future ()
159
+ try :
160
+ await on_completed_fut
161
+ except exceptions_mod .CancelledError as ex :
162
+ propagate_cancellation_error = ex
163
+ for task in running_tasks :
164
+ task .cancel (* ex .args )
165
+ on_completed_fut = None
166
+ if __debug__ and unhandled_exceptions :
139
167
# If run_one_coro raises an unhandled exception, it's probably a
140
168
# programming error, and I want to see it.
141
- if __debug__ :
142
- for d in done :
143
- if d .done () and not d .cancelled () and d .exception ():
144
- raise d .exception ()
169
+ raise ExceptionGroup ("staggered race failed" , unhandled_exceptions )
170
+ if propagate_cancellation_error is not None :
171
+ raise propagate_cancellation_error
145
172
return winner_result , winner_index , exceptions
146
173
finally :
147
- del exceptions
2364
148
- # Make sure no tasks are left running if we leave this function
149
- for t in running_tasks :
150
- t .cancel ()
174
+ del exceptions , propagate_cancellation_error , unhandled_exceptions
0 commit comments