Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update test.support from CPython 3.11.2
  • Loading branch information
CPython Devleopers authored and youknowone committed Feb 23, 2023
commit c48fe567f796601a422aa1b611406ab034ed330a
241 changes: 187 additions & 54 deletions Lib/test/support/__init__.py

Large diffs are not rendered by default.

46 changes: 37 additions & 9 deletions Lib/test/support/import_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import _imp
import importlib
import importlib.util
import os
Expand Down Expand Up @@ -90,7 +91,24 @@ def _save_and_remove_modules(names):
return orig_modules


def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
@contextlib.contextmanager
def frozen_modules(enabled=True):
"""Force frozen modules to be used (or not).

This only applies to modules that haven't been imported yet.
Also, some essential modules will always be imported frozen.
"""
_imp._override_frozen_modules_for_tests(1 if enabled else -1)
try:
yield
finally:
_imp._override_frozen_modules_for_tests(0)


def import_fresh_module(name, fresh=(), blocked=(), *,
deprecated=False,
usefrozen=False,
):
"""Import and return a module, deliberately bypassing sys.modules.

This function imports and returns a fresh copy of the named Python module
Expand All @@ -115,6 +133,9 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):

This function will raise ImportError if the named module cannot be
imported.

If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""
# NOTE: test_heapq, test_json and test_warnings include extra sanity checks
# to make sure that this utility function is working as expected
Expand All @@ -129,13 +150,14 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
sys.modules[modname] = None

try:
# Return None when one of the "fresh" modules can not be imported.
try:
for modname in fresh:
__import__(modname)
except ImportError:
return None
return importlib.import_module(name)
with frozen_modules(usefrozen):
# Return None when one of the "fresh" modules can not be imported.
try:
for modname in fresh:
__import__(modname)
except ImportError:
return None
return importlib.import_module(name)
finally:
_save_and_remove_modules(names)
sys.modules.update(orig_modules)
Expand All @@ -151,9 +173,12 @@ class CleanImport(object):

with CleanImport("foo"):
importlib.import_module("foo") # new reference

If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""

def __init__(self, *module_names):
def __init__(self, *module_names, usefrozen=False):
self.original_modules = sys.modules.copy()
for module_name in module_names:
if module_name in sys.modules:
Expand All @@ -165,12 +190,15 @@ def __init__(self, *module_names):
if module.__name__ != module_name:
del sys.modules[module.__name__]
del sys.modules[module_name]
self._frozen_modules = frozen_modules(usefrozen)

def __enter__(self):
self._frozen_modules.__enter__()
return self

def __exit__(self, *ignore_exc):
sys.modules.update(self.original_modules)
self._frozen_modules.__exit__(*ignore_exc)


class DirsOnSysPath(object):
Expand Down
106 changes: 100 additions & 6 deletions Lib/test/support/os_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
Expand Down Expand Up @@ -171,9 +171,13 @@ def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
symlink_path = TESTFN + "can_symlink"
# WASI / wasmtime prevents symlinks with absolute paths, see man
# openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
# paths. Skip symlink tests on WASI for now.
src = os.path.abspath(TESTFN)
symlink_path = src + "can_symlink"
try:
os.symlink(TESTFN, symlink_path)
os.symlink(src, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
Expand Down Expand Up @@ -233,6 +237,84 @@ def skip_unless_xattr(test):
return test if ok else unittest.skip(msg)(test)


_can_chmod = None

def can_chmod():
global _can_chmod
if _can_chmod is not None:
return _can_chmod
if not hasattr(os, "chown"):
_can_chmod = False
return _can_chmod
try:
with open(TESTFN, "wb") as f:
try:
os.chmod(TESTFN, 0o777)
mode1 = os.stat(TESTFN).st_mode
os.chmod(TESTFN, 0o666)
mode2 = os.stat(TESTFN).st_mode
except OSError as e:
can = False
else:
can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
finally:
unlink(TESTFN)
_can_chmod = can
return can


def skip_unless_working_chmod(test):
"""Skip tests that require working os.chmod()

