Skip to content

Commit 674574a

Browse files
committed
Spawn workers on demand in ProcessPoolExecutor
1 parent 5cd2803 commit 674574a

3 files changed

Lines changed: 62 additions & 8 deletions

File tree

Doc/whatsnew/3.9.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
200200
compatibility with subinterpreters and predictability in their shutdown
201201
processes. (Contributed by Kyle Stanley in :issue:`39812`.)
202202

203+
Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
204+
demand, only when there are no available idle workers to reuse. This optimizes
205+
startup overhead and reduces the amount of lost CPU time to idle workers.
206+
(Contributed by Kyle Stanley in :issue:`39207`.)
207+
203208
curses
204209
------
205210

Lib/concurrent/futures/process.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ def _sendback_result(result_queue, work_id, result=None, exception=None):
209209
result_queue.put(_ResultItem(work_id, exception=exc))
210210

211211

212-
def _process_worker(call_queue, result_queue, initializer, initargs):
212+
def _process_worker(call_queue, result_queue, initializer, initargs,
213+
idle_worker_semaphore):
213214
"""Evaluates calls from call_queue and places the results in result_queue.
214215
215216
This worker is run in a separate process.
@@ -221,6 +222,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
221222
to by the worker.
222223
initializer: A callable initializer, or None
223224
initargs: A tuple of args for the initializer
225+
idle_worker_semaphore: A multiprocessing.Semaphore that is used to
226+
prevent new workers from being spawned when there are idle workers.
224227
"""
225228
if initializer is not None:
226229
try:
@@ -249,6 +252,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
249252
# open files or shared memory that is not needed anymore
250253
del call_item
251254

255+
# increment idle process count after worker finishes job
256+
idle_worker_semaphore.release()
252257

253258
class _ExecutorManagerThread(threading.Thread):
254259
"""Manages the communication between this process and the worker processes.
@@ -601,6 +606,7 @@ def __init__(self, max_workers=None, mp_context=None,
601606
# Shutdown is a two-step process.
602607
self._shutdown_thread = False
603608
self._shutdown_lock = threading.Lock()
609+
self._idle_worker_semaphore = mp_context.Semaphore(0)
604610
self._broken = False
605611
self._queue_count = 0
606612
self._pending_work_items = {}
@@ -633,20 +639,25 @@ def __init__(self, max_workers=None, mp_context=None,
633639
def _start_executor_manager_thread(self):
634640
if self._executor_manager_thread is None:
635641
# Start the processes so that their sentinels are known.
636-
self._adjust_process_count()
637642
self._executor_manager_thread = _ExecutorManagerThread(self)
638643
self._executor_manager_thread.start()
639644
_threads_wakeups[self._executor_manager_thread] = \
640645
self._executor_manager_thread_wakeup
641646

642647
def _adjust_process_count(self):
643-
for _ in range(len(self._processes), self._max_workers):
648+
# if there's an idle process, we don't need to spawn a new one.
649+
if self._idle_worker_semaphore.acquire(block=False):
650+
return
651+
652+
process_count = len(self._processes)
653+
if process_count < self._max_workers:
644654
p = self._mp_context.Process(
645655
target=_process_worker,
646656
args=(self._call_queue,
647657
self._result_queue,
648658
self._initializer,
649-
self._initargs))
659+
self._initargs,
660+
self._idle_worker_semaphore))
650661
p.start()
651662
self._processes[p.pid] = p
652663

@@ -669,6 +680,7 @@ def submit(self, fn, /, *args, **kwargs):
669680
# Wake up queue management thread
670681
self._executor_manager_thread_wakeup.wakeup()
671682

683+
self._adjust_process_count()
672684
self._start_executor_manager_thread()
673685
return f
674686
submit.__doc__ = _base.Executor.submit.__doc__

Lib/test/test_concurrent_futures.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,10 +486,16 @@ def _prime_executor(self):
486486
pass
487487

488488
def test_processes_terminate(self):
489-
self.executor.submit(mul, 21, 2)
490-
self.executor.submit(mul, 6, 7)
491-
self.executor.submit(mul, 3, 14)
492-
self.assertEqual(len(self.executor._processes), 5)
489+
def acquire_lock(lock):
490+
lock.acquire()
491+
492+
mp_context = get_context()
493+
sem = mp_context.Semaphore(0)
494+
for _ in range(3):
495+
self.executor.submit(acquire_lock, sem)
496+
self.assertEqual(len(self.executor._processes), 3)
497+
for _ in range(3):
498+
sem.release()
493499
processes = self.executor._processes
494500
self.executor.shutdown()
495501

@@ -964,6 +970,37 @@ def test_ressources_gced_in_workers(self):
964970
mgr.shutdown()
965971
mgr.join()
966972

973+
def test_saturation(self):
974+
executor = self.executor_type(4)
975+
def acquire_lock(lock):
976+
lock.aquire()
977+
978+
mp_context = get_context()
979+
sem = mp_context.Semaphore(0)
980+
job_count = 15 * executor._max_workers
981+
for _ in range(job_count):
982+
executor.submit(acquire_lock, sem)
983+
self.assertEqual(len(executor._processes), executor._max_workers)
984+
for _ in range(job_count):
985+
sem.release()
986+
executor.shutdown()
987+
988+
def test_idle_process_reuse_one(self):
989+
executor = self.executor_type()
990+
executor.submit(mul, 21, 2).result()
991+
executor.submit(mul, 6, 7).result()
992+
executor.submit(mul, 3, 14).result()
993+
self.assertEqual(len(executor._processes), 1)
994+
executor.shutdown()
995+
996+
def test_idle_process_reuse_multiple(self):
997+
executor = self.executor_type()
998+
executor.submit(mul, 12, 7).result()
999+
executor.submit(mul, 33, 25)
1000+
executor.submit(mul, 25, 26).result()
1001+
executor.submit(mul, 18, 29)
1002+
self.assertEqual(len(executor._processes), 2)
1003+
executor.shutdown()
9671004

9681005
create_executor_tests(ProcessPoolExecutorTest,
9691006
executor_mixins=(ProcessPoolForkMixin,

0 commit comments

Comments
 (0)