8000 extmod/uasyncio: Fix race with cancelled task waiting on finished task. · codemee/micropython@514bf1a · GitHub
[go: up one dir, main page]

Skip to content

Commit 514bf1a

Browse files
committed
extmod/uasyncio: Fix race with cancelled task waiting on finished task.
This commit fixes a problem with a race between cancellation of task A and completion of task B, when A waits on B. If task B completes just before task A is cancelled then the cancellation of A does not work. Instead, the CancelledError meant to cancel A gets passed through to B (that's expected behaviour) but B handles it as a "Task exception wasn't retrieved" scenario, printing out such a message (this is because finished tasks point their "coro" attribute to themselves to indicate they are done, and implement the throw() method, but that method inadvertently catches the CancelledError). The correct behaviour is for B to bounce that CancelledError back out. This bug is mainly seen when wait_for() is used, and in that context the symptoms are: - occurs when using wait_for(T, S), if the task T being waited on finishes at exactly the same time as the wait-for timeout S expires - task T will have run to completion - the "Task exception wasn't retrieved message" is printed with "<class 'CancelledError'>" as the error (ie no traceback) - the wait_for(T, S) call never returns (it's never put back on the uasyncio run queue) and all tasks waiting on this are blocked forever from running - uasyncio otherwise continues to function and other tasks continue to be scheduled as normal The fix here reworks the "waiting" attribute of Task to be called "state" and uses it to indicate whether a task is: running and not awaited on, running and awaited on, finished and not awaited on, or finished and awaited on. This means the task does not need to point "coro" to itself to indicate finished, and also allows removal of the throw() method. A benefit of this is that "Task exception wasn't retrieved" messages can go back to being able to print the name of the coroutine function. Fixes issue micropython#7386. Signed-off-by: Damien George <damien@micropython.org>
1 parent 8edc3aa commit 514bf1a

File tree

5 files changed

+111
-83
lines changed

5 files changed

+111
-83
lines changed

extmod/moduasyncio.c

Lines changed: 22 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,19 @@
3131

3232
#if MICROPY_PY_UASYNCIO
3333

34+
#define TASK_STATE_RUNNING_NOT_WAITED_ON (mp_const_true)
35+
#define TASK_STATE_DONE_NOT_WAITED_ON (mp_const_none)
36+
#define TASK_STATE_DONE_WAS_WAITED_ON (mp_const_false)
37+
38+
#define TASK_IS_DONE(task) ( \
39+
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
40+
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
41+
3442
typedef struct _mp_obj_task_t {
3543
mp_pairheap_t pairheap;
3644
mp_obj_t coro;
3745
mp_obj_t data;
38-
mp_obj_t waiting;
39-
46+
mp_obj_t state;
4047
mp_obj_t ph_key;
4148
} mp_obj_task_t;
4249

