8000 gh-96471: Add queue shutdown by EpicWink · Pull Request #96474 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-96471: Add queue shutdown #96474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
__all__ = (
'Queue',
'PriorityQueue',
'LifoQueue',
'QueueFull',
'QueueEmpty',
'QueueShutDown',
)

import collections
import enum
import heapq
from types import GenericAlias

Expand All @@ -18,6 +26,17 @@ class QueueFull(Exception):
pass


class QueueShutDown(Exception):
"""Raised when putting on to or getting from a shut-down Queue."""
pass


class _QueueState(enum.Enum):
alive = "alive"
shutdown = "shutdown"
shutdown_immediate = "shutdown-immediate"


class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.

Expand All @@ -41,6 +60,7 @@ def __init__(self, maxsize=0):
self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
self._shutdown_state = _QueueState.alive

# These three are overridable in subclasses.

Expand Down Expand Up @@ -113,6 +133,8 @@ async def put(self, item):
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
"""
if self._shutdown_state != _QueueState.alive:
raise QueueShutDown
while self.full():
putter = self._get_loop().create_future()
self._putters.append(putter)
Expand All @@ -132,13 +154,17 @@ async def put(self, item):
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
if self._shutdown_state != _QueueState.alive:
raise QueueShutDown
return self.put_nowait(item)

def put_nowait(self, item):
"""Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.
"""
if self._shutdown_state != _QueueState.alive:
raise QueueShutDown
if self.full():
raise QueueFull
self._put(item)
Expand All @@ -151,7 +177,11 @@ async def get(self):

If queue is empty, wait until an item is available.
"""
if self._shutdown_state == _QueueState.shutdown_immediate:
raise QueueShutDown
while self.empty():
if self._shutdown_state != _QueueState.alive:
raise QueueShutDown
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
Expand All @@ -170,6 +200,8 @@ async def get(self):
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise
if self._shutdown_state == _QueueState.shutdown_immediate:
raise QueueShutDown
return self.get_nowait()

def get_nowait(self):
Expand All @@ -178,7 +210,11 @@ def get_nowait(self):
Return an item if one is immediately available, else raise QueueEmpty.
"""
if self.empty():
if self._shutdown_state != _QueueState.alive:
raise QueueShutDown
raise QueueEmpty
elif self._shutdown_state == _QueueState.shutdown_immediate:
raise QueueShutDown
item = self._get()
self._wakeup_next(self._putters)
return item
Expand Down Expand Up @@ -214,6 +250,29 @@ async def join(self):
if self._unfinished_tasks > 0:
await self._finished.wait()

def shutdown(self, immediate=False):
"""Shut-down the queue, making queue gets and puts raise.

By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'. The QueueShutDown exception is raised.
"""
if immediate:
self._shutdown_state = _QueueState.shutdown_immediate
while self._getters:
getter = self._getters.popleft()
if not getter.done():
getter.set_result(None)
else:
self._shutdown_state = _QueueState.shutdown
while self._putters:
putter = self._putters.popleft()
if not putter.done():
putter.set_result(None)
# Release 'joined' tasks/coros
self._finished.set()

class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Expand Down
34 changes: 33 additions & 1 deletion Lib/multiprocessing/queues.py
1E0A
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import types
import weakref
import errno
import ctypes

from queue import Empty, Full
from queue import Empty, Full, ShutDown

import _multiprocessing

Expand All @@ -28,6 +29,10 @@

from .util import debug, info, Finalize, register_after_fork, is_exiting

_queue_alive = 0
_queue_shutdown = 1
_queue_shutdown_immediate = 2

#
# Queue type using a pipe, buffer and thread
#
Expand All @@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._shutdown_state = context._default_context.Value(
ctypes.c_uint8, lock=self._rlock
)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
Expand Down Expand Up @@ -86,20 +94,28 @@ def _reset(self, after_fork=False):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if not self._sem.acquire(block, timeout):
raise Full

with self._notempty:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def get(self, block=True, timeout=None):
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
Expand All @@ -111,13 +127,19 @@ def get(self, block=True, timeout=None):
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._shutdown_state.value != _queue_alive:
raise ShutDown
raise Empty
if self._shutdown_state.value != _queue_alive :
raise ShutDown
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
if self._shutdown_state.value == _queue_shutdown:
raise ShutDown
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand Down Expand Up @@ -158,6 +180,14 @@ def cancel_join_thread(self):
except AttributeError:
pass

def shutdown(self, immediate=False):
with self._rlock:
if immediate:
self._shutdown_state = _queue_shutdown_immediate
self._notempty.notify_all()
else:
self._shutdown_state = _queue_shutdown

def _start_thread(self):
debug('Queue._start_thread()')

Expand Down Expand Up @@ -329,6 +359,8 @@ def task_done(self):

def join(self):
with self._cond:
if self._shutdown_state.value == _queue_shutdown_immediate:
return
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

Expand Down
56 changes: 56 additions & 0 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ class Full(Exception):
pass


class ShutDown(Exception):
'''Raised when put/get with shut-down queue.'''


_queue_alive = "alive"
_queue_shutdown = "shutdown"
_queue_shutdown_immediate = "shutdown-immediate"


class Queue:
'''Create a queue object with a given maximum size.

Expand Down Expand Up @@ -54,6 +63,9 @@ def __init__(self, maxsize=0):
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

# Queue shut-down state
self.shutdown_state = _queue_alive

def task_done(self):
'''Indicate that a formerly enqueued task is complete.

Expand Down Expand Up @@ -87,6 +99,8 @@ def join(self):
'''
with self.all_tasks_done:
while self.unfinished_tasks:
if self.shutdown_state == _queue_shutdown_immediate:
return
self.all_tasks_done.wait()

def qsize(self):
Expand Down Expand Up @@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None):
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
if self.shutdown_state != _queue_alive:
raise ShutDown
with self.not_full:
if self.maxsize > 0:
if not block:
Expand All @@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None):
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.shutdown_state != _queue_alive:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
Expand All @@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None):
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.shutdown_state != _queue_alive:
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
Expand All @@ -162,22 +182,36 @@ def get(self, block=True, timeout=None):
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
if self.shutdown_state == _queue_shutdown_immediate:
raise ShutDown
with self.not_empty:
if not block:
if not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
raise Empty
elif timeout is None:
while not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
self.not_empty.wait()
if self.shutdown_state != _queue_alive:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.shutdown_state != _queue_alive:
raise ShutDown
if self.shutdown_state == _queue_shutdown_immediate:
raise ShutDown
item = self._get()
self.not_full.notify()
return item
Expand All @@ -198,6 +232,28 @@ def get_nowait(self):
'''
return self.get(block=False)

def shutdown(self, immediate=False):
'''Shut-down the queue, making queue gets and puts raise.

By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'. The ShutDown exception is raised.
'''
with self.mutex:
if immediate:
self.shutdown_state = _queue_shutdown_immediate
self.not_empty.notify_all()
# set self.unfinished_tasks to 0
# to break the loop in 'self.join()'
# when quits from `wait()`
self.unfinished_tasks = 0
self.all_tasks_done.notify_all()
else:
self.shutdown_state = _queue_shutdown
self.not_full.notify_all()

# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
Expand Down
Loading
0