Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
Expand Down Expand Up @@ -61,7 +64,12 @@ def init(x):
def get_init_status():
    # Runs in a worker; reports the module-global flag presumably set by the
    # pool initializer (defined outside this view) — lets tests verify that
    # the initializer actually executed in each worker.
    return INITIALIZER_STATUS

def init_fail(log_queue=None):
    """Worker initializer that always fails.

    If *log_queue* is given (the process-pool case), the 'concurrent.futures'
    logger is rerouted into it via a QueueHandler so the parent process can
    inspect the child's logging output; propagation is disabled so nothing
    leaks to the child's own stderr.
    """
    if log_queue is not None:
        futures_logger = logging.getLogger('concurrent.futures')
        futures_logger.addHandler(QueueHandler(log_queue))
        futures_logger.setLevel('CRITICAL')
        futures_logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')

Expand Down Expand Up @@ -101,18 +109,15 @@ def setUp(self):
super().setUp()

self.t1 = time.time()
try:
if hasattr(self, "ctx"):
self.executor = self.executor_type(
max_workers=self.worker_count,
mp_context=get_context(self.ctx),
**self.executor_kwargs)
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
except NotImplementedError as e:
self.skipTest(str(e))
if hasattr(self, "ctx"):
self.executor = self.executor_type(
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
self._prime_executor()

def tearDown(self):
Expand All @@ -126,6 +131,9 @@ def tearDown(self):

super().tearDown()

def get_context(self):
return get_context(self.ctx)

def _prime_executor(self):
# Make sure that the executor is ready to do work before running the
# tests. This should reduce the probability of timeouts in the tests.
class ProcessPoolForkMixin(ExecutorMixin):
    """Run the executor tests with a ProcessPoolExecutor using "fork"."""

    ctx = "fork"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        # The "fork" start method does not exist on Windows; skipTest()
        # raises, so the return below only runs on Unix.
        if sys.platform != "win32":
            return super().get_context()
        self.skipTest("require unix system")


class ProcessPoolSpawnMixin(ExecutorMixin):
class ProcessPoolForkserverMixin(ExecutorMixin):
    """Run the executor tests with a ProcessPoolExecutor using "forkserver"."""

    ctx = "forkserver"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        # The "forkserver" start method does not exist on Windows; skipTest()
        # raises, so the return below only runs on Unix.
        if sys.platform != "win32":
            return super().get_context()
        self.skipTest("require unix system")


def create_executor_tests(mixin, bases=(BaseTestCase,),
Expand Down Expand Up @@ -206,7 +214,18 @@ class FailingInitializerMixin(ExecutorMixin):
worker_count = 2

def setUp(self):
self.executor_kwargs = dict(initializer=init_fail)
if hasattr(self, "ctx"):
# Pass a queue to redirect the child's logging output
self.mp_context = self.get_context()
self.log_queue = self.mp_context.Queue()
self.executor_kwargs = dict(initializer=init_fail,
initargs=(self.log_queue,))
else:
# In a thread pool, the child shares our logging setup
# (see _assert_logged())
self.mp_context = None
self.log_queue = None
self.executor_kwargs = dict(initializer=init_fail)
super().setUp()

def test_initializer(self):
Expand Down Expand Up @@ -234,14 +253,20 @@ def _prime_executor(self):

@contextlib.contextmanager
def _assert_logged(self, msg):
if self.executor_type is futures.ProcessPoolExecutor:
# No easy way to catch the child processes' stderr
if self.log_queue is not None:
yield
output = []
try:
while True:
output.append(self.log_queue.get_nowait().getMessage())
except queue.Empty:
pass
else:
with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
yield
self.assertTrue(any(msg in line for line in cm.output),
cm.output)
output = cm.output
self.assertTrue(any(msg in line for line in output),
output)


create_executor_tests(InitializerMixin)
Expand Down