From 4818d5938af5e33ad3a37349a79e715cdb264bc8 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 6 Dec 2018 00:22:23 +0100 Subject: [PATCH 1/2] Revert "bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)" This reverts commit 07b96a95db78eff3557d1bfed1df9ebecc40815b. --- Lib/multiprocessing/pool.py | 74 ++++++++++--------------------- Lib/test/_test_multiprocessing.py | 7 --- 2 files changed, 24 insertions(+), 57 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 32254d8ea6cfbe..a545f3c1a18961 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -147,9 +147,8 @@ class Pool(object): ''' _wrap_exception = True - @staticmethod - def Process(ctx, *args, **kwds): - return ctx.Process(*args, **kwds) + def Process(self, *args, **kwds): + return self._ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): @@ -176,15 +175,13 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._worker_handler = threading.Thread( target=Pool._handle_workers, - args=(self._cache, self._taskqueue, self._ctx, self.Process, - self._processes, self._pool, self._inqueue, self._outqueue, - self._initializer, self._initargs, self._maxtasksperchild, - self._wrap_exception) + args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() + self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, @@ -210,62 +207,43 @@ def __init__(self, processes=None, initializer=None, initargs=(), exitpriority=15 ) - @staticmethod - def _join_exited_workers(pool): + def _join_exited_workers(self): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False - for i in reversed(range(len(pool))): - worker = pool[i] + for i in reversed(range(len(self._pool))): + worker = self._pool[i] if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) worker.join() cleaned = True - del pool[i] + del self._pool[i] return cleaned def _repopulate_pool(self): - return self._repopulate_pool_static(self._ctx, self.Process, - self._processes, - self._pool, self._inqueue, - self._outqueue, self._initializer, - self._initargs, - self._maxtasksperchild, - self._wrap_exception) - - @staticmethod - def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - for i in range(processes - len(pool)): - w = Process(ctx, target=worker, - args=(inqueue, outqueue, - initializer, - initargs, maxtasksperchild, - wrap_exception) - ) - pool.append(w) + for i in range(self._processes - len(self._pool)): + w = self.Process(target=worker, + args=(self._inqueue, self._outqueue, + self._initializer, + self._initargs, self._maxtasksperchild, + self._wrap_exception) + ) + self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() util.debug('added worker') - @staticmethod - def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, - initializer, initargs, maxtasksperchild, - wrap_exception): + def _maintain_pool(self): """Clean up any exited workers and start replacements for them. """ - if Pool._join_exited_workers(pool): - Pool._repopulate_pool_static(ctx, Process, processes, pool, - inqueue, outqueue, initializer, - initargs, maxtasksperchild, - wrap_exception) + if self._join_exited_workers(): + self._repopulate_pool() def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, return result @staticmethod - def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, - inqueue, outqueue, initializer, initargs, - maxtasksperchild, wrap_exception): + def _handle_workers(pool): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (cache and thread._state != TERMINATE): - Pool._maintain_pool(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception) + while thread._state == RUN or (pool._cache and thread._state != TERMINATE): + pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers - taskqueue.put(None) + pool._taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -807,7 +781,7 @@ class ThreadPool(Pool): _wrap_exception = False @staticmethod - def Process(ctx, *args, **kwds): + def Process(*args, **kwds): from .dummy import Process return Process(*args, **kwds) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 59f9a2e1e2eb62..6cafc2e9cbc0e2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2286,13 +2286,6 @@ def test_release_task_refs(self): # they were released too. self.assertEqual(CountedObject.n_instances, 0) - @support.reap_threads - def test_del_pool(self): - p = self.Pool(1) - wr = weakref.ref(p) - del p - gc.collect() - self.assertIsNone(wr()) def raising(): raise KeyError("key") From e2b1f7d237ad0a02acdd2039936d2033a60f41ef Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 6 Dec 2018 00:31:28 +0100 Subject: [PATCH 2/2] Add NEWS entry --- .../next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst diff --git a/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst b/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst new file mode 100644 index 00000000000000..e467cc96782591 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst @@ -0,0 +1,3 @@ +REVERT: Fix a reference issue inside multiprocessing.Pool that caused the +pool to remain alive if it was deleted without being closed or terminated +explicitly.