From b89c67b819048bbca6eb3479ec6d14d045b216c9 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Sun, 2 Dec 2018 20:31:57 +0000 Subject: [PATCH 1/7] bpo-35378: Link the lifetime of the pool to the pool's iterators and results --- Lib/multiprocessing/pool.py | 40 +++++++++++++++++++------------ Lib/test/_test_multiprocessing.py | 34 ++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index bfb2769ba6ec1e..e8996d9254605a 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -331,7 +331,7 @@ def imap(self, func, iterable, chunksize=1): ''' self._check_running() if chunksize == 1: - result = IMapIterator(self._cache) + result = IMapIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), @@ -344,7 +344,7 @@ def imap(self, func, iterable, chunksize=1): "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self._cache) + result = IMapIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, @@ -360,7 +360,7 @@ def imap_unordered(self, func, iterable, chunksize=1): ''' self._check_running() if chunksize == 1: - result = IMapUnorderedIterator(self._cache) + result = IMapUnorderedIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), @@ -372,7 +372,7 @@ def imap_unordered(self, func, iterable, chunksize=1): raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self._cache) + result = IMapUnorderedIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, @@ -388,7 +388,7 @@ def apply_async(self, func, args=(), kwds={}, callback=None, Asynchronous version of `apply()` method. ''' self._check_running() - result = ApplyResult(self._cache, callback, error_callback) + result = ApplyResult(self, callback, error_callback) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) return result @@ -417,7 +417,7 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) - result = MapResult(self._cache, chunksize, len(iterable), callback, + result = MapResult(self, chunksize, len(iterable), callback, error_callback=error_callback) self._taskqueue.put( ( @@ -656,13 +656,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): class ApplyResult(object): - def __init__(self, cache, callback, error_callback): + def __init__(self, pool, callback, error_callback, cache=None): + self._pool = pool self._event = threading.Event() self._job = next(job_counter) - self._cache = cache + self._cache = cache if cache is not None else pool._cache self._callback = callback self._error_callback = error_callback - cache[self._job] = self + self._cache[self._job] = self def ready(self): return self._event.is_set() @@ -692,6 +693,7 @@ def _set(self, i, obj): self._error_callback(self._value) self._event.set() del self._cache[self._job] + self._pool = None AsyncResult = ApplyResult # create alias -- see #17805 @@ -701,8 +703,8 @@ def _set(self, i, obj): class MapResult(ApplyResult): - def __init__(self, cache, chunksize, length, callback, error_callback): - ApplyResult.__init__(self, cache, callback, + def __init__(self, pool, chunksize, length, callback, error_callback, cache=None): + ApplyResult.__init__(self, pool, callback, error_callback=error_callback) self._success = True self._value = [None] * length @@ -710,7 +712,7 @@ def __init__(self, cache, chunksize, length, callback, error_callback): if chunksize <= 0: self._number_left = 0 self._event.set() - del cache[self._job] + del self._cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) @@ -724,6 +726,7 @@ def _set(self, i, success_result): self._callback(self._value) del self._cache[self._job] self._event.set() + self._pool = None else: if not success and self._success: # only store first exception @@ -735,6 +738,7 @@ def _set(self, i, success_result): self._error_callback(self._value) del self._cache[self._job] self._event.set() + self._pool = None # # Class whose instances are returned by `Pool.imap()` @@ -742,15 +746,16 @@ def _set(self, i, success_result): class IMapIterator(object): - def __init__(self, cache): + def __init__(self, pool, cache=None): + self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) - self._cache = cache + self._cache = cache if cache is not None else pool._cache self._items = collections.deque() self._index = 0 self._length = None self._unsorted = {} - cache[self._job] = self + self._cache[self._job] = self def __iter__(self): return self @@ -761,12 +766,14 @@ def next(self, timeout=None): item = self._items.popleft() except IndexError: if self._index == self._length: + self._pool = None raise StopIteration from None self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: + self._pool = None raise StopIteration from None raise TimeoutError from None @@ -792,6 +799,7 @@ def _set(self, i, obj): if self._index == self._length: del self._cache[self._job] + self._pool = None def _set_length(self, length): with self._cond: @@ -799,6 +807,7 @@ def _set_length(self, length): if self._index == self._length: self._cond.notify() del self._cache[self._job] + self._pool = None # # Class whose instances are returned by `Pool.imap_unordered()` @@ -813,6 +822,7 @@ def _set(self, i, obj): self._cond.notify() if self._index == self._length: del self._cache[self._job] + self._pool = None # # diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7341131231a4f0..5c52b8338072d2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2594,6 +2594,40 @@ def test_resource_warning(self): support.gc_collect() + @support.reap_threads + def test_pool_iterators_and_results_keeps_alive_the_pool_and_does_not_hang(self): + # Check bpo35378 + def _test_func(func, args, test_body): + result = getattr(self.Pool(), func)(*args) + # If the pool is used as a context manager, *result* is a proxy object, + # but then we won't need to close and join as the context manager will do + # it for us. + if hasattr(result, "_pool"): + close_func = result._pool.close + join_func = result._pool.join + else: + close_func = lambda: None + join_func = lambda: None + + try: + test_body(result) + finally: + close_func() + join_func() + del result + + test_functions = [ + ("imap", (str, range(100)),lambda result: list(result)), + ("imap_unordered", (str, range(100)),lambda result: list(result)), + ("apply_async", (str, [10]),lambda result: result.get()), + ("map_async", (str, range(100)),lambda result: result.get()), + ] + for args in test_functions: + p = threading.Thread(target=_test_func, args=args) + p.start() + p.join(timeout=1) + self.assertFalse(p.is_alive()) + def raising(): raise KeyError("key") From 514cd681f700790ffbf68012a4d357e429d37c7e Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Sun, 2 Dec 2018 20:42:57 +0000 Subject: [PATCH 2/7] Use a longer (parametrised) timeout for the slower buildbots --- Lib/test/_test_multiprocessing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 5c52b8338072d2..85698965b24b04 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2622,10 +2622,11 @@ def _test_func(func, args, test_body): ("apply_async", (str, [10]),lambda result: result.get()), ("map_async", (str, range(100)),lambda result: result.get()), ] + for args in test_functions: p = threading.Thread(target=_test_func, args=args) p.start() - p.join(timeout=1) + p.join(timeout=TIMEOUT) self.assertFalse(p.is_alive()) def raising(): From c7f0260b77ea46cb69aa06ce34c53c118b65b149 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Mon, 21 Jan 2019 02:09:42 +0000 Subject: [PATCH 3/7] Remove test as it can leak resources --- Lib/test/_test_multiprocessing.py | 36 ------------------------------- 1 file changed, 36 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 85698965b24b04..48bf90bbd8d4a3 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2593,42 +2593,6 @@ def test_resource_warning(self): pool = None support.gc_collect() - - @support.reap_threads - def test_pool_iterators_and_results_keeps_alive_the_pool_and_does_not_hang(self): - # Check bpo35378 - def _test_func(func, args, test_body): - result = getattr(self.Pool(), func)(*args) - # If the pool is used as a context manager, *result* is a proxy object, - # but then we won't need to close and join as the context manager will do - # it for us. - if hasattr(result, "_pool"): - close_func = result._pool.close - join_func = result._pool.join - else: - close_func = lambda: None - join_func = lambda: None - - try: - test_body(result) - finally: - close_func() - join_func() - del result - - test_functions = [ - ("imap", (str, range(100)),lambda result: list(result)), - ("imap_unordered", (str, range(100)),lambda result: list(result)), - ("apply_async", (str, [10]),lambda result: result.get()), - ("map_async", (str, range(100)),lambda result: result.get()), - ] - - for args in test_functions: - p = threading.Thread(target=_test_func, args=args) - p.start() - p.join(timeout=TIMEOUT) - self.assertFalse(p.is_alive()) - def raising(): raise KeyError("key") From 2b29f41f808c3c3ba5ee40d6b88f9522b97fabc5 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Mon, 21 Jan 2019 02:10:07 +0000 Subject: [PATCH 4/7] multiprocessing.Pool leaks resources after being deleted Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly. --- Lib/multiprocessing/pool.py | 72 +++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e8996d9254605a..4237bfbb1fb3d2 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -151,8 +151,9 @@ class Pool(object): ''' _wrap_exception = True - def Process(self, *args, **kwds): - return self._ctx.Process(*args, **kwds) + @staticmethod + def Process(ctx, *args, **kwds): + return ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): @@ -190,7 +191,10 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._worker_handler = threading.Thread( target=Pool._handle_workers, - args=(self, ) + args=(self._cache, self._taskqueue, self._ctx, self.Process, + self._processes, self._pool, self._inqueue, self._outqueue, + self._initializer, self._initargs, self._maxtasksperchild, + self._wrap_exception) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -236,43 +240,61 @@ def __repr__(self): f'state={self._state} ' f'pool_size={len(self._pool)}>') - def _join_exited_workers(self): + @staticmethod + def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False - for i in reversed(range(len(self._pool))): - worker = self._pool[i] + for i in reversed(range(len(pool))): + worker = pool[i] if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) worker.join() cleaned = True - del self._pool[i] + del pool[i] return cleaned def _repopulate_pool(self): + return self._repopulate_pool_static(self._ctx, self.Process, + self._processes, + self._pool, self._inqueue, + self._outqueue, self._initializer, + self._initargs, + self._maxtasksperchild, + self._wrap_exception) + + @staticmethod + def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - for i in range(self._processes - len(self._pool)): - w = self.Process(target=worker, - args=(self._inqueue, self._outqueue, - self._initializer, - self._initargs, self._maxtasksperchild, - self._wrap_exception) - ) + for i in range(processes - len(pool)): + w = Process(ctx, target=worker, + args=(inqueue, outqueue, + initializer, + initargs, maxtasksperchild, + wrap_exception)) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() - self._pool.append(w) + pool.append(w) util.debug('added worker') - def _maintain_pool(self): + @staticmethod + def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, + initializer, initargs, maxtasksperchild, + wrap_exception): """Clean up any exited workers and start replacements for them. """ - if self._join_exited_workers(): - self._repopulate_pool() + if Pool._join_exited_workers(pool): + Pool._repopulate_pool_static(ctx, Process, processes, pool, + inqueue, outqueue, initializer, + initargs, maxtasksperchild, + wrap_exception) def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -430,16 +452,20 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, return result @staticmethod - def _handle_workers(pool): + def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, + inqueue, outqueue, initializer, initargs, + maxtasksperchild, wrap_exception): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (pool._cache and thread._state != TERMINATE): - pool._maintain_pool() + while thread._state == RUN or (cache and thread._state != TERMINATE): + Pool._maintain_pool(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) time.sleep(0.1) # send sentinel to stop workers - pool._taskqueue.put(None) + taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -832,7 +858,7 @@ class ThreadPool(Pool): _wrap_exception = False @staticmethod - def Process(*args, **kwds): + def Process(ctx, *args, **kwds): from .dummy import Process return Process(*args, **kwds) From ae81354b342f075676c1c52ce3c9b84b782a0ac0 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Mon, 21 Jan 2019 02:15:28 +0000 Subject: [PATCH 5/7] Add News entry --- .../next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst diff --git a/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst b/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst new file mode 100644 index 00000000000000..70b7182974e6ec --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst @@ -0,0 +1,3 @@ +Fix a reference issue inside :class:`multiprocessing.Pool` that caused +the pool to remain alive if it was deleted without being closed or +terminated explicitly. From edee524290929b2b23ae3ab611534998a27bfa5b Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Tue, 22 Jan 2019 22:09:17 +0000 Subject: [PATCH 6/7] Always get the cache from the pool --- Lib/multiprocessing/pool.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 4237bfbb1fb3d2..18a56f8524b4b6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -682,11 +682,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): class ApplyResult(object): - def __init__(self, pool, callback, error_callback, cache=None): + def __init__(self, pool, callback, error_callback): self._pool = pool self._event = threading.Event() self._job = next(job_counter) - self._cache = cache if cache is not None else pool._cache + self._cache = pool._cache self._callback = callback self._error_callback = error_callback self._cache[self._job] = self @@ -729,7 +729,7 @@ def _set(self, i, obj): class MapResult(ApplyResult): - def __init__(self, pool, chunksize, length, callback, error_callback, cache=None): + def __init__(self, pool, chunksize, length, callback, error_callback): ApplyResult.__init__(self, pool, callback, error_callback=error_callback) self._success = True @@ -772,11 +772,11 @@ def _set(self, i, success_result): class IMapIterator(object): - def __init__(self, pool, cache=None): + def __init__(self, pool): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) - self._cache = cache if cache is not None else pool._cache + self._cache = pool._cache self._items = collections.deque() self._index = 0 self._length = None From b0a5c89678dd29dbc456158e7d94022d6bf54307 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Thu, 7 Feb 2019 00:04:31 +0000 Subject: [PATCH 7/7] Make a note about strong references in the NEWS --- .../next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst b/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst index 70b7182974e6ec..bb57f7115991c7 100644 --- a/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst +++ b/Misc/NEWS.d/next/Library/2019-01-21-02-15-20.bpo-35378.4oF03i.rst @@ -1,3 +1,6 @@ Fix a reference issue inside :class:`multiprocessing.Pool` that caused the pool to remain alive if it was deleted without being closed or -terminated explicitly. +terminated explicitly. A new strong reference is added to the pool +iterators to link the lifetime of the pool to the lifetime of its +iterators so the pool does not get destroyed if a pool iterator is +still alive.