8000 bpo-46771: Implement asyncio context managers for handling timeouts by asvetlov · Pull Request #31394 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-46771: Implement asyncio context managers for handling timeouts #31394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dac5874
bpo-46771: Implement asyncio context managers for handling timeouts
asvetlov Mar 2, 2022
374ff2a
Add reschedule tests
asvetlov Mar 2, 2022
70fa59b
Add reschedule tests
asvetlov Mar 2, 2022
3ae2af6
Dro breakpoint
asvetlov Mar 2, 2022
1654ec4
Tune repr
asvetlov Mar 2, 2022
2c9dbf8
Add tests
asvetlov Mar 2, 2022
baa7400
More tests
asvetlov Mar 2, 2022
1ca0fb8
Rename
asvetlov Mar 2, 2022
8a81dd1
Update Lib/asyncio/timeouts.py
asvetlov Mar 8, 2022
fae235d
Update Lib/asyncio/timeouts.py
asvetlov Mar 8, 2022
663b82f
Update Lib/asyncio/timeouts.py
asvetlov Mar 8, 2022
24d62d1
Update Lib/asyncio/timeouts.py
asvetlov Mar 8, 2022
6a26d1b
Add a test
asvetlov Mar 8, 2022
930b92b
Polish docstrings
asvetlov Mar 8, 2022
ac5c53d
Format
asvetlov Mar 8, 2022
f96ad1c
Tune tests
asvetlov Mar 8, 2022
94b4b4c
Fix comment
asvetlov Mar 8, 2022
388c6da
Tune comment
asvetlov Mar 8, 2022
cdc7f88
Tune docstrings
asvetlov Mar 8, 2022
9949fe4
Tune
asvetlov Mar 8, 2022
c716856
Tune tests
asvetlov Mar 8, 2022
b4889a0
Update Lib/test/test_asyncio/test_timeouts.py
asvetlov Mar 8, 2022
b6504e6
Tune tests
asvetlov Mar 8, 2022
fd2688d
Don't clobber foreign exceptions even if timeout is expiring
gvanrossum Mar 9, 2022
2ddda69
Add test from discussion
gvanrossum Mar 9, 2022
493545a
Fix indent of added test
gvanrossum Mar 9, 2022
ff36f2a
Disable slow callback warning
asvetlov Mar 9, 2022
8790e49
Reformat
asvetlov Mar 9, 2022
ac6f8c8
Increase delay
asvetlov Mar 9, 2022
e8c67ce
Don't raise TimeoutError if the CancelledError was swallowed by inner…
asvetlov Mar 9, 2022
e65d766
Don't duplicate py/c tests, timeout has no C accelerators
asvetlov Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .subprocess import *
from .tasks import *
from .taskgroups import *
from .timeouts import *
from .threads import *
from .transports import *

Expand All @@ -34,6 +35,7 @@
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)

if sys.platform == 'win32': # pragma: no cover
Expand Down
151 changes: 151 additions & 0 deletions Lib/asyncio/timeouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import enum

from types import TracebackType
from typing import final, Optional, Type

from . import events
from . import exceptions
from . import tasks


__all__ = (
"Timeout",
"timeout",
"timeout_at",
)


class _State(enum.Enum):
CREATED = "created"
ENTERED = "active"
EXPIRING = "expiring"
EXPIRED = "expired"
EXITED = "finished"


@final
class Timeout:

def __init__(self, when: Optional[float]) -> None:
self._state = _State.CREATED

self._timeout_handler: Optional[events.TimerHandle] = None
self._task: Optional[tasks.Task] = None
self._when = when

def when(self) -> Optional[float]:
return self._when

def reschedule(self, when: Optional[float]) -> None:
assert self._state is not _State.CREATED
if self._state is not _State.ENTERED:
raise RuntimeError(
f"Cannot change state of {self._state.value} Timeout",
)

self._when = when

