Skip to content
Open
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ccd0bbc
draft: impl lazy input consumption in mp.Pool.imap(_unordered)
Jul 20, 2025
002ef46
Use semaphore to synchronize threads
Jul 20, 2025
6e0bc58
Update buffersize behavior to match concurrent.futures.Executor behavior
Jul 21, 2025
62b2b6a
Release all `buffersize_lock` obj from the parent thread when terminate
Jul 21, 2025
0b6ba41
Add 2 basic `ThreadPool.imap()` tests w/ and w/o buffersize
Jul 21, 2025
aade15e
Fix accidental swap in imports
Jul 21, 2025
fb38a72
clear Pool._taskqueue_buffersize_semaphores safely
Jul 21, 2025
6ef488b
Slightly optimize Pool._taskqueue_buffersize_semaphores terminate
Jul 21, 2025
1716725
Rename `Pool.imap()` buffersize-related tests
Jul 21, 2025
9b43cd0
Fix typo in `IMapIterator.__init__()`
Jul 22, 2025
2d89341
Add tests for buffersize combinations with other kwargs
Jul 22, 2025
9ab2705
Remove if-branch in `_terminate_pool`
Jul 27, 2025
a955003
Add more edge-case tests for `imap` and `imap_unordered`
Jul 27, 2025
80efd6e
Split inf iterable test for `imap` and `imap_unordered`
Jul 27, 2025
83d6930
Add doc for `buffersize` argument of `imap` and `imap_unordered`
Jul 27, 2025
995ad8c
add *versionadded* for `imap_unordered`
Jul 28, 2025
3b6ad65
Remove ambiguity in `buffersize` description.
Jul 28, 2025
c941c16
Set *versionadded* as next in docs
Jul 28, 2025
d09e891
Add whatsnew entry
Jul 28, 2025
9c6d89d
Fix agreed comments on code formatting/minor refactoring
Jul 28, 2025
4550a01
Remove `imap` and `imap_unordered` body code duplication
Jul 28, 2025
77bde4d
Merge branch 'main' into feature/add-buffersize-to-multiprocessing
obaltian Aug 31, 2025
aec39fc
Merge branch 'main' into feature/add-buffersize-to-multiprocessing
obaltian Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Release all buffersize_lock obj from the parent thread when terminate
  • Loading branch information
Oleksandr Baltian authored and obaltian committed Aug 14, 2025
commit 62b2b6a9b50f3bdfb4a8930956ffa7c9afe2c29c
104 changes: 51 additions & 53 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
# The _taskqueue_buffersize_semaphores dict exists so that .release() can
# be called on every active semaphore when the pool is terminating, which
# wakes up task_handler so it can stop. It is a dict so that each iterator
# object can efficiently deregister its semaphore when the iterator finishes.
self._taskqueue_buffersize_semaphores = {}
# The _change_notifier queue exist to wake up self._handle_workers()
# when the cache (self._cache) is empty or when there is a change in
# the _state variable of the thread that runs _handle_workers.
Expand Down Expand Up @@ -257,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
self._result_handler, self._cache,
self._taskqueue_buffersize_semaphores),
exitpriority=15
)
self._state = RUN
Expand Down Expand Up @@ -383,33 +389,27 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)

def _guarded_task_generation(self, result_job, func, iterable):
def _guarded_task_generation(self, result_job, func, iterable,
buffersize_sema=None):
Comment thread
obaltian marked this conversation as resolved.
Outdated
'''Provides a generator of tasks for imap and imap_unordered with
appropriate handling for iterables which throw exceptions during
iteration.'''
try:
i = -1
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

except Exception as e:
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
if buffersize_sema is None:
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

def _guarded_task_generation_lazy(self, result_job, func, iterable,
backpressure_sema):
"""Provides a generator of tasks for imap and imap_unordered with
appropriate handling for iterables which throw exceptions during
iteration."""
try:
i = -1
enumerated_iter = iter(enumerate(iterable))
while True:
backpressure_sema.acquire()
try:
i, x = next(enumerated_iter)
except StopIteration:
break
yield (result_job, i, func, (x,), {})
else:
enumerated_iter = iter(enumerate(iterable))
while True:
buffersize_sema.acquire()
try:
i, x = next(enumerated_iter)
except StopIteration:
break
yield (result_job, i, func, (x,), {})

