8000 gh-74028: add `buffersize` parameter to `concurrent.futures.Executor.… · python/cpython@a005835 · GitHub
[go: up one dir, main page]

Skip to content

Commit a005835

Browse files
ebonnalpicnixz
andauthored
gh-74028: add buffersize parameter to concurrent.futures.Executor.map for lazier behavior (#125663)
`concurrent.futures.Executor.map` now supports limiting the number of submitted tasks whose results have not yet been yielded via the new `buffersize` parameter. --------- Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
1 parent e98d321 commit a005835

File tree

6 files changed

+128
-7
lines changed

6 files changed

+128
-7
lines changed

Doc/library/concurrent.futures.rst

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,14 @@ Executor Objects
4040
future = executor.submit(pow, 323, 1235)
4141
print(future.result())
4242

43-
.. method:: map(fn, *iterables, timeout=None, chunksize=1)
43+
.. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)
4444

4545
Similar to :func:`map(fn, *iterables) <map>` except:
4646

47-
* the *iterables* are collected immediately rather than lazily;
47+
* The *iterables* are collected immediately rather than lazily, unless a
48+
*buffersize* is specified to limit the number of submitted tasks whose
49+
results have not yet been yielded. If the buffer is full, iteration over
50+
the *iterables* pauses until a result is yielded from the buffer.
4851

4952
* *fn* is executed asynchronously and several calls to
5053
*fn* may be made concurrently.
@@ -68,7 +71,10 @@ Executor Objects
6871
*chunksize* has no effect.
6972

7073
.. versionchanged:: 3.5
71-
Added the *chunksize* argument.
74+
Added the *chunksize* parameter.
75+
76+
.. versionchanged:: next
77+
Added the *buffersize* parameter.
7278

7379
.. method:: shutdown(wait=True, *, cancel_futures=False)
7480

Doc/whatsnew/3.14.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,13 @@ contextvars
465465
* Support context manager protocol by :class:`contextvars.Token`.
466466
(Contributed by Andrew Svetlov in :gh:`129889`.)
467467

468+
* Add the optional ``buffersize`` parameter to
469+
:meth:`concurrent.futures.Executor.map` to limit the number of submitted
470+
tasks whose results have not yet been yielded. If the buffer is full,
471+
iteration over the *iterables* pauses until a result is yielded from the
472+
buffer.
473+
(Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.)
474+
468475

469476
ctypes
470477
------

Lib/concurrent/futures/_base.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import threading
99
import time
1010
import types
11+
import weakref
12+
from itertools import islice
1113

1214
FIRST_COMPLETED = 'FIRST_COMPLETED'
1315
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
572574
"""
573575
raise NotImplementedError()
574576

575-
def map(self, fn, *iterables, timeout=None, chunksize=1):
577+
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
576578
"""Returns an iterator equivalent to map(fn, iter).
577579
578580
Args:
@@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
584586
before being passed to a child process. This argument is only
585587
used by ProcessPoolExecutor; it is ignored by
586588
ThreadPoolExecutor.
589+
buffersize: The number of submitted tasks whose results have not
590+
yet been yielded. If the buffer is full, iteration over the
591+
iterables pauses until a result is yielded from the buffer.
592+
If None, all input elements are eagerly collected, and a task is
593+
submitted for each.
587594
588595
Returns:
589596
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
594601
before the given timeout.
595602
Exception: If fn(*args) raises for any values.
596603
"""
604+
if buffersize is not None and not isinstance(buffersize, int):
605+
raise TypeError("buffersize must be an integer or None")
606+
if buffersize is not None and buffersize < 1:
607+
raise ValueError("buffersize must be None or > 0")
608+
597609
if timeout is not None:
598610
end_time = timeout + time.monotonic()
599611

600-
fs = [self.submit(fn, *args) for args in zip(*iterables)]
612+
zipped_iterables = zip(*iterables)
613+
if buffersize:
614+
fs = collections.deque(
615+
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
616+
)
617+
else:
618+
fs = [self.submit(fn, *args) for args in zipped_iterables]
619+
620+
# Use a weak reference to ensure that the executor can be garbage
621+
# collected independently of the result_iterator closure.
622+
executor_weakref = weakref.ref(self)
601623

602624
# Yield must be hidden in closure so that the futures are submitted
603625
# before the first iterator value is required.
@@ -606,6 +628,12 @@ def result_iterator():
606628
# reverse to keep finishing order
607629
fs.reverse()
608630
while fs:
631+
if (
632+
buffersize
633+
and (executor := executor_weakref())
634+
and (args := next(zipped_iterables, None))
635+
):
636+
fs.appendleft(executor.submit(fn, *args))
609637
# Careful not to keep a reference to the popped future
610638
if timeout is None:
611639
yield _result_or_cancel(fs.pop())

Lib/concurrent/futures/process.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
813813
return f
814814
submit.__doc__ = _base.Executor.submit.__doc__
815815

816-
def map(self, fn, *iterables, timeout=None, chunksize=1):
816+
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
817817
"""Returns an iterator equivalent to map(fn, iter).
818818
819819
Args:
@@ -824,6 +824,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
824824
chunksize: If greater than one, the iterables will be chopped into
825825
chunks of size chunksize and submitted to the process pool.
826826
If set to one, the items in the list will be sent one at a time.
827+
buffersize: The number of submitted tasks whose results have not
828+
yet been yielded. If the buffer is full, iteration over the
829+
iterables pauses until a result is yielded from the buffer.
830+
If None, all input elements are eagerly collected, and a task is
831+
submitted for each.
827832
828833
Returns:
829834
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +844,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
839844

840845
results = super().map(partial(_process_chunk, fn),
841846
itertools.batched(zip(*iterables), chunksize),
842-
timeout=timeout)
847+
timeout=timeout,
848+
buffersize=buffersize)
843849
return _chain_from_iterable_of_lists(results)
844850

845851
def shutdown(self, wait=True, *, cancel_futures=False):

Lib/test/test_concurrent_futures/executor.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import itertools
12
import threading
23
import time
34
import weakref
45
from concurrent import futures
6+
from operator import add
57
from test import support
68
from test.support import Py_GIL_DISABLED
79

@@ -73,6 +75,74 @@ def test_map_timeout(self):
7375
# take longer than the specified timeout.
7476
self.assertIn(results, ([None, None], [None], []))
7577

78+
def test_map_buffersize_type_validation(self):
79+
for buffersize in ("foo", 2.0):
80+
with self.subTest(buffersize=buffersize):
81+
with self.assertRaisesRegex(
82+
TypeError,
83+
"buffersize must be an integer or None",
84+
):
85+
self.executor.map(str, range(4), buffersize=buffersize)
86+
87+
def test_map_buffersize_value_validation(self):
88+
for buffersize in (0, -1):
89+
with self.subTest(buffersize=buffersize):
90+
with self.assertRaisesRegex(
91+
ValueError,
92+
"buffersize must be None or > 0",
93+
):
94+
self.executor.map(str, range(4), buffersize=buffersize)
95+
96+
def test_map_buffersize(self):
97+
ints = range(4)
98+
for buffersize in (1, 2, len(ints), len(ints) * 2):
99+
with self.subTest(buffersize=buffersize):
100+
res = self.executor.map(str, ints, buffersize=buffersize)
101+
self.assertListEqual(list(res), ["0", "1", "2", "3"])
102+
103+
def test_map_buffersize_on_multiple_iterables(self):
104+
ints = range(4)
105+
for buffersize in (1, 2, len(ints), len(ints) * 2):
106+
with self.subTest(buffersize=buffersize):
107+
res = self.executor.map(add, ints, ints, buffersize=buffersize)
108+
self.assertListEqual(list(res), [0, 2, 4, 6])
109+
110+
def test_map_buffersize_on_infinite_iterable(self):
111+
res = self.executor.map(str, itertools.count(), buffersize=2)
112+
self.assertEqual(next(res, None), "0")
113+
self.assertEqual(next(res, None), "1")
114+
self.assertEqual(next(res, None), "2")
115+
116+
def test_map_buffersize_on_multiple_infinite_iterables(self):
117+
res = self.executor.map(
118+
add,
119+
itertools.count(),
120+
itertools.count(),
121+
buffersize=2
122+
)
123+
self.assertEqual(next(res, None), 0)
124+
self.assertEqual(next(res, None), 2)
125+
self.assertEqual(next(res, None), 4)
126+
127+
def test_map_buffersize_on_empty_iterable(self):
128+
res = self.executor.map(str, [], buffersize=2)
129+
self.assertIsNone(next(res, None))
130+
131+
def test_map_buffersize_without_iterable(self):
132+
res = self.executor.map(str, buffersize=2)
133+
self.assertIsNone(next(res, None))
134+
135+
def test_map_buffersize_when_buffer_is_full(self):
136+
ints = iter(range(4))
137+
buffersize = 2
138+
self.executor.map(str, ints, buffersize=buffersize)
139+
self.executor.shutdown(wait=True) # wait for tasks to complete
140+
99FF self.assertEqual(
141+
next(ints),
142+
buffersize,
143+
msg="should have fetched only `buffersize` elements from `ints`.",
144+
)
145+
76146
def test_shutdown_race_issue12456(self):
77147
# Issue #12456: race condition at shutdown where trying to post a
78148
# sentinel in the call queue blocks (the queue is full while processes
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Add the optional ``buffersize`` parameter to
2+
:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
3+
whose results have not yet been yielded. If the buffer is full, iteration over
4+
the *iterables* pauses until a result is yielded from the buffer.

0 commit comments

Comments
 (0)
0