if self._timeout_handler is not None:
self._timeout_handler.cancel()

if when is None:
self._timeout_ha 10000 ndler = None
else:
loop = events.get_running_loop()
self._timeout_handler = loop.call_at(
when,
self._on_timeout,
)

def expired(self) -> bool:
"""Is timeout expired during execution?"""
return self._state in (_State.EXPIRING, _State.EXPIRED)

def __repr__(self) -> str:
info = ['']
if self._state is _State.ENTERED:
when = round(self._when, 3) if self._when is not None else None
info.append(f"when={when}")
info_str = ' '.join(info)
return f"<Timeout [{self._state.value}]{info_str}>"

async def __aenter__(self) -> "Timeout":
self._state = _State.ENTERED
self._task = tasks.current_task()
if self._task is None:
raise RuntimeError("Timeout should be used inside a task")
self.reschedule(self._when)
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Optional[bool]:
assert self._state in (_State.ENTERED, _State.EXPIRING)

if self._timeout_handler is not None:
self._timeout_handler.cancel()
self._timeout_handler = None

if self._state is _State.EXPIRING:
self._state = _State.EXPIRED

if self._task.uncancel() == 0 and exc_type is exceptions.CancelledError:
# Since there are no outstanding cancel requests, we're
# handling this.
raise TimeoutError
elif self._state is _State.ENTERED:
self._state = _State.EXITED

return None

def _on_timeout(self) -> None:
assert self._state is _State.ENTERED
self._task.cancel()
self._state = _State.EXPIRING
# drop the reference early
self._timeout_handler = None


def timeout(delay: Optional[float]) -> Timeout:
"""Timeout async context manager.

Useful in cases when you want to apply timeout logic around block
of code or in cases when asyncio.wait_for is not suitable. For example:

>>> async with asyncio.timeout(10): # 10 seconds timeout
... await long_running_task()


delay - value in seconds or None to disable timeout logic

long_running_task() is interrupted by raising asyncio.CancelledError,
the top-most affected timeout() context manager converts CancelledError
into TimeoutError.
"""
loop = events.get_running_loop()
return Timeout(loop.time() + delay if delay is not None else None)


def timeout_at(when: Optional[float]) -> Timeout:
"""Schedule the timeout at absolute time.

Like timeout() but argument gives absolute time in the same clock system
as loop.time().

Please note: it is not POSIX time but a time with
undefined starting base, e.g. the time of the system power on.

>>> async with asyncio.timeout_at(loop.time() + 10):
... await long_running_task()


when - a deadline when timeout occurs or None to disable timeout logic

long_running_task() is interrupted by raising asyncio.CancelledError,
the top-most affected timeout() context manager converts CancelledError
into TimeoutError.
"""
return Timeout(when)
229 changes: 229 additions & 0 deletions Lib/test/test_asyncio/test_timeouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
"""Tests for asyncio/timeouts.py"""

import unittest
import time

import asyncio
from asyncio import tasks


def tearDownModule():
asyncio.set_event_loop_policy(None)


class TimeoutTests(unittest.IsolatedAsyncioTestCase):

async def test_timeout_basic(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01) as cm:
await asyncio.sleep(10)
self.assertTrue(cm.expired())

async def test_timeout_at_basic(self):
loop = asyncio.get_running_loop()

with self.assertRaises(TimeoutError):
deadline = loop.time() + 0.01
async with asyncio.timeout_at(deadline) as cm:
await asyncio.sleep(10)
self.assertTrue(cm.expired())
self.assertEqual(deadline, cm.when())

async def test_nested_timeouts(self):
loop = asyncio.get_running_loop()
cancelled = False
with self.assertRaises(TimeoutError):
deadline = loop.time() + 0.01
async with asyncio.timeout_at(deadline) as cm1:
# Only the topmost context manager should raise TimeoutError
try:
async with asyncio.timeout_at(deadline) as cm2:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise
self.assertTrue(cancelled)
self.assertTrue(cm1.expired())
self.assertTrue(cm2.expired())

