8000 gh-96471: Add asyncio queue shutdown by EpicWink · Pull Request #104228 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-96471: Add asyncio queue shutdown #104228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 6, 2024
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
440a702
Add asyncio queue shutdown
EpicWink Sep 1, 2022
fb458db
Fix queue shutdown
YvesDup Feb 10, 2023
e5951ac
📜🤖 Added by blurb_it.
blurb-it[bot] May 6, 2023
a72aedd
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Feb 20, 2024
d5e925d
Add references in docs and news entry
EpicWink Feb 20, 2024
f3517fb
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 20, 2024
bd2a7c3
Improve docs
EpicWink Mar 20, 2024
e9ac8de
Consume queue on immediate shutdown
EpicWink Mar 20, 2024
1e7813a
Fix links in what's-new
EpicWink Mar 22, 2024
1275bb6
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 22, 2024
eec29bb
Fix formatting in news entry
EpicWink Mar 22, 2024
2c6156f
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 22, 2024
17f1f32
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 26, 2024
a233830
Improve tests
EpicWink Mar 26, 2024
420a247
Improve tests even more
EpicWink Mar 26, 2024
25ad2ac
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 26, 2024
f3321b4
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 27, 2024
6d9edd6
Document tests
EpicWink Mar 27, 2024
1135d85
Merge remote-tracking branch 'upstream/main' into asyncio-queue-shutdown
EpicWink Mar 28, 2024
ddc6ad6
Always allow getters to re-check queue empty
EpicWink Mar 28, 2024
2fa1bd9
Merge branch 'main' into asyncio-queue-shutdown
gvanrossum Apr 3, 2024
aef4063
Simplify shutdown-check in put and get
EpicWink Apr 4, 2024
d49c6dd
Format shutdown docstring
EpicWink Apr 4, 2024
5a435a6
Check for 0 unfinised tasks in shutdown
EpicWink Apr 4, 2024
c8db40e
Use asyncio.sleep to run other tasks
EpicWink Apr 4, 2024
ca01ee1
Use public method to shut down queue in format test
EpicWink Apr 4, 2024
b02c4dd
Only start queue join after shutdown in test
EpicWink Apr 4, 2024
8deca77
Test join before failing task-done
EpicWink Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Document tests
  • Loading branch information
EpicWink committed Mar 27, 2024
commit 6d9edd6cd768367ce72f8791c5307d9b46400822
65 changes: 53 additions & 12 deletions Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,15 +526,17 @@ class _QueueShutdownTestMixin:
q_class = None

@staticmethod
async def _ping_awaitable(a):
async def swallow(a_):
async def _ensure_started(task):
# Explicitly start (if not already) task

async def swallow(x):
try:
return await a_
return await x
except Exception:
pass

try:
await asyncio.wait_for(asyncio.shield(swallow(a)), 0.01)
await asyncio.wait_for(asyncio.shield(swallow(task)), 0.01)
except TimeoutError:
pass

Expand All @@ -547,17 +549,24 @@ async def test_format(self):
self.assertEqual(q._format(), 'maxsize=0 shutdown')

async def test_shutdown_empty(self):
# Test shutting down an empty queue

# Setup empty queue and join() task
q = self.q_class()
loop = asyncio.get_running_loop()
join_task = loop.create_task(q.join())

# Perform shut-down
q.shutdown(immediate=False) # unfinished tasks: 0 -> 0

self.assertEqual(q.qsize(), 0)

await self._ping_awaitable(join_task)
# Ensure join() task has successfully finished
await self._ensure_started(join_task)
self.assertTrue(join_task.done())
await join_task

# Ensure put() and get() raise ShutDown
with self.assertRaisesShutdown():
await q.put("data")
with self.assertRaisesShutdown():
Expand All @@ -569,30 +578,39 @@ async def test_shutdown_empty(self):
q.get_nowait()

async def test_shutdown_nonempty(self):
# Test shutting down a non-empty queue

# Setup full queue with 1 item, and join() and put() tasks
q = self.q_class(maxsize=1)
loop = asyncio.get_running_loop()

q.put_nowait("data")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just await q.put("data")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this scenario the two are equivalent (the queue is not empty, there are no other tasks scheduled), so I thought put_nowait was more explicit

