-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
gh-96471: Add asyncio queue shutdown #104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
440a702
fb458db
e5951ac
a72aedd
d5e925d
f3517fb
bd2a7c3
e9ac8de
1e7813a
1275bb6
eec29bb
2c6156f
17f1f32
a233830
420a247
25ad2ac
f3321b4
6d9edd6
1135d85
ddc6ad6
2fa1bd9
aef4063
d49c6dd
5a435a6
c8db40e
ca01ee1
b02c4dd
8deca77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -296,6 +296,10 @@ asyncio | |
with the tasks being completed. | ||
(Contributed by Justin Arthur in :gh:`77714`.) | ||
|
||
* Add :meth:`asyncio.Queue.shutdown` (along with | ||
:exc:`asyncio.QueueShutDown`) for queue termination. | ||
(Contributed by Laurie Opperman in :gh:`104228`.) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We've worked well together, I'd be nice if you'd mention me. |
||
base64 | ||
------ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,11 @@ | ||
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') | ||
__all__ = ( | ||
'Queue', | ||
'PriorityQueue', | ||
'LifoQueue', | ||
'QueueFull', | ||
'QueueEmpty', | ||
'QueueShutDown', | ||
) | ||
|
||
import collections | ||
import heapq | ||
|
@@ -18,6 +25,11 @@ class QueueFull(Exception): | |
pass | ||
|
||
|
||
class QueueShutDown(Exception): | ||
"""Raised when putting on to or getting from a shut-down Queue.""" | ||
pass | ||
|
||
|
||
class Queue(mixins._LoopBoundMixin): | ||
"""A queue, useful for coordinating producer and consumer coroutines. | ||
|
||
|
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0): | |
self._finished = locks.Event() | ||
self._finished.set() | ||
self._init(maxsize) | ||
self._is_shutdown = False | ||
|
||
# These three are overridable in subclasses. | ||
|
||
|
@@ -81,6 +94,8 @@ def _format(self): | |
result += f' _putters[{len(self._putters)}]' | ||
if self._unfinished_tasks: | ||
result += f' tasks={self._unfinished_tasks}' | ||
if self._is_shutdown: | ||
result += ' shutdown' | ||
return result | ||
|
||
def qsize(self): | ||
|
@@ -112,8 +127,12 @@ async def put(self, item): | |
|
||
Put an item into the queue. If the queue is full, wait until a free | ||
slot is available before adding item. | ||
|
||
Raises QueueShutDown if the queue has been shut down. | ||
""" | ||
while self.full(): | ||
if self._is_shutdown: | ||
raise QueueShutDown | ||
putter = self._get_loop().create_future() | ||
self._putters.append(putter) | ||
try: | ||
|
@@ -125,7 +144,7 @@ async def put(self, item): | |
self._putters.remove(putter) | ||
except ValueError: | ||
# The putter could be removed from self._putters by a | ||
# previous get_nowait call. | ||
# previous get_nowait call or a shutdown call. | ||
pass | ||
if not self.full() and not putter.cancelled(): | ||
# We were woken up by get_nowait(), but can't take | ||
|
@@ -138,7 +157,11 @@ def put_nowait(self, item): | |
"""Put an item into the queue without blocking. | ||
|
||
If no free slot is immediately available, raise QueueFull. | ||
|
||
Raises QueueShutDown if the queue has been shut down. | ||
""" | ||
if self._is_shutdown: | ||
raise QueueShutDown | ||
if self.full(): | ||
raise QueueFull | ||
self._put(item) | ||
|
@@ -150,8 +173,13 @@ async def get(self): | |
"""Remove and return an item from the queue. | ||
|
||
If queue is empty, wait until an item is available. | ||
|
||
Raises QueueShutDown if the queue has been shut down and is empty, or | ||
if the queue has been shut down immediately. | ||
""" | ||
while self.empty(): | ||
if self._is_shutdown and self.empty(): | ||
raise QueueShutDown | ||
getter = self._get_loop().create_future() | ||
self._getters.append(getter) | ||
try: | ||
|
@@ -163,7 +191,7 @@ async def get(self): | |
self._getters.remove(getter) | ||
except ValueError: | ||
# The getter could be removed from self._getters by a | ||
# previous put_nowait call. | ||
# previous put_nowait call, or a shutdown call. | ||
pass | ||
if not self.empty() and not getter.cancelled(): | ||
# We were woken up by put_nowait(), but can't take | ||
|
@@ -176,8 +204,13 @@ def get_nowait(self): | |
"""Remove and return an item from the queue. | ||
|
||
Return an item if one is immediately available, else raise QueueEmpty. | ||
|
||
Raises QueueShutDown if the queue has been shut down and is empty, or | ||
if the queue has been shut down immediately. | ||
""" | ||
if self.empty(): | ||
EpicWink marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if self._is_shutdown: | ||
raise QueueShutDown | ||
raise QueueEmpty | ||
item = self._get() | ||
self._wakeup_next(self._putters) | ||
|
@@ -194,6 +227,9 @@ def task_done(self): | |
been processed (meaning that a task_done() call was received for every | ||
item that had been put() into the queue). | ||
|
||
shutdown(immediate=True) calls task_done() for each remaining item in | ||
the queue. | ||
|
||
Raises ValueError if called more times than there were items placed in | ||
the queue. | ||
""" | ||
|
@@ -214,6 +250,32 @@ async def join(self): | |
if self._unfinished_tasks > 0: | ||
await self._finished.wait() | ||
|
||
def shutdown(self, immediate=False): | ||
"""Shut-down the queue, making queue gets and puts raise QueueShutDown. | ||
|
||
By default, gets will only raise once the queue is empty. Set | ||
'immediate' to True to make gets raise immediately instead. | ||
|
||
All blocked callers of put() will be unblocked, and also get() | ||
and join() if 'immediate'. | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring to modify depending of agree/disagree on my first remark about `blocked callers'.
|
||
self._is_shutdown = True | ||
if immediate: | ||
while not self.empty(): | ||
self._get() | ||
if self._unfinished_tasks > 0: | ||
self._unfinished_tasks -= 1 | ||
if self._unfinished_tasks == 0: | ||
self._finished.set() | ||
while self._getters: | ||
getter = self._getters.popleft() | ||
if not getter.done(): | ||
getter.set_result(None) | ||
while self._putters: | ||
putter = self._putters.popleft() | ||
if not putter.done(): | ||
putter.set_result(None) | ||
|
||
|
||
class PriorityQueue(Queue): | ||
"""A subclass of Queue; retrieves entries in priority order (lowest first). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry but I have a doubt, shouldn't this documentation block be rather:
In event of change, the docstring of the
shutdown
method must be updated.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
join
callers aren't necessarily even unblocked anyway, if consumers are processing any items. I should probably say that a task is marked as done for each item in the queue if immediate shutdown.Also, I think the threading queue docs are the same.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's very precise, better.
Yes, I commented here so as not to forget (see #117532 (comment)).
English is your native language, I think it'is best if you update documentations and docstrings.
Update: but I can create the follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made a PR: #117621