Skip to content

Commit 4aae276

Browse files
committed
Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*
argument to allow batching of tasks in child processes and improve performance of ProcessPoolExecutor. Patch by Dan O'Reilly.
1 parent e4f4708 commit 4aae276

4 files changed

Lines changed: 83 additions & 3 deletions

File tree

Doc/library/concurrent.futures.rst

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Executor Objects
3838
future = executor.submit(pow, 323, 1235)
3939
print(future.result())
4040

41-
.. method:: map(func, *iterables, timeout=None)
41+
.. method:: map(func, *iterables, timeout=None, chunksize=1)
4242

4343
Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
4444
asynchronously and several calls to *func* may be made concurrently. The
@@ -48,7 +48,16 @@ Executor Objects
4848
*timeout* can be an int or a float. If *timeout* is not specified or
4949
``None``, there is no limit to the wait time. If a call raises an
5050
exception, then that exception will be raised when its value is
51-
retrieved from the iterator.
51+
retrieved from the iterator. When using :class:`ProcessPoolExecutor`, this
52+
method chops *iterables* into a number of chunks which it submits to the
53+
pool as separate tasks. The (approximate) size of these chunks can be
54+
specified by setting *chunksize* to a positive integer. For very long
55+
iterables, using a large value for *chunksize* can significantly improve
56+
performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
57+
*chunksize* has no effect.
58+
59+
.. versionchanged:: 3.5
60+
Added the *chunksize* argument.
5261

5362
.. method:: shutdown(wait=True)
5463

Lib/concurrent/futures/_base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,14 +520,18 @@ def submit(self, fn, *args, **kwargs):
520520
"""
521521
raise NotImplementedError()
522522

523-
def map(self, fn, *iterables, timeout=None):
523+
def map(self, fn, *iterables, timeout=None, chunksize=1):
524524
"""Returns a iterator equivalent to map(fn, iter).
525525
526526
Args:
527527
fn: A callable that will take as many arguments as there are
528528
passed iterables.
529529
timeout: The maximum number of seconds to wait. If None, then there
530530
is no limit on the wait time.
531+
chunksize: The size of the chunks the iterable will be broken into
532+
before being passed to a child process. This argument is only
533+
used by ProcessPoolExecutor; it is ignored by
534+
ThreadPoolExecutor.
531535
532536
Returns:
533537
An iterator equivalent to: map(func, *iterables) but the calls may

Lib/concurrent/futures/process.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
from multiprocessing.connection import wait
5656
import threading
5757
import weakref
58+
from functools import partial
59+
import itertools
5860

5961
# Workers are created as daemon threads and processes. This is done to allow the
6062
# interpreter to exit when there are still idle processes in a
@@ -108,6 +110,26 @@ def __init__(self, work_id, fn, args, kwargs):
108110
self.args = args
109111
self.kwargs = kwargs
110112

113+
def _get_chunks(*iterables, chunksize):
114+
""" Iterates over zip()ed iterables in chunks. """
115+
it = zip(*iterables)
116+
while True:
117+
chunk = tuple(itertools.islice(it, chunksize))
118+
if not chunk:
119+
return
120+
yield chunk
121+
122+
def _process_chunk(fn, chunk):
123+
""" Processes a chunk of an iterable passed to map.
124+
125+
Runs the function passed to map() on a chunk of the
126+
iterable passed to map.
127+
128+
This function is run in a separate process.
129+
130+
"""
131+
return [fn(*args) for args in chunk]
132+
111133
def _process_worker(call_queue, result_queue):
112134
"""Evaluates calls from call_queue and places the results in result_queue.
113135
@@ -411,6 +433,35 @@ def submit(self, fn, *args, **kwargs):
411433
return f
412434
submit.__doc__ = _base.Executor.submit.__doc__
413435

436+
def map(self, fn, *iterables, timeout=None, chunksize=1):
437+
"""Returns a iterator equivalent to map(fn, iter).
438+
439+
Args:
440+
fn: A callable that will take as many arguments as there are
441+
passed iterables.
442+
timeout: The maximum number of seconds to wait. If None, then there
443+
is no limit on the wait time.
444+
chunksize: If greater than one, the iterables will be chopped into
445+
chunks of size chunksize and submitted to the process pool.
446+
If set to one, the items in the list will be sent one at a time.
447+
448+
Returns:
449+
An iterator equivalent to: map(func, *iterables) but the calls may
450+
be evaluated out-of-order.
451+
452+
Raises:
453+
TimeoutError: If the entire result iterator could not be generated
454+
before the given timeout.
455+
Exception: If fn(*args) raises for any values.
456+
"""
457+
if chunksize < 1:
458+
raise ValueError("chunksize must be >= 1.")
459+
460+
results = super().map(partial(_process_chunk, fn),
461+
_get_chunks(*iterables, chunksize=chunksize),
462+
timeout=timeout)
463+
return itertools.chain.from_iterable(results)
464+
414465
def shutdown(self, wait=True):
415466
with self._shutdown_lock:
416467
self._shutdown_thread = True

Lib/test/test_concurrent_futures.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,22 @@ def test_killed_child(self):
464464
# Submitting other jobs fails as well.
465465
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
466466

467+
def test_map_chunksize(self):
468+
def bad_map():
469+
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
470+
471+
ref = list(map(pow, range(40), range(40)))
472+
self.assertEqual(
473+
list(self.executor.map(pow, range(40), range(40), chunksize=6)),
474+
ref)
475+
self.assertEqual(
476+
list(self.executor.map(pow, range(40), range(40), chunksize=50)),
477+
ref)
478+
self.assertEqual(
479+
list(self.executor.map(pow, range(40), range(40), chunksize=40)),
480+
ref)
481+
self.assertRaises(ValueError, bad_map)
482+
467483

468484
class FutureTests(unittest.TestCase):
469485
def test_done_callback_with_result(self):

0 commit comments

Comments
 (0)