8000 Merge branch 'master' into lock-backend · IBMZ-Linux-OSS-Python/anyio@5f98ae9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5f98ae9

Browse files
authored
Merge branch 'master' into lock-backend
2 parents 3dcc8df + 053e8f0 commit 5f98ae9

File tree

10 files changed

+190
-69
lines changed

10 files changed

+190
-69
lines changed

.github/pull_request_template.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ If there are no entries after the last release, use `**UNRELEASED**` as the vers
2424
If, say, your patch fixes issue #123, the entry should look like this:
2525

2626
`* Fix big bad boo-boo in task groups (#123
27-
<https://github.com/agronholm/anyio/issues/123>_; PR by Yourname)`
27+
<https://github.com/agronholm/anyio/issues/123>_; PR by @yourgithubaccount)`
2828

2929
If there's no issue linked, just link to your pull request instead by updating the
3030
changelog after you've created the PR.
31-
32-
If possible, use your real name in the changelog entry. If not, use your GitHub
33-
username.

docs/versionhistory.rst

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ Version history
33

44
This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
55

6-
**UNRELEASED**
6+
**4.4.0**
77

88
- Added an opt-in performance optimization that decreases AnyIO's overhead (compared to
99
native calls on the selected async backend) by locking in the first automatically
@@ -14,14 +14,24 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
1414
portals
1515
- Added ``__slots__`` to ``AsyncResource`` so that child classes can use ``__slots__``
1616
(`#733 <https://github.com/agronholm/anyio/pull/733>`_; PR by Justin Su)
17-
- Fixed a race condition that caused crashes when multiple event loops of the same
18-
backend were running in separate threads and simultaneously attempted to use AnyIO for
19-
their first time (`#425 <https://github.com/agronholm/anyio/issues/425>`_; PR by David
20-
Jiricek and Ganden Schaffner)
17+
- Added the ``TaskInfo.has_pending_cancellation()`` method
2118
- Fixed erroneous ``RuntimeError: called 'started' twice on the same task status``
2219
when cancelling a task in a TaskGroup created with the ``start()`` method before
2320
the first checkpoint is reached after calling ``task_status.started()``
2421
(`#706 <https://github.com/agronholm/anyio/issues/706>`_; PR by Dominik Schwabe)
22+
- Fixed two bugs with ``TaskGroup.start()`` on asyncio:
23+
24+
* Fixed erroneous ``RuntimeError: called 'started' twice on the same task status``
25+
when cancelling a task in a TaskGroup created with the ``start()`` method before
26+
the first checkpoint is reached after calling ``task_status.started()``
27+
(`#706 <https://github.com/agronholm/anyio/issues/706>`_; PR by Dominik Schwabe)
28+
* Fixed the entire task group being cancelled if a ``TaskGroup.start()`` call gets
29+
cancelled (`#685 <https://github.com/agronholm/anyio/issues/685>`_,
30+
`#710 <https://github.com/agronholm/anyio/issues/710>`_)
31+
- Fixed a race condition that caused crashes when multiple event loops of the same
32+
backend were running in separate threads and simultaneously attempted to use AnyIO for
33+
their first time (`#425 <https://github.com/agronholm/anyio/issues/425>`_; PR by David
34+
Jiricek and Ganden Schaffner)
2535
- Fixed cancellation delivery on asyncio incrementing the wrong cancel scope's
2636
cancellation counter when cascading a cancel operation to a child scope, thus failing
2737
to uncancel the host task (`#716 <https://github.com/agronholm/anyio/issues/716>`_)
@@ -31,6 +41,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
3141
variable when setting the ``debug`` flag in ``anyio.run()``
3242
- Fixed ``SocketStream.receive()`` not detecting EOF on asyncio if there is also data in
3343
the read buffer (`#701 <https://github.com/agronholm/anyio/issues/701>`_)
44+
- Fixed ``MemoryObjectStream`` dropping an item if the item is delivered to a recipient
45+
that is waiting to receive an item but has a cancellation pending
46+
(`#728 <https://github.com/agronholm/anyio/issues/728>`_)
3447
- Emit a ``ResourceWarning`` for ``MemoryObjectReceiveStream`` and
3548
``MemoryObjectSendStream`` that were garbage collected without being closed (PR by
3649
Andrey Kazantcev)