async def test_waiter_cancelled(self):
loop = asyncio.get_running_loop()
cancelled = False
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise
self.assertTrue(cancelled)

async def test_timeout_not_called(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout(10) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()

self.assertFalse(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertGreater(cm.when(), t1)

async def test_timeout_disabled(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout(None) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()

self.assertFalse(cm.expired())
self.assertIsNone(cm.when())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)

async def test_timeout_at_disabled(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout_at(None) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()

self.assertFalse(cm.expired())
self.assertIsNone(cm.when())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)

async def test_timeout_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0) as cm:
await asyncio.sleep(10)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 <= cm.when() <= t1)

async def test_foreign_exception_passed(self):
with self.assertRaises(KeyError):
async with asyncio.timeout(0.01) as cm:
raise KeyError
self.assertFalse(cm.expired())

async def test_foreign_exception_on_timeout(self):
async def crash():
try:
await asyncio.sleep(1)
finally:
1/0
with self.assertRaises(ZeroDivisionError):
async with asyncio.timeout(0.01):
await crash()

async def test_foreign_cancel_doesnt_timeout_if_not_expired(self):
with self.assertRaises(asyncio.CancelledError):
async with asyncio.timeout(10) as cm:
asyncio.current_task().cancel()
await asyncio.sleep(10)
self.assertFalse(cm.expired())

async def test_outer_task_is_not_cancelled(self):
async def outer() -> None:
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.001):
await asyncio.sleep(10)

task = asyncio.create_task(outer())
await task
self.assertFalse(task.cancelled())
self.assertTrue(task.done())

async def test_nested_timeouts_concurrent(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.002):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.1):
# Pretend we crunch some numbers.
time.sleep(0.01)
await asyncio.sleep(1)

async def test_nested_timeouts_loop_busy(self):
# After the inner timeout is an expensive operation which should
# be stopped by the outer timeout.
loop = asyncio.get_running_loop()
# Disable a message about long running task
loop.slow_callback_duration = 10
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.1): # (1)
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01): # (2)
# Pretend the loop is busy for a while.
time.sleep(0.1)
await asyncio.sleep(1)
# TimeoutError was cought by (2)
await asyncio.sleep(10) # This sleep should be interrupted by (1)
t1 = loop.time()
self.assertTrue(t0 <= t1 <= t0 + 1)

async def test_reschedule(self):
loop = asyncio.get_running_loop()
fut = loop.create_future()
deadline1 = loop.time() + 10
deadline2 = deadline1 + 20

async def f():
async with asyncio.timeout_at(deadline1) as cm:
fut.set_result(cm)
await asyncio.sleep(50)

task = asyncio.create_task(f())
cm = await fut

self.assertEqual(cm.when(), deadline1)
cm.reschedule(deadline2)
self.assertEqual(cm.when(), deadline2)
cm.reschedule(None)
self.assertIsNone(cm.when())

task.cancel()

with self.assertRaises(asyncio.CancelledError):
await task
self.assertFalse(cm.expired())

async def test_repr_active(self):
async with asyncio.timeout(10) as cm:
self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>")

async def test_repr_expired(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01) as cm:
await asyncio.sleep(10)
self.assertEqual(repr(cm), "<Timeout [expired]>")

async def test_repr_finished(self):
async with asyncio.timeout(10) as cm:
await asyncio.sleep(0)

self.assertEqual(repr(cm), "<Timeout [finished]>")

async def test_repr_disabled(self):
async with asyncio.timeout(None) as cm:
self.assertEqual(repr(cm), r"<Timeout [active] when=None>")

async def test_nested_timeout_in_finally(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
try:
await asyncio.sleep(1)
finally:
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
await asyncio.sleep(10)


if __name__ == '__main__':
unittest.main()
Loading
0