From de123ea936aa5d1b3f9adcd7124f52f31611e53d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Nov 2017 13:28:59 +0100 Subject: [PATCH 1/3] Silence error output in test_concurrent_futures (bpo-21423) --- Lib/test/test_concurrent_futures.py | 37 ++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 296398f0d948e4..6b08a136e67632 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -9,7 +9,10 @@ import contextlib import itertools +import logging +from logging.handlers import QueueHandler import os +import queue import sys import threading import time @@ -61,7 +64,12 @@ def init(x): def get_init_status(): return INITIALIZER_STATUS -def init_fail(): +def init_fail(log_queue=None): + if log_queue is not None: + logger = logging.getLogger('concurrent.futures') + logger.addHandler(QueueHandler(log_queue)) + logger.setLevel('CRITICAL') + logger.propagate = False time.sleep(0.1) # let some futures be scheduled raise ValueError('error in initializer') @@ -206,7 +214,18 @@ class FailingInitializerMixin(ExecutorMixin): worker_count = 2 def setUp(self): - self.executor_kwargs = dict(initializer=init_fail) + if hasattr(self, "ctx"): + # Pass a queue to redirect the child's logging output + self.mp_context = get_context(self.ctx) + self.log_queue = self.mp_context.Queue() + self.executor_kwargs = dict(initializer=init_fail, + initargs=(self.log_queue,)) + else: + # In a thread pool, the child shares our logging setup + # (see _assert_logged()) + self.mp_context = None + self.log_queue = None + self.executor_kwargs = dict(initializer=init_fail) super().setUp() def test_initializer(self): @@ -234,14 +253,20 @@ def _prime_executor(self): @contextlib.contextmanager def _assert_logged(self, msg): - if self.executor_type is futures.ProcessPoolExecutor: - # No easy way to catch the child processes' stderr + if self.log_queue is not None: yield + output = [] + while True: + try: + output.append(self.log_queue.get_nowait().getMessage()) + except queue.Empty: + break else: with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: yield - self.assertTrue(any(msg in line for line in cm.output), - cm.output) + output = cm.output + self.assertTrue(any(msg in line for line in output), + output) create_executor_tests(InitializerMixin) From 21a0f1c3726bc2c3ca5a9ece20ff4c22567d28f8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Nov 2017 14:56:25 +0100 Subject: [PATCH 2/3] Fix tests under Windows, hopefully --- Lib/test/test_concurrent_futures.py | 34 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 6b08a136e67632..15a03bdaf6a704 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -109,18 +109,15 @@ def setUp(self): super().setUp() self.t1 = time.time() - try: - if hasattr(self, "ctx"): - self.executor = self.executor_type( - max_workers=self.worker_count, - mp_context=get_context(self.ctx), - **self.executor_kwargs) - else: - self.executor = self.executor_type( - max_workers=self.worker_count, - **self.executor_kwargs) - except NotImplementedError as e: - self.skipTest(str(e)) + if hasattr(self, "ctx"): + self.executor = self.executor_type( + max_workers=self.worker_count, + mp_context=self.get_context(), + **self.executor_kwargs) + else: + self.executor = self.executor_type( + max_workers=self.worker_count, + **self.executor_kwargs) self._prime_executor() def tearDown(self): @@ -134,6 +131,9 @@ def tearDown(self): super().tearDown() + def get_context(self): + return get_context(self.ctx) + def _prime_executor(self): # Make sure that the executor is ready to do work before running the # tests. This should reduce the probability of timeouts in the tests. @@ -151,10 +151,10 @@ class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "fork" - def setUp(self): + def get_context(self): if sys.platform == "win32": self.skipTest("require unix system") - super().setUp() + return super().get_context() class ProcessPoolSpawnMixin(ExecutorMixin): @@ -166,10 +166,10 @@ class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "forkserver" - def setUp(self): + def get_context(self): if sys.platform == "win32": self.skipTest("require unix system") - super().setUp() + return super().get_context() def create_executor_tests(mixin, bases=(BaseTestCase,), @@ -216,7 +216,7 @@ class FailingInitializerMixin(ExecutorMixin): def setUp(self): if hasattr(self, "ctx"): # Pass a queue to redirect the child's logging output - self.mp_context = get_context(self.ctx) + self.mp_context = self.get_context() self.log_queue = self.mp_context.Queue() self.executor_kwargs = dict(initializer=init_fail, initargs=(self.log_queue,)) From bd15bb4f2ffbcfb871272c191cbcaf632cfc14c3 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Nov 2017 14:57:57 +0100 Subject: [PATCH 3/3] Tweak loop --- Lib/test/test_concurrent_futures.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 15a03bdaf6a704..76878992f9afe6 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -256,11 +256,11 @@ def _assert_logged(self, msg): if self.log_queue is not None: yield output = [] - while True: - try: + try: + while True: output.append(self.log_queue.get_nowait().getMessage()) - except queue.Empty: - break + except queue.Empty: + pass else: with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: yield