Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update unittest from CPython 3.10.5
  • Loading branch information
CPython developers authored and youknowone committed Jul 17, 2022
commit 799c78670b8575b9c2992acc5cb723b5c1912794
12 changes: 6 additions & 6 deletions Lib/test/test_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from test import support


def test_main():
# used by regrtest
support.run_unittest(unittest.test.suite())
support.reap_children()

def load_tests(*_):
# used by unittest
return unittest.test.suite()


def tearDownModule():
support.reap_children()


if __name__ == "__main__":
test_main()
unittest.main()
25 changes: 21 additions & 4 deletions Lib/unittest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,28 @@ def testMultiply(self):
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""

__all__ = ['TestResult', 'TestCase', 'TestSuite',
__all__ = ['TestResult', 'TestCase', 'IsolatedAsyncioTestCase', 'TestSuite',
'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
'expectedFailure', 'TextTestResult', 'installHandler',
'registerResult', 'removeResult', 'removeHandler']
'registerResult', 'removeResult', 'removeHandler',
'addModuleCleanup']

# Expose obsolete functions for backwards compatibility
__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])

__unittest = True

from .result import TestResult
from .case import (TestCase, FunctionTestCase, SkipTest, skip, skipIf,
skipUnless, expectedFailure)
from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip,
skipIf, skipUnless, expectedFailure)
from .suite import BaseTestSuite, TestSuite
from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames,
findTestCases)
from .main import TestProgram, main
from .runner import TextTestRunner, TextTestResult
from .signals import installHandler, registerResult, removeResult, removeHandler
# IsolatedAsyncioTestCase will be imported lazily.

# deprecated
_TextTestResult = TextTestResult
Expand All @@ -76,3 +78,18 @@ def load_tests(loader, tests, pattern):
# top level directory cached on loader instance
this_dir = os.path.dirname(__file__)
return loader.discover(start_dir=this_dir, pattern=pattern)


# Lazy import of IsolatedAsyncioTestCase from .async_case
# It imports asyncio, which is relatively heavy, but most tests
# do not need it.

def __dir__():
return globals().keys() | {'IsolatedAsyncioTestCase'}

def __getattr__(name):
if name == 'IsolatedAsyncioTestCase':
global IsolatedAsyncioTestCase
from .async_case import IsolatedAsyncioTestCase
return IsolatedAsyncioTestCase
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
86 changes: 86 additions & 0 deletions Lib/unittest/_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging
import collections

from .case import _BaseTestCaseContext


_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
["records", "output"])

class _CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""

def __init__(self):
logging.Handler.__init__(self)
self.watcher = _LoggingWatcher([], [])

def flush(self):
pass

def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)


class _AssertLogsContext(_BaseTestCaseContext):
"""A context manager for assertLogs() and assertNoLogs() """

LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

def __init__(self, test_case, logger_name, level, no_logs):
_BaseTestCaseContext.__init__(self, test_case)
self.logger_name = logger_name
if level:
self.level = logging._nameToLevel.get(level, level)
else:
self.level = logging.INFO
self.msg = None
self.no_logs = no_logs

def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
logger = self.logger = self.logger_name
else:
logger = self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = _CapturingHandler()
handler.setLevel(self.level)
handler.setFormatter(formatter)
self.watcher = handler.watcher
self.old_handlers = logger.handlers[:]
self.old_level = logger.level
self.old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(self.level)
logger.propagate = False
if self.no_logs:
return
return handler.watcher

def __exit__(self, exc_type, exc_value, tb):
self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate
self.logger.setLevel(self.old_level)

if exc_type is not None:
# let unexpected exceptions pass through
return False

if self.no_logs:
# assertNoLogs
if len(self.watcher.records) > 0:
self._raiseFailure(
"Unexpected logs found: {!r}".format(
self.watcher.output
)
)

else:
# assertLogs
if len(self.watcher.records) == 0:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
170 changes: 170 additions & 0 deletions Lib/unittest/async_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import asyncio
import inspect

from .case import TestCase


