-
-
Notifications
You must be signed in to change notification settings - Fork 32.4k
GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694
Changes from 5 commits
c3aff47
111e74a
6d41771
4e16fa7
ad56f29
30e0ec4
6d59820
c060f40
62bd6cd
01046c5
8802dfa
f7c103c
1572b2b
dce420e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,6 +95,9 @@ async def acquire(self): | |
This method blocks until the lock is unlocked, then sets it to | ||
locked and returns True. | ||
""" | ||
# Implement fair scheduling, where thread always waits | ||
# its turn. | ||
# Jumping the queue if all are cancelled is an optimization. | ||
if (not self._locked and (self._waiters is None or | ||
all(w.cancelled() for w in self._waiters))): | ||
self._locked = True | ||
|
@@ -105,19 +108,16 @@ async def acquire(self): | |
fut = self._get_loop().create_future() | ||
self._waiters.append(fut) | ||
|
||
# Finally block should be called before the CancelledError | ||
# handling as we don't want CancelledError to call | ||
# _wake_up_first() and attempt to wake up itself. | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
try: | ||
await fut | ||
finally: | ||
self._waiters.remove(fut) | ||
except exceptions.CancelledError: | ||
await fut | ||
except BaseException: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm still not very comfortable with widening this exception net for the purpose of sneaking in support for your library. If we want that to be supported we should make it an explicit feature, everywhere, rather than just tweaking an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair point. I quite understand that you are reluctant to touch this just to humour an eccentric experimental library which is trying to push the envelope of the original design. And I'm happy to have this rejected as long as we have had the chance to discuss it. I'll add a comment here in the mean time, just for safety. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC you're planning to make your library to use a subclass of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Of course, IMO, it would be even greater if in some future version, we would have something like: class InterruptError(BaseException):
pass
class CancelledError(InterruptError):
pass but that is future music :) |
||
self._waiters.remove(fut) | ||
if not self._locked: | ||
# Error occurred after release() was called, must re-do release. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment makes things more mysterious to me, not less. If we're precise, we get to this exception handles whenever If I had to explain in plain English what was going on here, I'd have to say something like "The acquiring task was cancelled after the lock was released, but before it resumed execution. (This is one of these things that's easy to forget is possible when casually reading async code.) We're going to re-raise the cancellation here, but we should first wake up the next task waiting for the lock, if there are any." I guess you summarized the condition as "after release() was called", which I think is too concise. Thinking all this over, I am also thinking there's an incredibly subtle invariant going on here regarding Maybe there's a way of thinking about this that makes it easier to prove to oneself that the code here is correct, without such a "global" analysis? Maybe the invariant is something like "every cancelled future in the list corresponds to a task that will eventually be woken up, at which point it will remove itself from the list and call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your thoughts. Yes, the comment was too concise and your version is better, but thinking more about it, it is not really true either. This here is not a case of "cancelled after lock was released". this is because release() does not do any modification of the _waiters queue itself. All it does is
I go over the self._waiters in a comment below. Here is how I follow the logic:
Looking at the case you mention:
Not sure how to formulate that into an invariant, but something like this: if not self._locked and not claiming_lock, and self._waiters:
# ensure that the head of the queue will run
head = self.waiters[0]
head.set_result(True) In other words, if the lock is not claimed, then the head of the queue, if any, should no longer be blocked. |
||
self._wake_up_first() | ||
raise | ||
|
||
self._waiters.remove(fut) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now I look at this more carefully, I think I like the original version using nested (Note that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I guess it makes it simpler to reason about, good to know that nested blocks have no overhead anymore. |
||
self._locked = True | ||
return True | ||
|
||
|
@@ -269,17 +269,22 @@ async def wait(self): | |
self._waiters.remove(fut) | ||
|
||
finally: | ||
# Must reacquire lock even if wait is cancelled | ||
cancelled = False | ||
# Must reacquire lock even if wait is cancelled. | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# We only catch CancelledError here, since we don't want any | ||
# other (fatal) errors with the future to cause us to spin. | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err = None | ||
while True: | ||
try: | ||
await self.acquire() | ||
break | ||
except exceptions.CancelledError: | ||
cancelled = True | ||
except exceptions.CancelledError as e: | ||
err = e | ||
|
||
if cancelled: | ||
raise exceptions.CancelledError | ||
if err: | ||
try: | ||
raise err # re-raise same exception instance | ||
kristjanvalur marked this conversation as resolved.
Show resolved
Hide resolved
|
||
finally: | ||
err = None # brake reference cycles | ||
kristjanvalur marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def wait_for(self, predicate): | ||
"""Wait until a predicate becomes true. | ||
|
@@ -378,20 +383,18 @@ async def acquire(self): | |
fut = self._get_loop().create_future() | ||
self._waiters.append(fut) | ||
|
||
# Finally block should be called before the CancelledError | ||
# handling as we don't want CancelledError to call | ||
# _wake_up_first() and attempt to wake up itself. | ||
try: | ||
try: | ||
await fut | ||
finally: | ||
self._waiters.remove(fut) | ||
except exceptions.CancelledError: | ||
if not fut.cancelled(): | ||
await fut | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same thing as before -- I actually like the nested There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair point, especially if it costs nothing. |
||
except BaseException: | ||
self._waiters.remove(fut) | ||
if fut.done() and not fut.cancelled(): | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Error occurred after release() was called for us. Must undo | ||
# the bookkeeping done there and retry. | ||
self._value += 1 | ||
self._wake_up_next() | ||
raise | ||
|
||
self._waiters.remove(fut) | ||
if self._value > 0: | ||
self._wake_up_next() | ||
return True | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -758,6 +758,63 @@ async def test_timeout_in_block(self): | |
with self.assertRaises(asyncio.TimeoutError): | ||
await asyncio.wait_for(condition.wait(), timeout=0.5) | ||
|
||
async def test_cancelled_error_wakeup(self): | ||
"""Test that a cancelled error, received when awaiting wakeup | ||
will be re-raised un-modified. | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please convert the docstrings on the tests to comments -- test docstrings tend to be printed by the test runner, messing up the neat output. Note how no other tests here have docstrings. |
||
wake = False | ||
raised = None | ||
cond = asyncio.Condition() | ||
|
||
async def func(): | ||
nonlocal raised | ||
async with cond: | ||
with self.assertRaises(asyncio.CancelledError) as err: | ||
await cond.wait_for(lambda: wake) | ||
raised = err.exception | ||
raise raised | ||
|
||
task = asyncio.create_task(func()) | ||
await asyncio.sleep(0) | ||
# Task is waiting on the condition, cancel it there | ||
task.cancel(msg="foo") | ||
with self.assertRaises(asyncio.CancelledError) as err: | ||
await task | ||
self.assertEqual(err.exception.args, ("foo",)) | ||
# we should have got the _same_ exception instance as the one originally raised | ||
self.assertIs(err.exception, raised) | ||
|
||
async def test_cancelled_error_re_aquire(self): | ||
"""Test that a cancelled error, received when re-aquiring lock, | ||
will be re-raised un-modified. | ||
""" | ||
wake = False | ||
raised = None | ||
cond = asyncio.Condition() | ||
|
||
async def func(): | ||
nonlocal raised | ||
async with cond: | ||
with self.assertRaises(asyncio.CancelledError) as err: | ||
await cond.wait_for(lambda: wake) | ||
raised = err.exception | ||
raise raised | ||
|
||
task = asyncio.create_task(func()) | ||
await asyncio.sleep(0) | ||
# Task is waiting on the condition | ||
await cond.acquire() | ||
wake = True | ||
cond.notify() | ||
await asyncio.sleep(0) | ||
# task is now trying to re-acquire the lock, cancel it there | ||
task.cancel(msg="foo") | ||
cond.release() | ||
with self.assertRaises(asyncio.CancelledError) as err: | ||
await task | ||
self.assertEqual(err.exception.args, ("foo",)) | ||
# we should have got the _same_ exception instance as the one originally raised | ||
self.assertIs(err.exception, raised) | ||
|
||
class SemaphoreTests(unittest.IsolatedAsyncioTestCase): | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.