@@ -209,7 +209,8 @@ def _sendback_result(result_queue, work_id, result=None, exception=None):
209209 result_queue .put (_ResultItem (work_id , exception = exc ))
210210
211211
212- def _process_worker (call_queue , result_queue , initializer , initargs ):
212+ def _process_worker (call_queue , result_queue , initializer , initargs ,
213+ idle_worker_semaphore ):
213214 """Evaluates calls from call_queue and places the results in result_queue.
214215
215216 This worker is run in a separate process.
@@ -221,6 +222,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
221222 to by the worker.
222223 initializer: A callable initializer, or None
223224 initargs: A tuple of args for the initializer
225+ idle_worker_semaphore: A multiprocessing.Semaphore that is used to
226+ prevent new workers from being spawned when there are idle workers.
224227 """
225228 if initializer is not None :
226229 try :
@@ -249,6 +252,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
249252 # open files or shared memory that is not needed anymore
250253 del call_item
251254
255+ # increment idle process count after worker finishes job
256+ idle_worker_semaphore .release ()
252257
253258class _ExecutorManagerThread (threading .Thread ):
254259 """Manages the communication between this process and the worker processes.
@@ -601,6 +606,7 @@ def __init__(self, max_workers=None, mp_context=None,
601606 # Shutdown is a two-step process.
602607 self ._shutdown_thread = False
603608 self ._shutdown_lock = threading .Lock ()
609+ self ._idle_worker_semaphore = mp_context .Semaphore (0 )
604610 self ._broken = False
605611 self ._queue_count = 0
606612 self ._pending_work_items = {}
@@ -633,20 +639,25 @@ def __init__(self, max_workers=None, mp_context=None,
633639 def _start_executor_manager_thread (self ):
634640 if self ._executor_manager_thread is None :
635641 # Start the processes so that their sentinels are known.
636- self ._adjust_process_count ()
637642 self ._executor_manager_thread = _ExecutorManagerThread (self )
638643 self ._executor_manager_thread .start ()
639644 _threads_wakeups [self ._executor_manager_thread ] = \
640645 self ._executor_manager_thread_wakeup
641646
642647 def _adjust_process_count (self ):
643- for _ in range (len (self ._processes ), self ._max_workers ):
648+ # if there's an idle process, we don't need to spawn a new one.
649+ if self ._idle_worker_semaphore .acquire (block = False ):
650+ return
651+
652+ process_count = len (self ._processes )
653+ if process_count < self ._max_workers :
644654 p = self ._mp_context .Process (
645655 target = _process_worker ,
646656 args = (self ._call_queue ,
647657 self ._result_queue ,
648658 self ._initializer ,
649- self ._initargs ))
659+ self ._initargs ,
660+ self ._idle_worker_semaphore ))
650661 p .start ()
651662 self ._processes [p .pid ] = p
652663
@@ -669,6 +680,7 @@ def submit(self, fn, /, *args, **kwargs):
669680 # Wake up queue management thread
670681 self ._executor_manager_thread_wakeup .wakeup ()
671682
683+ self ._adjust_process_count ()
672684 self ._start_executor_manager_thread ()
673685 return f
674686 submit .__doc__ = _base .Executor .submit .__doc__
0 commit comments