|
55 | 55 | from multiprocessing.connection import wait |
56 | 56 | import threading |
57 | 57 | import weakref |
| 58 | +from functools import partial |
| 59 | +import itertools |
58 | 60 |
|
59 | 61 | # Workers are created as daemon threads and processes. This is done to allow the |
60 | 62 | # interpreter to exit when there are still idle processes in a |
@@ -108,6 +110,26 @@ def __init__(self, work_id, fn, args, kwargs): |
108 | 110 | self.args = args |
109 | 111 | self.kwargs = kwargs |
110 | 112 |
|
| 113 | +def _get_chunks(*iterables, chunksize): |
| 114 | + """ Iterates over zip()ed iterables in chunks. """ |
| 115 | + it = zip(*iterables) |
| 116 | + while True: |
| 117 | + chunk = tuple(itertools.islice(it, chunksize)) |
| 118 | + if not chunk: |
| 119 | + return |
| 120 | + yield chunk |
| 121 | + |
| 122 | +def _process_chunk(fn, chunk): |
| 123 | + """ Processes a chunk of an iterable passed to map. |
| 124 | +
|
| 125 | + Runs the function passed to map() on a chunk of the |
| 126 | + iterable passed to map. |
| 127 | +
|
| 128 | + This function is run in a separate process. |
| 129 | +
|
| 130 | + """ |
| 131 | + return [fn(*args) for args in chunk] |
| 132 | + |
111 | 133 | def _process_worker(call_queue, result_queue): |
112 | 134 | """Evaluates calls from call_queue and places the results in result_queue. |
113 | 135 |
|
@@ -411,6 +433,35 @@ def submit(self, fn, *args, **kwargs): |
411 | 433 | return f |
412 | 434 | submit.__doc__ = _base.Executor.submit.__doc__ |
413 | 435 |
|
| 436 | + def map(self, fn, *iterables, timeout=None, chunksize=1): |
| 437 | + """Returns a iterator equivalent to map(fn, iter). |
| 438 | +
|
| 439 | + Args: |
| 440 | + fn: A callable that will take as many arguments as there are |
| 441 | + passed iterables. |
| 442 | + timeout: The maximum number of seconds to wait. If None, then there |
| 443 | + is no limit on the wait time. |
| 444 | + chunksize: If greater than one, the iterables will be chopped into |
| 445 | + chunks of size chunksize and submitted to the process pool. |
| 446 | + If set to one, the items in the list will be sent one at a time. |
| 447 | +
|
| 448 | + Returns: |
| 449 | + An iterator equivalent to: map(func, *iterables) but the calls may |
| 450 | + be evaluated out-of-order. |
| 451 | +
|
| 452 | + Raises: |
| 453 | + TimeoutError: If the entire result iterator could not be generated |
| 454 | + before the given timeout. |
| 455 | + Exception: If fn(*args) raises for any values. |
| 456 | + """ |
| 457 | + if chunksize < 1: |
| 458 | + raise ValueError("chunksize must be >= 1.") |
| 459 | + |
| 460 | + results = super().map(partial(_process_chunk, fn), |
| 461 | + _get_chunks(*iterables, chunksize=chunksize), |
| 462 | + timeout=timeout) |
| 463 | + return itertools.chain.from_iterable(results) |
| 464 | + |
414 | 465 | def shutdown(self, wait=True): |
415 | 466 | with self._shutdown_lock: |
416 | 467 | self._shutdown_thread = True |
|
0 commit comments