8000 gh-96471: Add threading queue shutdown by EpicWink · Pull Request #104750 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-96471: Add threading queue shutdown #104750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Prev Previous commit
Next Next commit
Fix queue shutdown
* Include raised exception in docstrings
* Handle queue shutdown in task_done and join
* Factor out queue-state checks and updates to methods
* Logic fixes in qsize, get and shutdown
* Don't set unfinished_tasks to 0 on immediate shutdown
* Updated tests
* Document feature added in 3.13
  • Loading branch information
YvesDup authored and EpicWink committed May 6, 2023
commit 9c2971be6fe5894856c2fd8f636d53185bf18693
4 changes: 2 additions & 2 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ The :mod:`queue` module defines the following classes and exceptions:
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
a :class:`Queue` object which has been shut down.

.. versionadded:: 3.12
.. versionadded:: 3.13


.. _queueobjects:
Expand Down Expand Up @@ -247,7 +247,7 @@ them down.
All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.12
.. versionadded:: 3.13


SimpleQueue Objects
Expand Down
68 changes: 46 additions & 22 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class ShutDown(Exception):
_queue_shutdown = "shutdown"
_queue_shutdown_immediate = "shutdown-immediate"


class Queue:
'''Create a queue object with a given maximum size.

Expand Down Expand Up @@ -63,7 +62,7 @@ def __init__(self, maxsize=0):
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

# Queue shut-down state
# Queue shutdown state
self.shutdown_state = _queue_alive

def task_done(self):
Expand All @@ -79,8 +78,12 @@ def task_done(self):

Raises a ValueError if called more times than there were items
placed in the queue.

Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
if self._is_shutdown_immediate():
raise ShutDown
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
Expand All @@ -96,12 +99,16 @@ def join(self):
to indicate the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, join() unblocks.

Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
if self._is_shutdown_immediate():
raise ShutDown
while self.unfinished_tasks:
if self.shutdown_state == _queue_shutdown_immediate:
return
self.all_tasks_done.wait()
if self._is_shutdown_immediate():
raise ShutDown

def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
Expand Down Expand Up @@ -143,18 +150,20 @@ def put(self, item, block=True, timeout=None):
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).

Raises ShutDown if the queue has been shut down.
'''
if self.shutdown_state != _queue_alive:
raise ShutDown
with self.not_full:
if not self._is_alive():
raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.shutdown_state != _queue_alive:
if not self._is_alive():
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
Expand All @@ -165,7 +174,7 @@ def put(self, item, block=True, timeout=None):
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.shutdown_state != _queue_alive:
if not self._is_alive():
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
Expand All @@ -181,37 +190,33 @@ def get(self, block=True, timeout=None):
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).

Raises ShutDown if the queue has been shut down and is empty,
or if the queue has been shut down immediately.
'''
if self.shutdown_state == _queue_shutdown_immediate:
raise ShutDown
with self.not_empty:
if self._is_shutdown_immediate() or\
(self._is_shutdown() and not self._qsize()):
raise ShutDown
if not block:
if not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
raise Empty
elif timeout is None:
while not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
self.not_empty.wait()
if self.shutdown_state != _queue_alive:
if self._is_shutdown_immediate():
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
if self.shutdown_state != _queue_alive:
raise ShutDown
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.shutdown_state != _queue_alive:
if self._is_shutdown_immediate():
raise ShutDown
if self.shutdown_state == _queue_shutdown_immediate:
raise ShutDown
item = self._get()
self.not_full.notify()
return item
Expand Down Expand Up @@ -242,14 +247,33 @@ def shutdown(self, immediate=False):
and join() if 'immediate'. The ShutDown exception is raised.
'''
with self.mutex:
if self._is_shutdown_immediate():
return
if immediate:
self.shutdown_state = _queue_shutdown_immediate
self._set_shutdown_immediate()
self.not_empty.notify_all()
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
else:
self.shutdown_state = _queue_shutdown
self._set_shutdown()
self.not_full.notify_all()

def _is_alive(self):
return self.shutdown_state == _queue_alive

def _is_shutdown(self):
return self.shutdown_state == _queue_shutdown

def _is_shutdown_immediate(self):
return self.shutdown_state == _queue_shutdown_immediate

def _set_shutdown(self):
self.shutdown_state = _queue_shutdown

def _set_shutdown_immediate(self):
self.shutdown_state = _queue_shutdown_immediate


# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
Expand Down
Loading
0