WASI SDK 15.0 cannot change file mode bits.
"""
ok = can_chmod()
msg = "requires working os.chmod()"
return test if ok else unittest.skip(msg)(test)


# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
_can_dac_override = None

def can_dac_override():
global _can_dac_override

if not can_chmod():
_can_dac_override = False
if _can_dac_override is not None:
return _can_dac_override

try:
with open(TESTFN, "wb") as f:
os.chmod(TESTFN, 0o400)
try:
with open(TESTFN, "wb"):
pass
except OSError:
_can_dac_override = False
else:
_can_dac_override = True
finally:
unlink(TESTFN)

return _can_dac_override


def skip_if_dac_override(test):
ok = not can_dac_override()
msg = "incompatible with CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)


def skip_unless_dac_override(test):
ok = can_dac_override()
msg = "requires CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)


def unlink(filename):
try:
_unlink(filename)
Expand Down Expand Up @@ -459,7 +541,10 @@ def create_empty_file(filename):
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
dir_fd = os.open(path, os.O_RDONLY)
flags = os.O_RDONLY
if hasattr(os, "O_DIRECTORY"):
flags |= os.O_DIRECTORY
dir_fd = os.open(path, flags)
try:
yield dir_fd
finally:
Expand Down Expand Up @@ -502,7 +587,7 @@ def __fspath__(self):
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd')):
if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
Expand Down Expand Up @@ -568,6 +653,11 @@ def temp_umask(umask):
yield
finally:
os.umask(oldmask)
else:
@contextlib.contextmanager
def temp_umask(umask):
"""no-op on platforms without umask()"""
yield


class EnvironmentVarGuard(collections.abc.MutableMapping):
Expand Down Expand Up @@ -610,6 +700,10 @@ def set(self, envvar, value):
def unset(self, envvar):
del self[envvar]

def copy(self):
# We do what os.environ.copy() does.
return dict(self)

def __enter__(self):
return self

Expand Down
8 changes: 8 additions & 0 deletions Lib/test/support/script_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def interpreter_requires_environment():
if 'PYTHONHOME' in os.environ:
__cached_interp_requires_environment = True
return True
# cannot run subprocess, assume we don't need it
if not support.has_subprocess_support:
__cached_interp_requires_environment = False
return False

# Try running an interpreter with -E to see if it works or not.
try:
Expand Down Expand Up @@ -87,6 +91,7 @@ def fail(self, cmd_line):


# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
env_required = interpreter_requires_environment()
cwd = env_vars.pop('__cwd', None)
Expand Down Expand Up @@ -139,6 +144,7 @@ def run_python_until_end(*args, **env_vars):
return _PythonRunResult(rc, out, err), cmd_line


@support.requires_subprocess()
def _assert_python(expected_success, /, *args, **env_vars):
res, cmd_line = run_python_until_end(*args, **env_vars)
if (res.rc and expected_success) or (not res.rc and not expected_success):
Expand Down Expand Up @@ -171,6 +177,7 @@ def assert_python_failure(*args, **env_vars):
return _assert_python(False, *args, **env_vars)


@support.requires_subprocess()
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
"""Run a Python subprocess with the given arguments.

Expand Down Expand Up @@ -273,6 +280,7 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
return zip_name, os.path.join(zip_name, script_name_in_zip)


@support.requires_subprocess()
def run_test_script(script):
# use -u to try to get the full output if the test hangs or crash
if support.verbose:
Expand Down
9 changes: 6 additions & 3 deletions Lib/test/support/socket_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import sys

from .. import support

from . import warnings_helper

HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"

# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
has_gethostname = not support.is_wasi


def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
"""Returns an unused port that should be suitable for binding. This is
Expand Down Expand Up @@ -190,7 +193,7 @@ def get_socket_conn_refused_errs():
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
"""Return a context manager that raises ResourceDenied when various issues
with the internet connection manifest themselves as exceptions."""
import nntplib
nntplib = warnings_helper.import_deprecated("nntplib")
import urllib.error
if timeout is _NOT_SET:
timeout = support.INTERNET_TIMEOUT
Expand Down Expand Up @@ -256,7 +259,7 @@ def filter_error(err):
err = a[0]
# The error can also be wrapped as args[1]:
# except socket.error as msg:
# raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
# raise OSError('socket error', msg) from msg
elif len(a) >= 2 and isinstance(a[1], OSError):
err = a[1]
else:
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/support/testresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_error(self):
raise RuntimeError('error message')

suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestTests))
stream = io.StringIO()
runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
runner = runner_cls(sys.stdout)
Expand Down
35 changes: 35 additions & 0 deletions Lib/test/support/threading_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import threading
import time
import unittest

from test import support

Expand Down Expand Up @@ -207,3 +208,37 @@ def __exit__(self, *exc_info):
del self.exc_value
del self.exc_traceback
del self.thread


def _can_start_thread() -> bool:
"""Detect whether Python can start new threads.

Some WebAssembly platforms do not provide a working pthread
implementation. Thread support is stubbed and any attempt
to create a new thread fails.

- wasm32-wasi does not have threading.
- wasm32-emscripten can be compiled with or without pthread
support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
"""
if sys.platform == "emscripten":
return sys._emscripten_info.pthreads
elif sys.platform == "wasi":
return False
else:
# assume all other platforms have working thread support.
return True

can_start_thread = _can_start_thread()

def requires_working_threading(*, module=False):
"""Skip tests or modules that require working threading.

Can be used as a function/class decorator or to skip an entire module.
"""
msg = "requires threading support"
if module:
if not can_start_thread:
raise unittest.SkipTest(msg)
else:
return unittest.skipUnless(can_start_thread, msg)
Loading