From d581bbaef6eb75c0baf58ac8bcfaf80262394dd4 Mon Sep 17 00:00:00 2001 From: Grzegorz Grzywacz Date: Tue, 4 Jul 2017 16:13:07 +0200 Subject: [PATCH 1/2] bpo-27144: concurrent.futures as_completed and map iterators do not keep reference to returned object --- Lib/concurrent/futures/_base.py | 32 ++++++++++---- Lib/concurrent/futures/process.py | 14 ++++++- Lib/test/test_concurrent_futures.py | 42 +++++++++++++++++++ .../2017-08-30-11-26-14.bpo-27144.PEDJsE.rst | 2 + 4 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 295489c93e56d8..cbdecae341e21a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -170,6 +170,16 @@ def _create_and_install_waiters(fs, return_when): return waiter + +def _yield_and_decref(fs, ref_collect): + """Yields a future. Before yielding, removes the future + from each set in collection of sets (`ref_collect`).""" + while fs: + for futures_set in ref_collect: + futures_set.remove(fs[-1]) + yield fs.pop() + + def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. 
@@ -191,6 +201,8 @@ def as_completed(fs, timeout=None): if timeout is not None: end_time = timeout + time.time() + total_futures = len(fs) + fs = set(fs) with _AcquireFutures(fs): finished = set( @@ -198,9 +210,9 @@ def as_completed(fs, timeout=None): if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = fs - finished waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - + finished = list(finished) try: - yield from finished + yield from _yield_and_decref(finished, ref_collect=(fs,)) while pending: if timeout is None: @@ -210,7 +222,7 @@ def as_completed(fs, timeout=None): if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) + len(pending), total_futures)) waiter.event.wait(wait_timeout) @@ -219,9 +231,9 @@ def as_completed(fs, timeout=None): waiter.finished_futures = [] waiter.event.clear() - for future in finished: - yield future - pending.remove(future) + # reverse to keep finishing order + finished.reverse() + yield from _yield_and_decref(finished, ref_collect=(fs, pending)) finally: for f in fs: @@ -551,11 +563,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): # before the first iterator value is required. def result_iterator(): try: - for future in fs: + # reverse to keep finishing order + fs.reverse() + while fs: if timeout is None: - yield future.result() + yield fs.pop().result() else: - yield future.result(end_time - time.time()) + yield fs.pop().result(end_time - time.time()) finally: for future in fs: future.cancel() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8f1d714193ab79..f130ba256826a8 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -357,6 +357,18 @@ def _check_system_limits(): raise NotImplementedError(_system_limited) +def _chain_from_iterable(iterable): + """ + Different implementation of itertools.chain.from_iterable. 
+ The difference is _chain_from_iterable do not keep reference to returned objects. + """ + + for element in iterable: + element.reverse() + while element: + yield element.pop() + + class BrokenProcessPool(RuntimeError): """ Raised when a process in a ProcessPoolExecutor terminated abruptly @@ -482,7 +494,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), timeout=timeout) - return itertools.chain.from_iterable(results) + return _chain_from_iterable(results) def shutdown(self, wait=True): with self._shutdown_lock: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index ebc30a49e5e4b6..e54a68446ce61f 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -54,6 +54,10 @@ def sleep_and_print(t, msg): sys.stdout.flush() +def _dummy_object_fn(_): + return object() + + class MyObject(object): def my_method(self): pass @@ -396,6 +400,34 @@ def test_duplicate_futures(self): completed = [f for f in futures.as_completed([future1,future1])] self.assertEqual(len(completed), 1) + def test_free_reference_yielded_future(self): + # Issue #14406: Generator should not keep reference + # for finished futures. 
+ futures_list = [Future() for _ in range(8)] + futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) + futures_list.append(create_future(state=SUCCESSFUL_FUTURE)) + + with self.assertRaises(futures.TimeoutError): + for future in futures.as_completed(futures_list, timeout=0): + futures_list.remove(future) + self.assertEqual(sys.getrefcount(future), 2) + + futures_list[0].set_result("test") + for future in futures.as_completed(futures_list): + futures_list.remove(future) + self.assertEqual(sys.getrefcount(future), 2) + if futures_list: + futures_list[0].set_result("test") + + def test_correct_timeout_exception_msg(self): + futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, + RUNNING_FUTURE, SUCCESSFUL_FUTURE] + + with self.assertRaises(futures.TimeoutError) as cm: + list(futures.as_completed(futures_list, timeout=0)) + + self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') + class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase): pass @@ -421,6 +453,10 @@ def test_map(self): list(self.executor.map(pow, range(10), range(10))), list(map(pow, range(10), range(10)))) + self.assertEqual( + list(self.executor.map(pow, range(10), range(10), chunksize=3)), + list(map(pow, range(10), range(10)))) + def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) self.assertEqual(i.__next__(), (0, 1)) @@ -471,6 +507,12 @@ def test_max_workers_negative(self): "than 0"): self.executor_type(max_workers=number) + def test_free_reference(self): + # Issue #14406: Result iterator should not keep reference + # for finished futures. 
+ for result_object in self.executor.map(_dummy_object_fn, range(10)): + self.assertEqual(sys.getrefcount(result_object), 2) + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): def test_map_submits_without_iteration(self): diff --git a/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst new file mode 100644 index 00000000000000..e199f07a57408f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst @@ -0,0 +1,2 @@ +concurrent.futures as_complie and map iterators do not keep reference to +returned object From d67b22cda735a55b55943001c0cf33877afc58cc Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 1 Sep 2017 18:29:31 +0200 Subject: [PATCH 2/2] Some nits. Improve wordings in docstrings and comments, and avoid relying on sys.getrefcount() in tests. --- Lib/concurrent/futures/_base.py | 9 ++++-- Lib/concurrent/futures/process.py | 10 +++--- Lib/test/test_concurrent_futures.py | 32 +++++++++++-------- .../2017-08-30-11-26-14.bpo-27144.PEDJsE.rst | 4 +-- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index cbdecae341e21a..88521ae317e53d 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -172,11 +172,15 @@ def _create_and_install_waiters(fs, return_when): def _yield_and_decref(fs, ref_collect): - """Yields a future. Before yielding, removes the future - from each set in collection of sets (`ref_collect`).""" + """ + Iterate on the list *fs*, yielding objects one by one in reverse order. + Before yielding an object, it is removed from each set in + the collection of sets *ref_collect*. 
+ """ while fs: for futures_set in ref_collect: futures_set.remove(fs[-1]) + # Careful not to keep a reference to the popped value yield fs.pop() @@ -566,6 +570,7 @@ def result_iterator(): # reverse to keep finishing order fs.reverse() while fs: + # Careful not to keep a reference to the popped future if timeout is None: yield fs.pop().result() else: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f130ba256826a8..03b28ab5d68e86 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -357,12 +357,12 @@ def _check_system_limits(): raise NotImplementedError(_system_limited) -def _chain_from_iterable(iterable): +def _chain_from_iterable_of_lists(iterable): """ - Different implementation of itertools.chain.from_iterable. - The difference is _chain_from_iterable do not keep reference to returned objects. + Specialized implementation of itertools.chain.from_iterable. + Each item in *iterable* should be a list. This function is + careful not to keep references to yielded objects. 
""" - for element in iterable: element.reverse() while element: @@ -494,7 +494,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), timeout=timeout) - return _chain_from_iterable(results) + return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True): with self._shutdown_lock: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e54a68446ce61f..f1226fe7090959 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -54,15 +54,15 @@ def sleep_and_print(t, msg): sys.stdout.flush() -def _dummy_object_fn(_): - return object() - - class MyObject(object): def my_method(self): pass +def make_dummy_object(_): + return MyObject() + + class BaseTestCase(unittest.TestCase): def setUp(self): self._thread_key = test.support.threading_setup() @@ -401,8 +401,8 @@ def test_duplicate_futures(self): self.assertEqual(len(completed), 1) def test_free_reference_yielded_future(self): - # Issue #14406: Generator should not keep reference - # for finished futures. + # Issue #14406: Generator should not keep references + # to finished futures. 
futures_list = [Future() for _ in range(8)] futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) futures_list.append(create_future(state=SUCCESSFUL_FUTURE)) @@ -410,18 +410,22 @@ def test_free_reference_yielded_future(self): with self.assertRaises(futures.TimeoutError): for future in futures.as_completed(futures_list, timeout=0): futures_list.remove(future) - self.assertEqual(sys.getrefcount(future), 2) + wr = weakref.ref(future) + del future + self.assertIsNone(wr()) futures_list[0].set_result("test") for future in futures.as_completed(futures_list): futures_list.remove(future) - self.assertEqual(sys.getrefcount(future), 2) + wr = weakref.ref(future) + del future + self.assertIsNone(wr()) if futures_list: futures_list[0].set_result("test") def test_correct_timeout_exception_msg(self): futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, - RUNNING_FUTURE, SUCCESSFUL_FUTURE] + RUNNING_FUTURE, SUCCESSFUL_FUTURE] with self.assertRaises(futures.TimeoutError) as cm: list(futures.as_completed(futures_list, timeout=0)) @@ -508,10 +512,12 @@ def test_max_workers_negative(self): self.executor_type(max_workers=number) def test_free_reference(self): - # Issue #14406: Result iterator should not keep reference - # for finished futures. - for result_object in self.executor.map(_dummy_object_fn, range(10)): - self.assertEqual(sys.getrefcount(result_object), 2) + # Issue #14406: Result iterator should not keep an internal + # reference to result objects. 
+ for obj in self.executor.map(make_dummy_object, range(10)): + wr = weakref.ref(obj) + del obj + self.assertIsNone(wr()) class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): diff --git a/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst index e199f07a57408f..857fad0c852870 100644 --- a/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst +++ b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst @@ -1,2 +1,2 @@ -concurrent.futures as_complie and map iterators do not keep reference to -returned object +The ``map()`` and ``as_completed()`` iterators in ``concurrent.futures`` +now avoid keeping a reference to yielded objects. \ No newline at end of file