join_task = loop.create_task(q.join())
put_task = loop.create_task(q.put("data2"))

await self._ping_awaitable(put_task)
# Ensure put() task is not finished
await self._ensure_started(put_task)
self.assertFalse(put_task.done())

# Perform shut-down
q.shutdown(immediate=False) # unfinished tasks: 1 -> 1

self.assertEqual(q.qsize(), 1)

await self._ping_awaitable(put_task)
# Ensure put() task is finished, and raised ShutDown
await self._ensure_started(put_task)
self.assertTrue(put_task.done())
with self.assertRaisesShutdown():
await put_task

# Ensure get() succeeds on enqueued item
self.assertEqual(await q.get(), "data")

await self._ping_awaitable(join_task)
# Ensure join() task is not finished
await self._ensure_started(join_task)
self.assertFalse(join_task.done())

# Ensure put() and get() raise ShutDown
with self.assertRaisesShutdown():
await q.put("data")
with self.assertRaisesShutdown():
Expand All @@ -603,25 +621,38 @@ async def test_shutdown_nonempty(self):
with self.assertRaisesShutdown():
q.get_nowait()

# Ensure there is 1 unfinished task
q.task_done()
with self.assertRaises(
ValueError, msg="Didn't appear to mark all tasks done"
):
q.task_done()

await self._ping_awaitable(join_task)
# Ensure join() task has successfully finished
await self._ensure_started(join_task)
self.assertTrue(join_task.done())
await join_task

async def test_shutdown_immediate(self):
# Test immediately shutting down a queue

# Setup queue with 1 item, and a join() task
q = self.q_class()
loop = asyncio.get_running_loop()
q.put_nowait("data")
join_task = loop.create_task(q.join())

# Perform shut-down
q.shutdown(immediate=True) # unfinished tasks: 1 -> 0

self.assertEqual(q.qsize(), 0)

await self._ping_awaitable(join_task)
# Ensure join() task has successfully finished
await self._ensure_started(join_task)
self.assertTrue(join_task.done())
await join_task

# Ensure put() and get() raise ShutDown
with self.assertRaisesShutdown():
await q.put("data")
with self.assertRaisesShutdown():
Expand All @@ -632,25 +663,33 @@ async def test_shutdown_immediate(self):
with self.assertRaisesShutdown():
q.get_nowait()

# Ensure there are no unfinished tasks
with self.assertRaises(
ValueError, msg="Didn't appear to mark all tasks done"
):
q.task_done()

async def test_shutdown_immediate_with_unfinished(self):
# Test immediately shutting down a queue with unfinished tasks

# Setup queue with 2 items (1 retrieved), and a join() task
q = self.q_class()
loop = asyncio.get_running_loop()
q.put_nowait("data")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest:

q.put_nowait("data1")
q.put_nowait("data2")

q.put_nowait("data")
join_task = loop.create_task(q.join())
self.assertEqual(await q.get(), "data")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and testing on "data1".

Copy link
Contributor Author
@EpicWink EpicWink Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except when q is an instance of LifoQueue, when the value should be "data2". Is it necessary to test queue ordering in the shutdown tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot the LifoQueue class. Is this worth commenting on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented your suggestion, but I have not committed. I am considering it. I don't think it's necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case (thus including LifoQueue) is implicit. A comment would be enough.


# Perform shut-down
q.shutdown(immediate=True) # unfinished tasks: 2 -> 1

self.assertEqual(q.qsize(), 0)

await self._ping_awaitable(join_task)
# Ensure join() task is not finished
await self._ensure_started(join_task)
self.assertFalse(join_task.done())

# Ensure put() and get() raise ShutDown
with self.assertRaisesShutdown():
await q.put("data")
with self.assertRaisesShutdown():
Expand All @@ -661,13 +700,15 @@ async def test_shutdown_immediate_with_unfinished(self):
with self.assertRaisesShutdown():
q.get_nowait()

# Ensure there is 1 unfinished task
q.task_done()
with self.assertRaises(
ValueError, msg="Didn't appear to mark all tasks done"
):
q.task_done()

await self._ping_awaitable(join_task)
# Ensure join() task has successfully finished
await self._ensure_started(join_task)
self.assertTrue(join_task.done())
await join_task

Expand Down
0