@@ -146,9 +153,6 @@ STATIC const mp_obj_type_t task_queue_type = {
146153
/******************************************************************************/
147154
// Task class
148155

149-
// For efficiency, the task object is stored to the coro entry when the task is done.
150-
#define TASK_IS_DONE(task) ((task)->coro == MP_OBJ_FROM_PTR(task))
151-
152156
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
153157
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
154158

@@ -159,7 +163,7 @@ STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n
159163
mp_pairheap_init_node(task_lt, &self->pairheap);
160164
self->coro = args[0];
161165
self->data = mp_const_none;
162-
self->waiting = mp_const_none;
166+
self->state = TASK_STATE_RUNNING_NOT_WAITED_ON;
163167
self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
164168
if (n_args == 2) {
165169
uasyncio_context = args[1];
@@ -218,24 +222,6 @@ STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
218222
}
219223
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
220224

221-
STATIC mp_obj_t task_throw(mp_obj_t self_in, mp_obj_t value_in) {
222-
// This task raised an exception which was uncaught; handle that now.
223-
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
224-
// Set the data because it was cleared by the main scheduling loop.
225-
self->data = value_in;
< 6D40 /td>
226-
if (self->waiting == mp_const_none) {
227-
// Nothing await'ed on the task so call the exception handler.
228-
mp_obj_t _exc_context = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__exc_context));
229-
mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_exception), value_in);
230-
mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_future), self_in);
231-
mp_obj_t Loop = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_Loop));
232-
mp_obj_t call_exception_handler = mp_load_attr(Loop, MP_QSTR_call_exception_handler);
233-
mp_call_function_1(call_exception_handler, _exc_context);
234-
}
235-
return mp_const_none;
236-
}
237-
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_throw_obj, task_throw);
238-
239225
STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
240226
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
241227
if (dest[0] == MP_OBJ_NULL) {
@@ -244,32 +230,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
244230
dest[0] = self->coro;
9E88 245231
} else if (attr == MP_QSTR_data) {
246232
dest[0] = self->data;
247-
} else if (attr == MP_QSTR_waiting) {
248-
if (self->waiting != mp_const_none && self->waiting != mp_const_false) {
249-
dest[0] = self->waiting;
250-
}
233+
} else if (attr == MP_QSTR_state) {
234+
dest[0] = self->state;
251235
} else if (attr == MP_QSTR_done) {
252236
dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
253237
dest[1] = self_in;
254238
} else if (attr == MP_QSTR_cancel) {
255239
dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
256240
dest[1] = self_in;
257-
} else if (attr == MP_QSTR_throw) {
258-
dest[0] = MP_OBJ_FROM_PTR(&task_throw_obj);
259-
dest[1] = self_in;
260241
} else if (attr == MP_QSTR_ph_key) {
261242
dest[0] = self->ph_key;
262243
}
263244
} else if (dest[1] != MP_OBJ_NULL) {
264245
// Store
265-
if (attr == MP_QSTR_coro) {
266-
self->coro = dest[1];
267-
dest[0] = MP_OBJ_NULL;
268-
} else if (attr == MP_QSTR_data) {
246+
if (attr == MP_QSTR_data) {
269247
self->data = dest[1];
270248
dest[0] = MP_OBJ_NULL;
271-
} else if (attr == MP_QSTR_waiting) {
272-
self->waiting = dest[1];
249+
} else if (attr == MP_QSTR_state) {
250+
self->state = dest[1];
273251
dest[0] = MP_OBJ_NULL;
274252
}
275253
}
@@ -278,15 +256,12 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
278256
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
279257
(void)iter_buf;
280258
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
281-
if (self->waiting == mp_const_none) {
282-
// The is the first access of the "waiting" entry.
283-
if (TASK_IS_DONE(self)) {
284-
// Signal that the completed-task has been await'ed on.
285-
self->waiting = mp_const_false;
286-
} else {
287-
// Lazily allocate the waiting queue.
288-
self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
289-
}
259+
if (TASK_IS_DONE(self)) {
260+
// Signal that the completed-task has been await'ed on.
261+
self->state = TASK_STATE_DONE_WAS_WAITED_ON;
262+
} else if (self->state == TASK_STATE_RUNNING_NOT_WAITED_ON) {
263+
// Allocate the waiting queue.
264+
self->state = task_queue_make_new(&task_queue_type, 0, 0, NULL);
290265
}
291266
return self_in;
292267
}
@@ -299,7 +274,7 @@ STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
299274
} else {
300275
// Put calling task on waiting queue.
301276
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
302-
mp_obj_t args[2] = { self->waiting, cur_task };
277+
mp_obj_t args[2] = { self->state, cur_task };
303278
task_queue_push_sorted(2, args);
304279
// Set calling task's data to this task that it waits on, to double-link it.
305280
((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;

extmod/uasyncio/core.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ def run_until_complete(main_task=None):
175175
if not exc:
176176
t.coro.send(None)
177177
else:
178+
# If the task is finished and on the run queue and gets here, then it
179+
# had an exception and was not await'ed on. Throwing into it now will
180+
# raise StopIteration and the code below will catch this and run the
181+
# call_exception_handler function.
178182
t.data = None
179183
t.coro.throw(exc)
180184
except excs_all as er:
@@ -185,22 +189,32 @@ def run_until_complete(main_task=None):
185189
if isinstance(er, StopIteration):
186190
return er.value
187191
raise er
188-
# Schedule any other tasks waiting on the completion of this task
189-
waiting = False
190-
if hasattr(t, "waiting"):
191-
while t.waiting.peek():
192-
_task_queue.push_head(t.waiting.pop_head())
193-
waiting = True
194-
t.waiting = None # Free waiting queue head
195-
if not waiting and not isinstance(er, excs_stop):
196-
# An exception ended this detached task, so queue it for later
197-
# execution to handle the uncaught exception if no other task retrieves
198-
# the exception in the meantime (this is handled by Task.throw).
199-
_task_queue.push_head(t)
200-
# Indicate task is done by setting coro to the task object itself
201-
t.coro = t
202-
# Save return value of coro to pass up to caller
203-
t.data = er
192+
if t.state:
193+
# Task was running but is now finished.
194+
waiting = False
195+
if t.state is True:
196+
# "None" indicates that the task is complete and not await'ed on (yet).
197+
t.state = None
198+
else:
199+
# Schedule any other tasks waiting on the completion of this task.
200+
while t.state.peek():
201+
_task_queue.push_head(t.state.pop_head())
202+
waiting = True
203+
# "False" indicates that the task is complete and has been await'ed on.
204+
t.state = False
205+
if not waiting and not isinstance(er, excs_stop):
206+
# An exception ended this detached task, so queue it for later
207+
# execution to handle the uncaught exception if no other task retrieves
208+
# the exception in the meantime (this is handled by Task.throw).
209+
_task_queue.push_head(t)
210+
# Save return value of coro to pass up to caller.
211+
t.data = er
212+
elif t.state is None:
213+
# Task is already finished and nothing await'ed on the task,
214+
# so call the exception handler.
215+
_exc_context["exception"] = exc
216+
_exc_context["future"] = t
217+
Loop.call_exception_handler(_exc_context)
204218

205219

206220
# Create a new task from a coroutine and run it until it finishes

extmod/uasyncio/task.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -123,37 +123,38 @@ class Task:
123123
def __init__(self, coro, globals=None):
124124
self.coro = coro # Coroutine of this Task
125125
self.data = None # General data for queue it is waiting on
126+
self.state = True # None, False, True or a TaskQueue instance
126127
self.ph_key = 0 # Pairing heap
127128
self.ph_child = None # Paring heap
128129
self.ph_child_last = None # Paring heap
129130
self.ph_next = None # Paring heap
130131
self.ph_rightmost_parent = None # Paring heap
131132

132133
def __iter__(self):
133-
if self.coro is self:
134-
# Signal that the completed-task has been await'ed on.
135-
self.waiting = None
136-
elif not hasattr(self, "waiting"):
137-
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
138-
self.waiting = TaskQueue()
134+
if not self.state:
135+
# Task finished, signal that is has been await'ed on.
136+
self.state = False
137+
elif self.state is True:
138+
# Allocated head of linked list of Tasks waiting on completion of this task.
139+
self.state = TaskQueue()
139140
return self
140141

141142
def __next__(self):
142-
if self.coro is self:
143+
if not self.state:
143144
# Task finished, raise return value to caller so it can continue.
144145
raise self.data
145146
else:
146147
# Put calling task on waiting queue.
147-
self.waiting.push_head(core.cur_task)
148+
self.state.push_head(core.cur_task)
148149
# Set calling task's data to this task that it waits on, to double-link it.
149150
core.cur_task.data = self
150151

151152
def done(self):
152-
return self.coro is self
153+
return not self.state
153154

154155
def cancel(self):
155156
# Check if task is already finished.
156-
if self.coro is self:
157+
if not self.state:
157158
return False
158159
# Can't cancel self (not supported yet).
159160
if self is core.cur_task:
@@ -172,13 +173,3 @@ def cancel(self):
172173
core._task_queue.push_head(self)
173174
self.data = core.CancelledError
174175
return True
175-
176-
def throw(self, value):
177-
# This task raised an exception which was uncaught; handle that now.
178-
# Set the data because it was cleared by the main scheduling loop.
179-
self.data = value
180-
if not hasattr(self, "waiting"):
181-
# Nothing await'ed on the task so call the exception handler.
182-
core._exc_context["exception"] = value
183-
core._exc_context["future"] = self
184-
core.Loop.call_exception_handler(core._exc_context)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Test cancelling a task that is waiting on a task that just finishes.
2+
3+
try:
4+
import uasyncio as asyncio
5+
except ImportError:
6+
try:
7+
import asyncio
8+
except ImportError:
9+
print("SKIP")
10+
raise SystemExit
11+
12+
13+
async def sleep_task():
14+
print("sleep_task sleep")
15+
await asyncio.sleep(0)
16+
print("sleep_task wake")
17+
18+
19+
async def wait_task(t):
20+
print("wait_task wait")
21+
await t
22+
print("wait_task wake")
23+
24+
25+
async def main():
26+
waiting_task = asyncio.create_task(wait_task(asyncio.create_task(sleep_task())))
27+
28+
print("main sleep")
29+
await asyncio.sleep(0)
30+
print("main sleep")
31+
await asyncio.sleep(0)
32+
33+
waiting_task.cancel()
34+
print("main wait")
35+
try:
36+
await waiting_task
37+
except asyncio.CancelledError as er:
38+
print(repr(er))
39+
40+
41+
asyncio.run(main())
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
main sleep
2+
sleep_task sleep
3+
wait_task wait
4+
main sleep
5+
sleep_task wake
6+
main wait
7+
CancelledError()

0 commit comments

Comments
 (0)
0