-
-
Notifications
You must be signed in to change notification settings - Fork 32.4k
gh-96471: Add asyncio queue shutdown #104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
440a702
fb458db
e5951ac
a72aedd
d5e925d
f3517fb
bd2a7c3
e9ac8de
1e7813a
1275bb6
eec29bb
2c6156f
17f1f32
a233830
420a247
25ad2ac
f3321b4
6d9edd6
1135d85
ddc6ad6
2fa1bd9
aef4063
d49c6dd
5a435a6
c8db40e
ca01ee1
b02c4dd
8deca77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -526,15 +526,17 @@ class _QueueShutdownTestMixin: | |
q_class = None | ||
|
||
@staticmethod | ||
async def _ping_awaitable(a): | ||
async def swallow(a_): | ||
async def _ensure_started(task): | ||
# Explicitly start (if not already) task | ||
|
||
async def swallow(x): | ||
try: | ||
return await a_ | ||
return await x | ||
except Exception: | ||
pass | ||
|
||
try: | ||
await asyncio.wait_for(asyncio.shield(swallow(a)), 0.01) | ||
await asyncio.wait_for(asyncio.shield(swallow(task)), 0.01) | ||
except TimeoutError: | ||
pass | ||
|
||
|
@@ -547,17 +549,24 @@ async def test_format(self): | |
self.assertEqual(q._format(), 'maxsize=0 shutdown') | ||
|
||
async def test_shutdown_empty(self): | ||
# Test shutting down an empty queue | ||
|
||
# Setup empty queue and join() task | ||
q = self.q_class() | ||
loop = asyncio.get_running_loop() | ||
join_task = loop.create_task(q.join()) | ||
|
||
# Perform shut-down | ||
q.shutdown(immediate=False) # unfinished tasks: 0 -> 0 | ||
|
||
self.assertEqual(q.qsize(), 0) | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task has successfully finished | ||
await self._ensure_started(join_task) | ||
self.assertTrue(join_task.done()) | ||
await join_task | ||
|
||
# Ensure put() and get() raise ShutDown | ||
with self.assertRaisesShutdown(): | ||
await q.put("data") | ||
with self.assertRaisesShutdown(): | ||
|
@@ -569,30 +578,39 @@ async def test_shutdown_empty(self): | |
q.get_nowait() | ||
|
||
async def test_shutdown_nonempty(self): | ||
# Test shutting down a non-empty queue | ||
|
||
# Setup full queue with 1 item, and join() and put() tasks | ||
q = self.q_class(maxsize=1) | ||
loop = asyncio.get_running_loop() | ||
|
||
q.put_nowait("data") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this scenario the two are equivalent (the queue is not empty, there are no other tasks scheduled), so I thought |
||
join_task = loop.create_task(q.join()) | ||
put_task = loop.create_task(q.put("data2")) | ||
|
||
await self._ping_awaitable(put_task) | ||
# Ensure put() task is not finished | ||
await self._ensure_started(put_task) | ||
self.assertFalse(put_task.done()) | ||
|
||
# Perform shut-down | ||
q.shutdown(immediate=False) # unfinished tasks: 1 -> 1 | ||
|
||
self.assertEqual(q.qsize(), 1) | ||
|
||
await self._ping_awaitable(put_task) | ||
# Ensure put() task is finished, and raised ShutDown | ||
await self._ensure_started(put_task) | ||
self.assertTrue(put_task.done()) | ||
with self.assertRaisesShutdown(): | ||
await put_task | ||
|
||
# Ensure get() succeeds on enqueued item | ||
self.assertEqual(await q.get(), "data") | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task is not finished | ||
await self._ensure_started(join_task) | ||
self.assertFalse(join_task.done()) | ||
|
||
# Ensure put() and get() raise ShutDown | ||
with self.assertRaisesShutdown(): | ||
await q.put("data") | ||
with self.assertRaisesShutdown(): | ||
|
@@ -603,25 +621,38 @@ async def test_shutdown_nonempty(self): | |
with self.assertRaisesShutdown(): | ||
q.get_nowait() | ||
|
||
# Ensure there is 1 unfinished task | ||
q.task_done() | ||
with self.assertRaises( | ||
ValueError, msg="Didn't appear to mark all tasks done" | ||
): | ||
q.task_done() | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task has successfully finished | ||
await self._ensure_started(join_task) | ||
self.assertTrue(join_task.done()) | ||
await join_task | ||
|
||
async def test_shutdown_immediate(self): | ||
# Test immediately shutting down a queue | ||
|
||
# Setup queue with 1 item, and a join() task | ||
q = self.q_class() | ||
loop = asyncio.get_running_loop() | ||
q.put_nowait("data") | ||
join_task = loop.create_task(q.join()) | ||
|
||
# Perform shut-down | ||
q.shutdown(immediate=True) # unfinished tasks: 1 -> 0 | ||
|
||
self.assertEqual(q.qsize(), 0) | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task has successfully finished | ||
await self._ensure_started(join_task) | ||
self.assertTrue(join_task.done()) | ||
await join_task | ||
|
||
# Ensure put() and get() raise ShutDown | ||
with self.assertRaisesShutdown(): | ||
await q.put("data") | ||
with self.assertRaisesShutdown(): | ||
|
@@ -632,25 +663,33 @@ async def test_shutdown_immediate(self): | |
with self.assertRaisesShutdown(): | ||
q.get_nowait() | ||
|
||
# Ensure there are no unfinished tasks | ||
with self.assertRaises( | ||
ValueError, msg="Didn't appear to mark all tasks done" | ||
): | ||
q.task_done() | ||
|
||
async def test_shutdown_immediate_with_unfinished(self): | ||
# Test immediately shutting down a queue with unfinished tasks | ||
|
||
# Setup queue with 2 items (1 retrieved), and a join() task | ||
q = self.q_class() | ||
loop = asyncio.get_running_loop() | ||
q.put_nowait("data") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest: q.put_nowait("data1")
q.put_nowait("data2") |
||
q.put_nowait("data") | ||
join_task = loop.create_task(q.join()) | ||
self.assertEqual(await q.get(), "data") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and testing on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Except when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forgot the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have implemented your suggestion, but I have not committed. I am considering it. I don't think it's necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test case (thus including |
||
|
||
EpicWink marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Perform shut-down | ||
q.shutdown(immediate=True) # unfinished tasks: 2 -> 1 | ||
|
||
self.assertEqual(q.qsize(), 0) | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task is not finished | ||
await self._ensure_started(join_task) | ||
self.assertFalse(join_task.done()) | ||
|
||
# Ensure put() and get() raise ShutDown | ||
with self.assertRaisesShutdown(): | ||
await q.put("data") | ||
with self.assertRaisesShutdown(): | ||
|
@@ -661,13 +700,15 @@ async def test_shutdown_immediate_with_unfinished(self): | |
with self.assertRaisesShutdown(): | ||
q.get_nowait() | ||
|
||
# Ensure there is 1 unfinished task | ||
q.task_done() | ||
with self.assertRaises( | ||
ValueError, msg="Didn't appear to mark all tasks done" | ||
): | ||
q.task_done() | ||
|
||
await self._ping_awaitable(join_task) | ||
# Ensure join() task has successfully finished | ||
await self._ensure_started(join_task) | ||
self.assertTrue(join_task.done()) | ||
await join_task | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.