@@ -324,12 +324,6 @@ def run(self):
324324 # while waiting on new results.
325325 del result_item
326326
327- # attempt to increment idle process count
328- executor = self .executor_reference ()
329- if executor is not None :
330- executor ._idle_worker_semaphore .release ()
331- del executor
332-
333327 if self .is_shutting_down ():
334328 self .flag_executor_shutting_down ()
335329
@@ -625,7 +619,6 @@ def __init__(self, max_workers=None, mp_context=None,
625619 # Shutdown is a two-step process.
626620 self ._shutdown_thread = False
627621 self ._shutdown_lock = threading .Lock ()
628- self ._idle_worker_semaphore = threading .Semaphore (0 )
629622 self ._broken = False
630623 self ._queue_count = 0
631624 self ._pending_work_items = {}
@@ -661,18 +654,21 @@ def __init__(self, max_workers=None, mp_context=None,
661654 def _start_executor_manager_thread (self ):
662655 if self ._executor_manager_thread is None :
663656 # Start the processes so that their sentinels are known.
657+ self ._adjust_process_count ()
664658 self ._executor_manager_thread = _ExecutorManagerThread (self )
665659 self ._executor_manager_thread .start ()
666660 _threads_wakeups [self ._executor_manager_thread ] = \
667661 self ._executor_manager_thread_wakeup
668662
669663 def _adjust_process_count (self ):
670- # if there's an idle process, we don't need to spawn a new one.
671- if self ._idle_worker_semaphore .acquire (blocking = False ):
672- return
673-
674- process_count = len (self ._processes )
675- if process_count < self ._max_workers :
664+ # To get rid of this condition don't fork() from this process.
665+ # This applies to _any_ thread existing in the process at all, but
666+ # that is a long standing issue. We at least make sure this library
667+ # is not the cause of its own deadlocks.
668+ assert not self ._executor_manager_thread , (
669+ 'Processes cannot be fork()ed after the thread has started, '
670+ 'deadlock in the child processes could result; bpo-46464.' )
671+ for _ in range (len (self ._processes ), self ._max_workers ):
676672 p = self ._mp_context .Process (
677673 target = _process_worker ,
678674 args = (self ._call_queue ,
@@ -701,7 +697,6 @@ def submit(self, fn, /, *args, **kwargs):
701697 # Wake up queue management thread
702698 self ._executor_manager_thread_wakeup .wakeup ()
703699
704- self ._adjust_process_count ()
705700 self ._start_executor_manager_thread ()
706701 return f
707702 submit .__doc__ = _base .Executor .submit .__doc__
0 commit comments