From 4fd064079b0b4966b2f8b77bf419ac601935c15e Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 1 Sep 2022 18:33:18 +1000 Subject: [PATCH 01/13] Add threading implementation of queue shutdown --- Lib/queue.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index 55f50088460f9e..4d2c1cbe084035 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,6 +25,15 @@ class Full(Exception): pass +class ShutDown(Exception): + '''Raised when put/get with shut-down queue.''' + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + + class Queue: '''Create a queue object with a given maximum size. @@ -54,6 +63,9 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 + # Queue shut-down state + self.shutdown_state = _queue_alive + def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -87,6 +99,8 @@ def join(self): ''' with self.all_tasks_done: while self.unfinished_tasks: + if self.shutdown_state == _queue_shutdown_immediate: + return self.all_tasks_done.wait() def qsize(self): @@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None): is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' + if self.shutdown_state != _queue_alive: + raise ShutDown with self.not_full: if self.maxsize > 0: if not block: @@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() + if self.shutdown_state != _queue_alive: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) + if self.shutdown_state != _queue_alive: + raise ShutDown self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -162,6 +182,8 @@ def get(self, block=True, timeout=None): available, else raise the Empty exception ('timeout' is ignored in that case). ''' + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown with self.not_empty: if not block: if not self._qsize(): @@ -169,6 +191,8 @@ def get(self, block=True, timeout=None): elif timeout is None: while not self._qsize(): self.not_empty.wait() + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -178,6 +202,8 @@ def get(self, block=True, timeout=None): if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown item = self._get() self.not_full.notify() return item @@ -198,6 +224,26 @@ def get_nowait(self): ''' return self.get(block=False) + def shutdown(self, immediate=False): + '''Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put(), get() and join() will be + unblocked. The ShutDown exception is raised. + ''' + if immediate: + self.shutdown_state = _queue_shutdown_immediate + with self.not_empty: + self.not_empty.notify_all() + with self.all_tasks_done: + self.all_tasks_done.notify_all() + else: + self.shutdown_state = _queue_shutdown + with self.not_full: + self.not_full.notify_all() + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held From d942c9e00573442a7a26ce1d9506ea0e94f42ffe Mon Sep 17 00:00:00 2001 From: Laurie O Date: Sun, 18 Sep 2022 18:25:40 +1000 Subject: [PATCH 02/13] Fix up implementation, add unit-tests --- Lib/queue.py | 28 +++++++++++++++++----------- Lib/test/test_queue.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index 4d2c1cbe084035..f6af7cb6df5cff 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -187,23 +187,31 @@ def get(self, block=True, timeout=None): with self.not_empty: if not block: if not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown raise Empty elif timeout is None: while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown self.not_empty.wait() - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state != _queue_alive: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): + if self.shutdown_state != _queue_alive: + raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state != _queue_alive: raise ShutDown + if self.shutdown_state == _queue_shutdown_immediate: + raise ShutDown item = self._get() self.not_full.notify() return item @@ -230,18 +238,16 @@ def shutdown(self, immediate=False): By default, gets will only raise once the queue is empty. Set 'immediate' to True to make gets raise immediately instead. - All blocked callers of put(), get() and join() will be - unblocked. The ShutDown exception is raised. + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The ShutDown exception is raised. ''' - if immediate: - self.shutdown_state = _queue_shutdown_immediate - with self.not_empty: + with self.mutex: + if immediate: + self.shutdown_state = _queue_shutdown_immediate self.not_empty.notify_all() - with self.all_tasks_done: self.all_tasks_done.notify_all() - else: - self.shutdown_state = _queue_shutdown - with self.not_full: + else: + self.shutdown_state = _queue_shutdown self.not_full.notify_all() # Override these methods to implement other queue organizations diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 33113a72e6b6a9..354299b9a5b16a 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -241,6 +241,41 @@ def test_shrinking_queue(self): with self.assertRaises(self.queue.Full): q.put_nowait(4) + def test_shutdown_empty(self): + q = self.type2test() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = self.type2test() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = self.type2test() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + class QueueTest(BaseQueueTestMixin): def setUp(self): From f552ac10fa43e5b312092aaeef86076841adf3d1 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Sun, 18 Sep 2022 18:26:02 +1000 Subject: [PATCH 03/13] Implement for asyncio queues --- Lib/asyncio/queues.py | 57 +++++++++++++++++++++++++++- Lib/test/test_asyncio/test_queues.py | 57 ++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a9656a6df561ba..a869993a1de3fe 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -1,4 +1,11 @@ -__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') +__all__ = ( + 'Queue', + 'PriorityQueue', + 'LifoQueue', + 'QueueFull', + 'QueueEmpty', + 'QueueShutDown', +) import collections import heapq @@ -18,6 +25,16 @@ class QueueFull(Exception): pass +class QueueShutDown(Exception): + """Raised when putting on to or getting from a shut-down Queue.""" + pass + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -41,6 +58,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) + self.shutdown_state = _queue_alive # These three are overridable in subclasses. @@ -113,6 +131,8 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ + if self.shutdown_state != _queue_alive: + raise QueueShutDown while self.full(): putter = self._get_loop().create_future() self._putters.append(putter) @@ -132,6 +152,8 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise + if self.shutdown_state != _queue_alive: + raise QueueShutDown return self.put_nowait(item) def put_nowait(self, item): @@ -139,6 +161,8 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ + if self.shutdown_state != _queue_alive: + raise QueueShutDown if self.full(): raise QueueFull self._put(item) @@ -151,7 +175,11 @@ async def get(self): If queue is empty, wait until an item is available. """ + if self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown while self.empty(): + if self.shutdown_state != _queue_alive: + raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) try: @@ -170,6 +198,8 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise + if self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown return self.get_nowait() def get_nowait(self): @@ -178,7 +208,11 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): + if self.shutdown_state != _queue_alive: + raise QueueShutDown raise QueueEmpty + elif self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown item = self._get() self._wakeup_next(self._putters) return item @@ -214,6 +248,27 @@ async def join(self): if self._unfinished_tasks > 0: await self._finished.wait() + def shutdown(self, immediate=False): + """Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The QueueShutDown exception is raised. + """ + if immediate: + self.shutdown_state = _queue_shutdown_immediate + while self._getters: + getter = self._getters.popleft() + if not getter.done(): + getter.set_result(None) + else: + self.shutdown_state = _queue_shutdown + while self._putters: + putter = self._putters.popleft() + if not putter.done(): + putter.set_result(None) class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 2d058ccf6a8c72..418c3fe618d89b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -522,5 +522,62 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa q_class = asyncio.PriorityQueue +class _QueueShutdownTestMixin: + q_class = None + + async def test_empty(self): + q = self.q_class() + q.shutdown() + try: + await q.put("data") + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + async def test_nonempty(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown() + await q.get() + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + async def test_immediate(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown(immediate=True) + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + +class QueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.Queue + + +class LifoQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.LifoQueue + + +class PriorityQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.PriorityQueue + + if __name__ == '__main__': unittest.main() From 78671f9a81c7757563301bb7246863f0d24d09a9 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Sat, 22 Oct 2022 15:39:07 +1000 Subject: [PATCH 04/13] WIP: multiprocessing queue shutdown --- Lib/multiprocessing/queues.py | 26 +++++++++++++++++++++- Lib/test/_test_multiprocessing.py | 36 +++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f37f114a968871..26b212d6a50610 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -17,8 +17,9 @@ import types import weakref import errno +import ctypes -from queue import Empty, Full +from queue import Empty, Full, ShutDown import _multiprocessing @@ -28,6 +29,10 @@ from .util import debug, info, Finalize, register_after_fork, is_exiting +_queue_alive = 0 +_queue_shutdown = 1 +_queue_shutdown_immediate = 2 + # # Queue type using a pipe, buffer and thread # @@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset() + self._shutdown_state = context._default_context.Value( + ctypes.c_uint8, lock=self._rlock + ) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -86,20 +94,28 @@ def _reset(self, after_fork=False): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if self._shutdown_state.value != _queue_alive: + raise ShutDown if not self._sem.acquire(block, timeout): raise Full with self._notempty: + if self._shutdown_state.value != _queue_alive: + raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None): + if self._shutdown_state.value == _queue_shutdown_immediate: + raise ShutDown if self._closed: raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: + if self._shutdown_state.value != _queue_alive: + raise ShutDown res = self._recv_bytes() self._sem.release() else: @@ -111,13 +127,19 @@ def get(self, block=True, timeout=None): if block: timeout = deadline - time.monotonic() if not self._poll(timeout): + if self._shutdown_state.value != _queue_alive: + raise ShutDown raise Empty + if self._shutdown_state.value != _queue_alive : + raise ShutDown elif not self._poll(): raise Empty res = self._recv_bytes() self._sem.release() finally: self._rlock.release() + if self._shutdown_state.value == _queue_shutdown: + raise ShutDown # unserialize the data after having released the lock return _ForkingPickler.loads(res) @@ -327,6 +349,8 @@ def task_done(self): def join(self): with self._cond: + if self._shutdown_state.value == _queue_shutdown_immediate: + return if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b78586c560a68a..b25b0dee5eca8d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -958,6 +958,42 @@ def queue_full(q, maxsize): class _TestQueue(BaseTestCase): + # TODO: add queue shutdown unit-tests + + def test_shutdown_empty(self): + q = self.type2test() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = self.type2test() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = self.type2test() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except self.queue.ShutDown: + pass @classmethod def _test_put(cls, queue, child_can_start, parent_can_continue): From 5f31f8e6be334a0258b656943ea798517cd5159a Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 19 Jan 2023 20:34:24 +1000 Subject: [PATCH 05/13] WIP: multiprocessing queue shutdown --- Lib/test/_test_multiprocessing.py | 71 +++++++++++++++---------------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b25b0dee5eca8d..1f9c9aec36b312 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -958,42 +958,6 @@ def queue_full(q, maxsize): class _TestQueue(BaseTestCase): - # TODO: add queue shutdown unit-tests - - def test_shutdown_empty(self): - q = self.type2test() - q.shutdown() - try: - q.put("data") - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - try: - q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - - def test_shutdown_nonempty(self): - q = self.type2test() - q.put("data") - q.shutdown() - q.get() - try: - q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - - def test_shutdown_immediate(self): - q = self.type2test() - q.put("data") - q.shutdown(immediate=True) - try: - q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass @classmethod def _test_put(cls, queue, child_can_start, parent_can_continue): @@ -1313,6 +1277,41 @@ def test_closed_queue_put_get_exceptions(self): q.put('foo') with self.assertRaisesRegex(ValueError, 'is closed'): q.get() + + def test_shutdown_empty(self): + q = multiprocessing.Queue() + q.shutdown() + try: + q.put("data") + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + + def test_shutdown_nonempty(self): + q = multiprocessing.Queue() + q.put("data") + q.shutdown() + q.get() + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass + + def test_shutdown_immediate(self): + q = multiprocessing.Queue() + q.put("data") + q.shutdown(immediate=True) + try: + q.get() + self.fail("Didn't appear to shut-down queue") + except pyqueue.ShutDown: + pass # # # From 978b8d1295d74026f3a50a7db8a4852927d14ce1 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 19:44:19 +1000 Subject: [PATCH 06/13] Replace try-catch with assert-raises ctx-mgrs --- Lib/test/_test_multiprocessing.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 5d525257a9ad30..d9264ed62f5526 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1281,37 +1281,33 @@ def test_closed_queue_put_get_exceptions(self): def test_shutdown_empty(self): q = multiprocessing.Queue() q.shutdown() - try: + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): q.put("data") - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass - try: + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): q.get() - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass def test_shutdown_nonempty(self): q = multiprocessing.Queue() q.put("data") q.shutdown() q.get() - try: + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): q.get() - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass def test_shutdown_immediate(self): q = multiprocessing.Queue() q.put("data") q.shutdown(immediate=True) - try: + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): q.get() - self.fail("Didn't appear to shut-down queue") - except pyqueue.ShutDown: - pass # # # From b19f529b14d8faa88af11d6c5c2813bddb30ffaf Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:03:17 +1000 Subject: [PATCH 07/13] WIP: multiprocessing queue shut-down method --- Lib/multiprocessing/queues.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f9854cc1b7e227..5220504369937d 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -180,6 +180,14 @@ def cancel_join_thread(self): except AttributeError: pass + def shutdown(self, immediate=False): + with self._rlock: + if immediate: + self._shutdown_state = _queue_shutdown_immediate + self._notempty.notify_all() + else: + self._shutdown_state = _queue_shutdown + def _start_thread(self): debug('Queue._start_thread()') From 6d3b3a508ada5639ccc3534a18f419cf1863779b Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:05:45 +1000 Subject: [PATCH 08/13] Use enum for asyncio queue shutdown state --- Lib/asyncio/queues.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a869993a1de3fe..6284504369d611 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -8,6 +8,7 @@ ) import collections +import enum import heapq from types import GenericAlias @@ -30,9 +31,10 @@ class QueueShutDown(Exception): pass -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + alive = "alive" + shutdown = "shutdown" + shutdown_immediate = "shutdown-immediate" class Queue(mixins._LoopBoundMixin): @@ -58,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self.shutdown_state = _queue_alive + self.shutdown_state = _QueueState.alive # These three are overridable in subclasses. @@ -131,7 +133,7 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ - if self.shutdown_state != _queue_alive: + if self.shutdown_state != _QueueState.alive: raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -152,7 +154,7 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self.shutdown_state != _queue_alive: + if self.shutdown_state != _QueueState.alive: raise QueueShutDown return self.put_nowait(item) @@ -161,7 +163,7 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ - if self.shutdown_state != _queue_alive: + if self.shutdown_state != _QueueState.alive: raise QueueShutDown if self.full(): raise QueueFull @@ -175,10 +177,10 @@ async def get(self): If queue is empty, wait until an item is available. """ - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown while self.empty(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state != _QueueState.alive: raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) @@ -198,7 +200,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown return self.get_nowait() @@ -208,10 +210,10 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state != _QueueState.alive: raise QueueShutDown raise QueueEmpty - elif self.shutdown_state == _queue_shutdown_immediate: + elif self.shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown item = self._get() self._wakeup_next(self._putters) @@ -258,13 +260,13 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The QueueShutDown exception is raised. """ if immediate: - self.shutdown_state = _queue_shutdown_immediate + self.shutdown_state = _QueueState.shutdown_immediate while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) else: - self.shutdown_state = _queue_shutdown + self.shutdown_state = _QueueState.shutdown while self._putters: putter = self._putters.popleft() if not putter.done(): From 85827f17579180b63e6138013c6e384736f3529f Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:12:15 +1000 Subject: [PATCH 09/13] Make asyncio queue state private --- Lib/asyncio/queues.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 6284504369d611..147e33ad82727c 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -60,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self.shutdown_state = _QueueState.alive + self._shutdown_state = _QueueState.alive # These three are overridable in subclasses. @@ -133,7 +133,7 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ - if self.shutdown_state != _QueueState.alive: + if self._shutdown_state != _QueueState.alive: raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -154,7 +154,7 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self.shutdown_state != _QueueState.alive: + if self._shutdown_state != _QueueState.alive: raise QueueShutDown return self.put_nowait(item) @@ -163,7 +163,7 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ - if self.shutdown_state != _QueueState.alive: + if self._shutdown_state != _QueueState.alive: raise QueueShutDown if self.full(): raise QueueFull @@ -177,10 +177,10 @@ async def get(self): If queue is empty, wait until an item is available. """ - if self.shutdown_state == _QueueState.shutdown_immediate: + if self._shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown while self.empty(): - if self.shutdown_state != _QueueState.alive: + if self._shutdown_state != _QueueState.alive: raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) @@ -200,7 +200,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self.shutdown_state == _QueueState.shutdown_immediate: + if self._shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown return self.get_nowait() @@ -210,10 +210,10 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): - if self.shutdown_state != _QueueState.alive: + if self._shutdown_state != _QueueState.alive: raise QueueShutDown raise QueueEmpty - elif self.shutdown_state == _QueueState.shutdown_immediate: + elif self._shutdown_state == _QueueState.shutdown_immediate: raise QueueShutDown item = self._get() self._wakeup_next(self._putters) @@ -260,13 +260,13 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The QueueShutDown exception is raised. """ if immediate: - self.shutdown_state = _QueueState.shutdown_immediate + self._shutdown_state = _QueueState.shutdown_immediate while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) else: - self.shutdown_state = _QueueState.shutdown + self._shutdown_state = _QueueState.shutdown while self._putters: putter = self._putters.popleft() if not putter.done(): From 121de127e540b2c8c8cb579ab790ff269db33c96 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:12:34 +1000 Subject: [PATCH 10/13] Replace try-catch with assert-raises ctx-mgrs --- Lib/test/test_asyncio/test_queues.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 418c3fe618d89b..0cb48f33086779 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -528,37 +528,33 @@ class _QueueShutdownTestMixin: async def test_empty(self): q = self.q_class() q.shutdown() - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.put("data") - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_nonempty(self): q = self.q_class() q.put_nowait("data") q.shutdown() await q.get() - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True) - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass class QueueShutdownTests( From 5e2df02722f85912243517202446d869cfbb0d95 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:14:29 +1000 Subject: [PATCH 11/13] Add asyncio queue tests Co-authored-by: Duprat --- Lib/test/test_asyncio/test_queues.py | 91 ++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 0cb48f33086779..75b016f399a13b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -555,7 +555,98 @@ async def test_immediate(self): asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" ): await q.get() + async def test_repr_shutdown(self): + q = self.q_class() + q.shutdown() + self.assertIn("shutdown", repr(q)) + + q = self.q_class() + q.shutdown(immediate=True) + self.assertIn("shutdown-immediate", repr(q)) + + async def test_get_shutdown_immediate(self): + results = [] + maxsize = 2 + delay = 1e-3 + + async def get_q(q): + try: + msg = await q.get() + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return True + + async def shutdown(q, delay, immediate): + await asyncio.sleep(delay) + q.shutdown(immediate) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(get_q(q)) for _ in range(maxsize)] + t += [asyncio.create_task(shutdown(q, delay, True))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + async def test_put_shutdown(self): + maxsize = 2 + results = [] + delay = 1e-3 + + async def put_twice(q, delay, msg): + await q.put(msg) + await asyncio.sleep(delay) + try: + await q.put(msg+maxsize) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, delay, immediate): + await asyncio.sleep(delay) + q.shutdown(immediate) + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, delay, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, delay*2, False))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + async def test_put_and_join_shutdown(self): + maxsize = 2 + results = [] + delay = 1e-3 + + async def put_twice(q, delay, msg): + await q.put(msg) + await asyncio.sleep(delay) + try: + await q.put(msg+maxsize) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, delay, immediate): + await asyncio.sleep(delay) + q.shutdown(immediate) + + async def join(q, delay): + await asyncio.sleep(delay) + await q.join() + results.append(True) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, delay, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, delay*2, True)), + asyncio.create_task(join(q, delay))] + res = await asyncio.gather(*t) + self.assertEqual(results, [True]*(maxsize+1)) class QueueShutdownTests( _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase From 14c53d7b65e49ce8cb4334b99286be7d372ce717 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:15:32 +1000 Subject: [PATCH 12/13] Release joined awaitables Co-authored-by: Duprat --- Lib/asyncio/queues.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 147e33ad82727c..d591d0ebab481b 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -271,6 +271,8 @@ def shutdown(self, immediate=False): putter = self._putters.popleft() if not putter.done(): putter.set_result(None) + # Release 'joined' tasks/coros + self._finished.set() class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). From da10d9fc5752975b24ca743660b1be70ac9c3756 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 21 Feb 2023 20:16:14 +1000 Subject: [PATCH 13/13] Zero unfinished tasks Co-authored-by: Duprat --- Lib/queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index f6af7cb6df5cff..f08dbd47f188ee 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -245,6 +245,10 @@ def shutdown(self, immediate=False): if immediate: self.shutdown_state = _queue_shutdown_immediate self.not_empty.notify_all() + # set self.unfinished_tasks to 0 + # to break the loop in 'self.join()' + # when quits from `wait()` + self.unfinished_tasks = 0 self.all_tasks_done.notify_all() else: self.shutdown_state = _queue_shutdown