diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 909359b648709f..60f5b0780c477e 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -192,7 +192,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._threads = set() self._broken = False self._shutdown = False - self._shutdown_lock = threading.Lock() + self._shutdown_lock = threading.RLock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) @@ -213,6 +213,10 @@ def submit(self, fn, /, *args, **kwargs): self._work_queue.put(w) self._adjust_thread_count() + if self._shutdown or _shutdown: + f.cancel() + w.task = None # Clear reference to task arguments to avoid memory leak + raise RuntimeError('cannot schedule new futures after shutdown') return f submit.__doc__ = _base.Executor.submit.__doc__ @@ -252,6 +256,15 @@ def _initializer_failed(self): work_item.future.set_exception(self.BROKEN(self._broken)) def shutdown(self, wait=True, *, cancel_futures=False): + # Detect if we are called reentrantly (e.g. from a signal handler on a thread + # already holding self._shutdown_lock). Fallback if RLock does not support _is_owned. + reentrant = False + if hasattr(self._shutdown_lock, '_is_owned'): + try: + reentrant = self._shutdown_lock._is_owned() + except Exception: + pass + with self._shutdown_lock: self._shutdown = True if cancel_futures: @@ -268,7 +281,10 @@ def shutdown(self, wait=True, *, cancel_futures=False): # Send a wake-up to prevent threads calling # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) - if wait: + + # If we are reentrant, we cannot join threads synchronously because the current + # thread is interrupted and blocking it would cause a deadlock. + if wait and not reentrant: for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__