src/anyio/_backends/_asyncio.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import socket
88
import sys
99
import threading
10+
import weakref
1011
from asyncio import (
1112
AbstractEventLoop,
1213
CancelledError,
@@ -596,14 +597,14 @@ class TaskState:
596597
itself because there are no guarantees about its implementation.
597598
"""
598599

599-
__slots__ = "parent_id", "cancel_scope"
600+
__slots__ = "parent_id", "cancel_scope", "__weakref__"
600601

601602
def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
602603
self.parent_id = parent_id
603604
self.cancel_scope = cancel_scope
604605

605606

606-
_task_states = WeakKeyDictionary() # type: WeakKeyDictionary[asyncio.Task, TaskState]
607+
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
607608

608609

609610
#
@@ -714,6 +715,12 @@ def task_done(_task: asyncio.Task) -> None:
714715
exc = e
715716

716717
if exc is not None:
718+
# The future can only be in the cancelled state if the host task was
719+
# cancelled, so return immediately instead of adding one more
720+
# CancelledError to the exceptions list
721+
if task_status_future is not None and task_status_future.cancelled():
722+
return
723+
717724
if task_status_future is None or task_status_future.done():
718725
if not isinstance(exc, CancelledError):
719726
self._exceptions.append(exc)
@@ -1827,14 +1834,36 @@ async def __anext__(self) -> Signals:
18271834
#
18281835

18291836

1830-
def _create_task_info(task: asyncio.Task) -> TaskInfo:
1831-
task_state = _task_states.get(task)
1832-
if task_state is None:
1833-
parent_id = None
1834-
else:
1835-
parent_id = task_state.parent_id
1837+
class AsyncIOTaskInfo(TaskInfo):
1838+
def __init__(self, task: asyncio.Task):
1839+
task_state = _task_states.get(task)
1840+
if task_state is None:
1841+
parent_id = None
1842+
else:
1843+
parent_id = task_state.parent_id
18361844

1837-
return TaskInfo(id(task), parent_id, task.get_name(), task.get_coro())
1845+
super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
1846+
self._task = weakref.ref(task)
1847+
1848+
def has_pending_cancellation(self) -> bool:
1849+
if not (task := self._task()):
1850+
# If the task isn't around anymore, it won't have a pending cancellation
1851+
return False
1852+
1853+
if sys.version_info >= (3, 11):
1854+
if task.cancelling():
1855+
return True
1856+
elif (
1857+
isinstance(task._fut_waiter, asyncio.Future)
1858+
and task._fut_waiter.cancelled()
1859+
):
1860+
return True
1861+
1862+
if task_state := _task_states.get(task):
1863+
if cancel_scope := task_state.cancel_scope:
1864+
return cancel_scope.cancel_called or cancel_scope._parent_cancelled()
1865+
1866+
return False
18381867

18391868

18401869
class TestRunner(abc.TestRunner):
@@ -2452,11 +2481,11 @@ def open_signal_receiver(
24522481

24532482
@classmethod
24542483
def get_current_task(cls) -> TaskInfo:
2455-
return _create_task_info(current_task()) # type: ignore[arg-type]
2484+
return AsyncIOTaskInfo(current_task()) # type: ignore[arg-type]
24562485

24572486
@classmethod
2458-
def get_running_tasks(cls) -> list[TaskInfo]:
2459-
return [_create_task_info(task) for task in all_tasks() if not task.done()]
2487+
def get_running_tasks(cls) -> Sequence[TaskInfo]:
2488+
return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
24602489

24612490
@classmethod
24622491
async def wait_all_tasks_blocked(cls) -> None:

src/anyio/_backends/_trio.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import socket
66
import sys
77
import types
8+
import weakref
89
from collections.abc import AsyncIterator, Iterable
910
from concurrent.futures import Future
1011
from dataclasses import dataclass
@@ -839,6 +840,24 @@ def run_test(
839840
self._call_in_runner_task(test_func, **kwargs)
840841

841842

843+
class TrioTaskInfo(TaskInfo):
844+
def __init__(self, task: trio.lowlevel.Task):
845+
parent_id = None
846+
if task.parent_nursery and task.parent_nursery.parent_task:
847+
parent_id = id(task.parent_nursery.parent_task)
848+
849+
super().__init__(id(task), parent_id, task.name, task.coro)
850+
self._task = weakref.proxy(task)
851+
852+
def has_pending_cancellation(self) -> bool:
853+
try:
854+
return self._task._cancel_status.effectively_cancelled
855+
except ReferenceError:
856+
# If the task is no longer around, it surely doesn't have a cancellation
857+
# pending
858+
return False
859+
860+
842861
class TrioBackend(AsyncBackend):
843862
@classmethod
844863
def run(
@@ -1125,28 +1144,19 @@ def open_signal_receiver(
11251144
@classmethod
11261145
def get_current_task(cls) -> TaskInfo:
11271146
task = current_task()
1128-
1129-
parent_id = None
1130-
if task.parent_nursery F438 and task.parent_nursery.parent_task:
1131-
parent_id = id(task.parent_nursery.parent_task)
1132-
1133-
return TaskInfo(id(task), parent_id, task.name, task.coro)
1147+
return TrioTaskInfo(task)
11341148

11351149
@classmethod
1136-
def get_running_tasks(cls) -> list[TaskInfo]:
1150+
def get_running_tasks(cls) -> Sequence[TaskInfo]:
11371151
root_task = current_root_task()
11381152
assert root_task
1139-
task_infos = [TaskInfo(id(root_task), None, root_task.name, root_task.coro)]
1153+
task_infos = [TrioTaskInfo(root_task)]
11401154
nurseries = root_task.child_nurseries
11411155
while nurseries:
11421156
new_nurseries: list[trio.Nursery] = []
11431157
for nursery in nurseries:
11441158
for task in nursery.child_tasks:
1145-
task_infos.append(
1146-
TaskInfo(
1147-
id(task), id(nursery.parent_task), task.name, task.coro
1148-
)
1149-
)
1159+
task_infos.append(TrioTaskInfo(task))
11501160
new_nurseries.extend(task.child_nurseries)
11511161

11521162
nurseries = new_nurseries

src/anyio/_core/_testing.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from collections.abc import Awaitable, Generator
4-
from typing import Any
4+
from typing import Any, cast
55

66
from ._eventloop import get_async_backend
77

@@ -45,8 +45,12 @@ def __hash__(self) -> int:
4545
def __repr__(self) -> str:
4646
return f"{self.__class__.__name__}(id={self.id!r}, name={self.name!r})"
4747

48-
def _unwrap(self) -> TaskInfo:
49-
return self
48+
def has_pending_cancellation(self) -> bool:
49+
"""
50+
Return ``True`` if the task has a cancellation pending, ``False`` otherwise.
51+
52+
"""
53+
return False
5054

5155

5256
def get_current_task() -> TaskInfo:
@@ -66,7 +70,7 @@ def get_running_tasks() -> list[TaskInfo]:
6670
:return: a list of task info objects
6771
6872
"""
69-
return get_async_backend().get_running_tasks()
73+
return cast("list[TaskInfo]", get_async_backend().get_running_tasks())
7074

7175

7276
async def wait_all_tasks_blocked() -> None:

src/anyio/abc/_eventloop.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ def get_current_task(cls) -> TaskInfo:
376376

377377
@classmethod
378378
@abstractmethod
379-
def get_running_tasks(cls) -> list[TaskInfo]:
379+
def get_running_tasks(cls) -> Sequence[TaskInfo]:
380380
pass
381381

382382
@classmethod

src/anyio/streams/memory.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
EndOfStream,
1313
WouldBlock,
1414
)
15+
from .._core._testing import TaskInfo, get_current_task
1516
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
1617
from ..lowlevel import checkpoint
1718

