gh-77714: Provide an async iterator version of as_completed (GH-22491) · python/cpython@c741ad3 · GitHub

Commit c741ad3

JustinTArthur, serhiy-storchaka, and gvanrossum authored
gh-77714: Provide an async iterator version of as_completed (GH-22491)
* as_completed returns object that is both iterator and async iterator
* Existing tests adjusted to test both the old and new style
* New test to ensure iterator can be resumed
* New test to ensure async iterator yields any passed-in Futures as-is

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
1 parent ddf814d commit c741ad3

5 files changed: +387 -120 lines changed


Doc/library/asyncio-task.rst

Lines changed: 48 additions & 13 deletions
@@ -867,19 +867,50 @@ Waiting Primitives
 
 .. function:: as_completed(aws, *, timeout=None)
 
-   Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws*
-   iterable concurrently. Return an iterator of coroutines.
-   Each coroutine returned can be awaited to get the earliest next
-   result from the iterable of the remaining awaitables.
-
-   Raises :exc:`TimeoutError` if the timeout occurs before
-   all Futures are done.
-
-   Example::
-
-       for coro in as_completed(aws):
-           earliest_result = await coro
-           # ...
+   Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws* iterable
+   concurrently. The returned object can be iterated to obtain the results
+   of the awaitables as they finish.
+
+   The object returned by ``as_completed()`` can be iterated as an
+   :term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous
+   iteration is used, the originally-supplied awaitables are yielded if they
+   are tasks or futures. This makes it easy to correlate previously-scheduled
+   tasks with their results. Example::
+
+       ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+       ipv6_connect = create_task(open_connection("::1", 80))
+       tasks = [ipv4_connect, ipv6_connect]
+
+       async for earliest_connect in as_completed(tasks):
+           # earliest_connect is done. The result can be obtained by
+           # awaiting it or calling earliest_connect.result()
+           reader, writer = await earliest_connect
+
+           if earliest_connect is ipv6_connect:
+               print("IPv6 connection established.")
+           else:
+               print("IPv4 connection established.")
+
+   During asynchronous iteration, implicitly-created tasks will be yielded for
+   supplied awaitables that aren't tasks or futures.
+
+   When used as a plain iterator, each iteration yields a new coroutine that
+   returns the result or raises the exception of the next completed awaitable.
+   This pattern is compatible with Python versions older than 3.13::
+
+       ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+       ipv6_connect = create_task(open_connection("::1", 80))
+       tasks = [ipv4_connect, ipv6_connect]
+
+       for next_connect in as_completed(tasks):
+           # next_connect is not one of the original task objects. It must be
+           # awaited to obtain the result value or raise the exception of the
+           # awaitable that finishes next.
+           reader, writer = await next_connect
+
+   A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables
+   are done. This is raised by the ``async for`` loop during asynchronous
+   iteration or by the coroutines yielded during plain iteration.
 
    .. versionchanged:: 3.10
       Removed the *loop* parameter.
@@ -891,6 +922,10 @@ Waiting Primitives
    .. versionchanged:: 3.12
       Added support for generators yielding tasks.
 
+   .. versionchanged:: 3.13
+      The result can now be used as either an :term:`asynchronous iterator`
+      or as a plain :term:`iterator` (previously it was only a plain iterator).
+
 
 Running in Threads
 ==================
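
The timeout behaviour documented in this hunk can be tried out with a small sketch. It uses the plain-iterator form so that it should also run on Python versions older than 3.13. The helper names `sleeper` and `collect_until_timeout`, the labels, and the delays are made up for illustration and are not part of the commit.

```python
import asyncio


async def sleeper(delay, label):
    # Hypothetical stand-in for real work such as opening a connection.
    await asyncio.sleep(delay)
    return label


async def collect_until_timeout():
    tasks = [
        asyncio.ensure_future(sleeper(0.01, "fast")),
        asyncio.ensure_future(sleeper(5, "slow")),
    ]
    finished = []
    timed_out = False
    try:
        # Plain iteration: each item is a new coroutine that must be awaited.
        for next_result in asyncio.as_completed(tasks, timeout=0.2):
            finished.append(await next_result)
    except asyncio.TimeoutError:
        # Raised by the awaited coroutine once the timeout has elapsed.
        timed_out = True
    finally:
        # as_completed() does not cancel the remaining tasks on timeout,
        # so this sketch cleans them up explicitly.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return finished, timed_out


finished, timed_out = asyncio.run(collect_until_timeout())
print(finished, timed_out)
```

With `async for`, the same exception would be expected to surface from the loop statement itself rather than from an awaited coroutine, as the documentation text in the hunk describes.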

Doc/whatsnew/3.13.rst

Lines changed: 7 additions & 0 deletions
@@ -289,6 +289,13 @@ asyncio
   forcefully close an asyncio server.
   (Contributed by Pierre Ossman in :gh:`113538`.)
 
+* :func:`asyncio.as_completed` now returns an object that is both an
+  :term:`asynchronous iterator` and a plain :term:`iterator` of awaitables.
+  The awaitables yielded by asynchronous iteration include original task or
+  future objects that were passed in, making it easier to associate results
+  with the tasks being completed.
+  (Contributed by Justin Arthur in :gh:`77714`.)
+
 base64
 ------
 
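
As a rough illustration of the "associate results with the tasks" point in this entry, the sketch below names each task and reads the name back from whatever `async for` yields. The `fetch` helper, the task names, and the delays are hypothetical. Asynchronous iteration over `as_completed()` needs Python 3.13 or newer, so the call is guarded by a version check.

```python
import asyncio
import sys


async def fetch(label, delay):
    # Hypothetical stand-in for real I/O.
    await asyncio.sleep(delay)
    return label.upper()


async def results_by_task_name():
    tasks = [
        asyncio.create_task(fetch("slow", 0.1), name="slow-task"),
        asyncio.create_task(fetch("fast", 0.01), name="fast-task"),
    ]
    finished = []
    async for task in asyncio.as_completed(tasks):
        # Each yielded object is one of the original Task objects and is
        # already done, so result() does not block.
        finished.append((task.get_name(), task.result()))
    return finished


if sys.version_info >= (3, 13):
    finished = asyncio.run(results_by_task_name())
else:
    # Older versions only support plain iteration over as_completed().
    finished = None
print(finished)
```

On older versions the plain-iterator form yields fresh coroutines, so the task name would not be recoverable this way.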

Lib/asyncio/tasks.py

Lines changed: 108 additions & 44 deletions
@@ -25,6 +25,7 @@
 from . import events
 from . import exceptions
 from . import futures
+from . import queues
 from . import timeouts
 
 # Helper to generate new task names
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
         fut.remove_done_callback(cb)
 
 
-# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+class _AsCompletedIterator:
+    """Iterator of awaitables representing tasks of asyncio.as_completed.
+
+    As an asynchronous iterator, iteration yields futures as they finish. As a
+    plain iterator, new coroutines are yielded that will return or raise the
+    result of the next underlying future to complete.
+    """
+    def __init__(self, aws, timeout):
+        self._done = queues.Queue()
+        self._timeout_handle = None
+
+        loop = events.get_event_loop()
+        todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
+        for f in todo:
+            f.add_done_callback(self._handle_completion)
+        if todo and timeout is not None:
+            self._timeout_handle = (
+                loop.call_later(timeout, self._handle_timeout)
+            )
+        self._todo = todo
+        self._todo_left = len(todo)
+
+    def __aiter__(self):
+        return self
+
+    def __iter__(self):
+        return self
+
+    async def __anext__(self):
+        if not self._todo_left:
+            raise StopAsyncIteration
+        assert self._todo_left > 0
+        self._todo_left -= 1
+        return await self._wait_for_one()
+
+    def __next__(self):
+        if not self._todo_left:
+            raise StopIteration
+        assert self._todo_left > 0
+        self._todo_left -= 1
+        return self._wait_for_one(resolve=True)
+
+    def _handle_timeout(self):
+        for f in self._todo:
+            f.remove_done_callback(self._handle_completion)
+            self._done.put_nowait(None)  # Sentinel for _wait_for_one().
+        self._todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _handle_completion(self, f):
+        if not self._todo:
+            return  # _handle_timeout() was here first.
+        self._todo.remove(f)
+        self._done.put_nowait(f)
+        if not self._todo and self._timeout_handle is not None:
+            self._timeout_handle.cancel()
+
+    async def _wait_for_one(self, resolve=False):
+        # Wait for the next future to be done and return it unless resolve is
+        # set, in which case return either the result of the future or raise
+        # an exception.
+        f = await self._done.get()
+        if f is None:
+            # Dummy value from _handle_timeout().
+            raise exceptions.TimeoutError
+        return f.result() if resolve else f
+
+
 def as_completed(fs, *, timeout=None):
-    """Return an iterator whose values are coroutines.
+    """Create an iterator of awaitables or their results in completion order.
 
-    When waiting for the yielded coroutines you'll get the results (or
-    exceptions!) of the original Futures (or coroutines), in the order
-    in which and as soon as they complete.
+    Run the supplied awaitables concurrently. The returned object can be
+    iterated to obtain the results of the awaitables as they finish.
 
-    This differs from PEP 3148; the proper way to use this is:
+    The object returned can be iterated as an asynchronous iterator or a plain
+    iterator. When asynchronous iteration is used, the originally-supplied
+    awaitables are yielded if they are tasks or futures. This makes it easy to
+    correlate previously-scheduled tasks with their results:
 
-        for f in as_completed(fs):
-            result = await f # The 'await' may raise.
-            # Use result.
+        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+        ipv6_connect = create_task(open_connection("::1", 80))
+        tasks = [ipv4_connect, ipv6_connect]
 
-    If a timeout is specified, the 'await' will raise
-    TimeoutError when the timeout occurs before all Futures are done.
+        async for earliest_connect in as_completed(tasks):
+            # earliest_connect is done. The result can be obtained by
+            # awaiting it or calling earliest_connect.result()
+            reader, writer = await earliest_connect
 
-    Note: The futures 'f' are not necessarily members of fs.
-    """
-    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
-        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+            if earliest_connect is ipv6_connect:
+                print("IPv6 connection established.")
+            else:
+                print("IPv4 connection established.")
 
-    from .queues import Queue  # Import here to avoid circular import problem.
-    done = Queue()
+    During asynchronous iteration, implicitly-created tasks will be yielded for
+    supplied awaitables that aren't tasks or futures.
 
-    loop = events.get_event_loop()
-    todo = {ensure_future(f, loop=loop) for f in set(fs)}
-    timeout_handle = None
+    When used as a plain iterator, each iteration yields a new coroutine that
+    returns the result or raises the exception of the next completed awaitable.
+    This pattern is compatible with Python versions older than 3.13:
 
-    def _on_timeout():
-        for f in todo:
-            f.remove_done_callback(_on_completion)
-            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
-        todo.clear()  # Can't do todo.remove(f) in the loop.
+        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+        ipv6_connect = create_task(open_connection("::1", 80))
+        tasks = [ipv4_connect, ipv6_connect]
 
-    def _on_completion(f):
-        if not todo:
-            return  # _on_timeout() was here first.
-        todo.remove(f)
-        done.put_nowait(f)
-        if not todo and timeout_handle is not None:
-            timeout_handle.cancel()
+        for next_connect in as_completed(tasks):
+            # next_connect is not one of the original task objects. It must be
+            # awaited to obtain the result value or raise the exception of the
+            # awaitable that finishes next.
+            reader, writer = await next_connect
 
-    async def _wait_for_one():
-        f = await done.get()
-        if f is None:
-            # Dummy value from _on_timeout().
-            raise exceptions.TimeoutError
-        return f.result()  # May raise f.exception().
+    A TimeoutError is raised if the timeout occurs before all awaitables are
+    done. This is raised by the async for loop during asynchronous iteration or
+    by the coroutines yielded during plain iteration.
+    """
+    if inspect.isawaitable(fs):
+        raise TypeError(
+            f"expects an iterable of awaitables, not {type(fs).__name__}"
+        )
 
-    for f in todo:
-        f.add_done_callback(_on_completion)
-    if todo and timeout is not None:
-        timeout_handle = loop.call_later(timeout, _on_timeout)
-    for _ in range(len(todo)):
-        yield _wait_for_one()
+    return _AsCompletedIterator(fs, timeout)
 
 
 @types.coroutine
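
The core idea of the new class (done callbacks feeding a queue, with one object acting as both a plain and an asynchronous iterator) can be sketched on its own. The class below is a deliberately simplified, hypothetical stand-in, not the code from this commit: the name `CompletionOrder` is invented, and timeout handling and the assertions are left out. It relies only on public `asyncio` APIs and must be instantiated while an event loop is running.

```python
import asyncio


class CompletionOrder:
    """Simplified illustration of the dual-iterator pattern (no timeout)."""

    def __init__(self, aws):
        self._done = asyncio.Queue()
        futures = {asyncio.ensure_future(aw) for aw in set(aws)}
        self._remaining = len(futures)
        for fut in futures:
            # The callback receives the finished future and enqueues it.
            fut.add_done_callback(self._done.put_nowait)

    def __aiter__(self):
        return self

    def __iter__(self):
        return self

    async def __anext__(self):
        if not self._remaining:
            raise StopAsyncIteration
        self._remaining -= 1
        # Asynchronous iteration yields the finished future itself.
        return await self._done.get()

    def __next__(self):
        if not self._remaining:
            raise StopIteration
        self._remaining -= 1
        # Plain iteration yields a coroutine that resolves to the result.
        return self._resolve_next()

    async def _resolve_next(self):
        fut = await self._done.get()
        return fut.result()


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def demo():
    async_order = [
        fut.result()
        async for fut in CompletionOrder([delayed("b", 0.1), delayed("a", 0.01)])
    ]
    plain_order = []
    for coro in CompletionOrder([delayed("d", 0.1), delayed("c", 0.01)]):
        plain_order.append(await coro)
    return async_order, plain_order


async_order, plain_order = asyncio.run(demo())
print(async_order, plain_order)
```

Sharing a single counter between `__next__` and `__anext__`, as the diff does with `_todo_left`, is presumably what lets iteration be paused and resumed without yielding more items than there are awaitables.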