except Exception as e:
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
Expand All @@ -428,19 +428,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
raise ValueError("buffersize must be None or > 0")

result = IMapIterator(self, buffersize)

if result._backpressure_sema is None:
task_generation = self._guarded_task_generation
else:
task_generation = functools.partial(
self._guarded_task_generation_lazy,
backpressure_sema=result._backpressure_sema,
)

if chunksize == 1:
self._taskqueue.put(
(
task_generation(result._job, func, iterable),
self._guarded_task_generation(result._job, func, iterable,
result._buffersize_sema),
result._set_length,
)
)
Expand All @@ -449,7 +441,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
task_batches = Pool._get_tasks(func, iterable, chunksize)
self._taskqueue.put(
(
task_generation(result._job, mapstar, task_batches),
self._guarded_task_generation(result._job, mapstar, task_batches,
Comment thread
obaltian marked this conversation as resolved.
Outdated
result._buffersize_sema),
result._set_length,
)
)
Expand All @@ -471,19 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
raise ValueError("buffersize must be None or > 0")

result = IMapUnorderedIterator(self, buffersize)

if result._backpressure_sema is None:
task_generation = self._guarded_task_generation
else:
task_generation = functools.partial(
self._guarded_task_generation_lazy,
backpressure_sema=result._backpressure_sema,
)

if chunksize == 1:
self._taskqueue.put(
(
task_generation(result._job, func, iterable),
self._guarded_task_generation(result._job, func, iterable,
result._buffersize_sema),
result._set_length,
)
)
Expand All @@ -492,7 +477,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
task_batches = Pool._get_tasks(func, iterable, chunksize)
self._taskqueue.put(
(
task_generation(result._job, mapstar, task_batches),
self._guarded_task_generation(result._job, mapstar, task_batches,
result._buffersize_sema),
result._set_length,
)
)
Expand Down Expand Up @@ -727,7 +713,8 @@ def _help_stuff_finish(inqueue, task_handler, size):

@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
worker_handler, task_handler, result_handler, cache,
taskqueue_buffersize_semaphores):
# this is guaranteed to only be called once
util.debug('finalizing pool')

Expand All @@ -738,6 +725,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
change_notifier.put(None)

task_handler._state = TERMINATE
# Release all semaphores to wake up task_handler to stop.
for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()):
taskqueue_buffersize_semaphores.pop(job_id)
sema.release()

util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
Expand Down Expand Up @@ -893,11 +884,13 @@ def __init__(self, pool, buffersize):
self._length = None
self._unsorted = {}
self._cache[self._job] = self

if buffersize is None:
self._backpressure_sema = None
self._buffersize_sema = None
else:
self._backpressure_sema = threading.Semaphore(buffersize)
self._buffersize_sema = threading.Semaphore(buffersize)
self._pool._taskqueue_buffersize_semaphores[self] = (
self._buffersize_sema
)

def __iter__(self):
return self
Expand All @@ -908,25 +901,30 @@ def next(self, timeout=None):
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
raise TimeoutError from None

if self._backpressure_sema is not None:
self._backpressure_sema.release()
if self._buffersize_sema is not None:
self._buffersize_sema.release()

success, value = item
if success:
return value
raise value

def _stop_iterator(self):
    """Detach this iterator from its pool and signal end of iteration.

    Removes the iterator's buffersize semaphore from the pool-level
    registry (if one is still registered), drops the reference to the
    pool, and then always raises ``StopIteration``.

    Raises:
        StopIteration: unconditionally, with exception context suppressed.
    """
    pool = self._pool
    if pool is not None:
        # Clear the pool reference first so the iterator is detached even
        # if deregistration below misbehaves.
        self._pool = None
        # The semaphore is registered under the iterator object itself.
        # It may legitimately be absent: no semaphore is registered when
        # buffersize is None, and pool termination removes every entry
        # while releasing the semaphores.  Use a default so a missing
        # entry cannot turn the intended StopIteration into a KeyError.
        pool._taskqueue_buffersize_semaphores.pop(self, None)
    raise StopIteration from None

__next__ = next # XXX

def _set(self, i, obj):
Expand Down