class IsolatedAsyncioTestCase(TestCase):
# Names intentionally have a long prefix
# to reduce a chance of clashing with user-defined attributes
# from inherited test case
#
# The class doesn't call loop.run_until_complete(self.setUp()) and family
# but uses a different approach:
# 1. create a long-running task that reads self.setUp()
# awaitable from queue along with a future
# 2. await the awaitable object passing in and set the result
# into the future object
# 3. Outer code puts the awaitable and the future object into a queue
# with waiting for the future
# The trick is necessary because every run_until_complete() call
# creates a new task with embedded ContextVar context.
# To share contextvars between setUp(), test and tearDown() we need to execute
# them inside the same task.

# Note: the test case modifies event loop policy if the policy was not instantiated
# yet.
# asyncio.get_event_loop_policy() creates a default policy on demand but never
# returns None
# I believe this is not an issue in user level tests but python itself for testing
# should reset a policy in every test module
# by calling asyncio.set_event_loop_policy(None) in tearDownModule()

def __init__(self, methodName='runTest'):
super().__init__(methodName)
self._asyncioTestLoop = None
self._asyncioCallsQueue = None

async def asyncSetUp(self):
pass

async def asyncTearDown(self):
pass

def addAsyncCleanup(self, func, /, *args, **kwargs):
# A trivial trampoline to addCleanup()
# the function exists because it has a different semantics
# and signature:
# addCleanup() accepts regular functions
# but addAsyncCleanup() accepts coroutines
#
# We intentionally don't add inspect.iscoroutinefunction() check
# for func argument because there is no way
# to check for async function reliably:
# 1. It can be "async def func()" itself
# 2. Class can implement "async def __call__()" method
# 3. Regular "def func()" that returns awaitable object
self.addCleanup(*(func, *args), **kwargs)

def _callSetUp(self):
self.setUp()
self._callAsync(self.asyncSetUp)

def _callTestMethod(self, method):
self._callMaybeAsync(method)

def _callTearDown(self):
self._callAsync(self.asyncTearDown)
self.tearDown()

def _callCleanup(self, function, *args, **kwargs):
self._callMaybeAsync(function, *args, **kwargs)

def _callAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
ret = func(*args, **kwargs)
assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)

def _callMaybeAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
ret = func(*args, **kwargs)
if inspect.isawaitable(ret):
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)
else:
return ret

async def _asyncioLoopRunner(self, fut):
self._asyncioCallsQueue = queue = asyncio.Queue()
fut.set_result(None)
while True:
query = await queue.get()
queue.task_done()
if query is None:
return
fut, awaitable = query
try:
ret = await awaitable
if not fut.cancelled():
fut.set_result(ret)
except (SystemExit, KeyboardInterrupt):
raise
except (BaseException, asyncio.CancelledError) as ex:
if not fut.cancelled():
fut.set_exception(ex)

def _setupAsyncioLoop(self):
assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
self._asyncioTestLoop = loop
fut = loop.create_future()
self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
loop.run_until_complete(fut)

def _tearDownAsyncioLoop(self):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
loop = self._asyncioTestLoop
self._asyncioTestLoop = None
self._asyncioCallsQueue.put_nowait(None)
loop.run_until_complete(self._asyncioCallsQueue.join())

try:
# cancel all tasks
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return

for task in to_cancel:
task.cancel()

loop.run_until_complete(
asyncio.gather(*to_cancel, return_exceptions=True))

for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler({
'message': 'unhandled exception during test shutdown',
'exception': task.exception(),
'task': task,
})
# shutdown asyncgens
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
# Prevent our executor environment from leaking to future tests.
loop.run_until_complete(loop.shutdown_default_executor())
asyncio.set_event_loop(None)
loop.close()

def run(self, result=None):
self._setupAsyncioLoop()
try:
return super().run(result)
finally:
self._tearDownAsyncioLoop()

def debug(self):
self._setupAsyncioLoop()
super().debug()
self._tearDownAsyncioLoop()

def __del__(self):
if self._asyncioTestLoop is not None:
self._tearDownAsyncioLoop()
Loading