Skip to content
Open
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ccd0bbc
draft: impl lazy input consumption in mp.Pool.imap(_unordered)
Jul 20, 2025
002ef46
Use semaphore to synchronize threads
Jul 20, 2025
6e0bc58
Update buffersize behavior to match concurrent.futures.Executor behavior
Jul 21, 2025
62b2b6a
Release all `buffersize_lock` obj from the parent thread when terminate
Jul 21, 2025
0b6ba41
Add 2 basic `ThreadPool.imap()` tests w/ and w/o buffersize
Jul 21, 2025
aade15e
Fix accidental swap in imports
Jul 21, 2025
fb38a72
clear Pool._taskqueue_buffersize_semaphores safely
Jul 21, 2025
6ef488b
Slightly optimize Pool._taskqueue_buffersize_semaphores terminate
Jul 21, 2025
1716725
Rename `Pool.imap()` buffersize-related tests
Jul 21, 2025
9b43cd0
Fix typo in `IMapIterator.__init__()`
Jul 22, 2025
2d89341
Add tests for buffersize combinations with other kwargs
Jul 22, 2025
9ab2705
Remove if-branch in `_terminate_pool`
Jul 27, 2025
a955003
Add more edge-case tests for `imap` and `imap_unodered`
Jul 27, 2025
80efd6e
Split inf iterable test for `imap` and `imap_unordered`
Jul 27, 2025
83d6930
Add doc for `buffersize` argument of `imap` and `imap_unordered`
Jul 27, 2025
995ad8c
add *versionadded* for `imap_unordered`
Jul 28, 2025
3b6ad65
Remove ambiguity in `buffersize` description.
Jul 28, 2025
c941c16
Set *versionadded* as next in docs
Jul 28, 2025
d09e891
Add whatsnew entry
Jul 28, 2025
9c6d89d
Fix aggreed comments on code formatting/minor refactoring
Jul 28, 2025
4550a01
Remove `imap` and `imap_unordered` body code duplication
Jul 28, 2025
77bde4d
Merge branch 'main' into feature/add-buffersize-to-multiprocessing
obaltian Aug 31, 2025
aec39fc
Merge branch 'main' into feature/add-buffersize-to-multiprocessing
obaltian Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more edge-case tests for imap and imap_unodered
These tests mostly come from a similar PR adding `buffersize` param
to `concurrent.futures.Executor.map` -
https://github.com/python/cpython/pull/125663/files
  • Loading branch information
Oleksandr Baltian authored and obaltian committed Aug 14, 2025
commit a95500340fe193e84a31c13168763737dbb7d4f7
180 changes: 122 additions & 58 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2953,64 +2953,6 @@ def test_imap(self):
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, it.__next__)

def test_imap_fast_iterable_with_slow_task(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

processes = 4
p = self.Pool(processes)

tasks_started_later = 2
last_produced_task_arg = Value("i")

def produce_args():
for arg in range(1, processes + tasks_started_later + 1):
last_produced_task_arg.value = arg
yield arg

it = p.imap(functools.partial(sqr, wait=0.2), produce_args())

next(it)
time.sleep(0.2)
# `iterable` should've been advanced only up by `processes` times,
# but in fact advances further (by `>=processes+1`).
# In this case, it advances to the maximum value.
self.assertGreater(last_produced_task_arg.value, processes + 1)

p.terminate()
p.join()

def test_imap_fast_iterable_with_slow_task_and_buffersize(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

processes = 4
p = self.Pool(processes)

tasks_started_later = 2
last_produced_task_arg = Value("i")

def produce_args():
for arg in range(1, processes + tasks_started_later + 1):
last_produced_task_arg.value = arg
yield arg

it = p.imap(
functools.partial(sqr, wait=0.2),
produce_args(),
buffersize=processes,
)

time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes)

next(it)
time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes + 1)

p.terminate()
p.join()

def test_imap_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
Expand Down Expand Up @@ -3101,6 +3043,128 @@ def test_imap_unordered_handle_iterable_exception(self):
self.assertIn(value, expected_values)
expected_values.remove(value)

def test_imap_and_imap_unordered_buffersize_type_validation(self):
for method_name in ("imap", "imap_unordered"):
for buffersize in ("foo", 2.0):
with (
self.subTest(method=method_name, buffersize=buffersize),
self.assertRaisesRegex(
TypeError, "buffersize must be an integer or None"
),
):
method = getattr(self.pool, method_name)
method(str, range(4), buffersize=buffersize)

def test_imap_and_imap_unordered_buffersize_value_validation(self):
for method_name in ("imap", "imap_unordered"):
for buffersize in (0, -1):
with (
self.subTest(method=method_name, buffersize=buffersize),
self.assertRaisesRegex(
ValueError, "buffersize must be None or > 0"
),
):
method = getattr(self.pool, method_name)
method(str, range(4), buffersize=buffersize)

def test_imap_and_imap_unordered_when_buffer_is_full(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

for method_name in ("imap", "imap_unordered"):
with self.subTest(method=method_name):
processes = 4
p = self.Pool(processes)
last_produced_task_arg = Value("i")

def produce_args():
for arg in itertools.count(1):
last_produced_task_arg.value = arg
yield arg

method = getattr(p, method_name)
it = method(functools.partial(sqr, wait=0.2), produce_args())

time.sleep(0.2)
# `iterable` could've been advanced only `processes` times,
# but in fact it advances further (`> processes`) because of
# not waiting for workers or user code to catch up.
self.assertGreater(last_produced_task_arg.value, processes)

next(it)
time.sleep(0.2)
self.assertGreater(last_produced_task_arg.value, processes + 1)

next(it)
time.sleep(0.2)
self.assertGreater(last_produced_task_arg.value, processes + 2)

p.terminate()
p.join()

def test_imap_and_imap_unordered_buffersize_when_buffer_is_full(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

for method_name in ("imap", "imap_unordered"):
with self.subTest(method=method_name):
processes = 4
p = self.Pool(processes)
last_produced_task_arg = Value("i")

def produce_args():
for arg in itertools.count(1):
last_produced_task_arg.value = arg
yield arg

method = getattr(p, method_name)
it = method(
functools.partial(sqr, wait=0.2),
produce_args(),
buffersize=processes,
)

time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes)

next(it)
time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes + 1)

next(it)
time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes + 2)

p.terminate()
p.join()

def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

for method_name in ("imap", "imap_unordered"):
with self.subTest(method=method_name):
p = self.Pool(4)
method = getattr(p, method_name)

res = method(str, itertools.count(), buffersize=2)

self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")

p.terminate()
p.join()

def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self):
for method_name in ("imap", "imap_unordered"):
with self.subTest(method=method_name):
method = getattr(self.pool, method_name)

res = method(str, [], buffersize=2)

self.assertIsNone(next(res, None))

def test_make_pool(self):
expected_error = (RemoteError if self.TYPE == 'manager'
else ValueError)
Expand Down