@@ -32,13 +33,19 @@ class MemoryObjectStreamStatistics(NamedTuple):
3233
tasks_waiting_receive: int
3334

3435

36+
@dataclass(eq=False)
37+
class MemoryObjectItemReceiver(Generic[T_Item]):
38+
task_info: TaskInfo = field(init=False, default_factory=get_current_task)
39+
item: T_Item = field(init=False)
40+
41+
3542
@dataclass(eq=False)
3643
class MemoryObjectStreamState(Generic[T_Item]):
3744
max_buffer_size: float = field()
3845
buffer: deque[T_Item] = field(init=False, default_factory=deque)
3946
open_send_channels: int = field(init=False, default=0)
4047
open_receive_channels: int = field(init=False, default=0)
41-
waiting_receivers: OrderedDict[Event, list[T_Item]] = field(
48+
waiting_receivers: OrderedDict[Event, MemoryObjectItemReceiver[T_Item]] = field(
4249
init=False, default_factory=OrderedDict
4350
)
4451
waiting_senders: OrderedDict[Event, T_Item] = field(
@@ -99,17 +106,17 @@ async def receive(self) -> T_co:
99106
except WouldBlock:
100107
# Add ourselves in the queue
101108
receive_event = Event()
102-
container: list[T_co] = []
103-
self._state.waiting_receivers[receive_event] = container
109+
receiver = MemoryObjectItemReceiver[T_co]()
110+
self._state.waiting_receivers[receive_event] = receiver
104111

105112
try:
106113
await receive_event.wait()
107114
finally:
108115
self._state.waiting_receivers.pop(receive_event, None)
109116

110-
if container:
111-
return container[0]
112-
else:
117+
try:
118+
return receiver.item
119+
except AttributeError:
113120
raise EndOfStream
114121

115122
def clone(self) -> MemoryObjectReceiveStream[T_co]:
@@ -199,11 +206,14 @@ def send_nowait(self, item: T_contra) -> None:
199206
if not self._state.open_receive_channels:
200207
raise BrokenResourceError
201208

202-
if self._state.waiting_receivers:
203-
receive_event, container = self._state.waiting_receivers.popitem(last=False)
204-
container.append(item)
205-
receive_event.set()
206-
elif len(self._state.buffer) < self._state.max_buffer_size:
209+
while self._state.waiting_receivers:
210+
receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
211+
if not receiver.task_info.has_pending_cancellation():
212+
receiver.item = item
213+
receive_event.set()
214+
return
215+
216+
if len(self._state.buffer) < self._state.max_buffer_size:
207217
self._state.buffer.append(item)
208218
else:
209219
raise WouldBlock

0 commit comments

Comments
 (0)
0