Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update test.support from Python3.12
  • Loading branch information
CPython developers authored and youknowone committed Oct 23, 2023
commit 48ca7a771e0e6bb037313da08f5ea8b00886e847
435 changes: 371 additions & 64 deletions Lib/test/support/__init__.py

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions Lib/test/support/bytecode_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import unittest
import dis
import io
from _testinternalcapi import compiler_codegen, optimize_cfg, assemble_code_object

_UNSPECIFIED = object()

Expand All @@ -16,6 +17,7 @@ def get_disassembly_as_string(self, co):

def assertInBytecode(self, x, opname, argval=_UNSPECIFIED):
"""Returns instr if opname is found, otherwise throws AssertionError"""
self.assertIn(opname, dis.opmap)
for instr in dis.get_instructions(x):
if instr.opname == opname:
if argval is _UNSPECIFIED or instr.argval == argval:
Expand All @@ -30,6 +32,7 @@ def assertInBytecode(self, x, opname, argval=_UNSPECIFIED):

def assertNotInBytecode(self, x, opname, argval=_UNSPECIFIED):
"""Throws AssertionError if opname is found"""
self.assertIn(opname, dis.opmap)
for instr in dis.get_instructions(x):
if instr.opname == opname:
disassembly = self.get_disassembly_as_string(x)
Expand All @@ -40,3 +43,101 @@ def assertNotInBytecode(self, x, opname, argval=_UNSPECIFIED):
msg = '(%s,%r) occurs in bytecode:\n%s'
msg = msg % (opname, argval, disassembly)
self.fail(msg)

class CompilationStepTestCase(unittest.TestCase):

HAS_ARG = set(dis.hasarg)
HAS_TARGET = set(dis.hasjrel + dis.hasjabs + dis.hasexc)
HAS_ARG_OR_TARGET = HAS_ARG.union(HAS_TARGET)

class Label:
pass

def assertInstructionsMatch(self, actual_, expected_):
# get two lists where each entry is a label or
# an instruction tuple. Normalize the labels to the
# instruction count of the target, and compare the lists.

self.assertIsInstance(actual_, list)
self.assertIsInstance(expected_, list)

actual = self.normalize_insts(actual_)
expected = self.normalize_insts(expected_)
self.assertEqual(len(actual), len(expected))

# compare instructions
for act, exp in zip(actual, expected):
if isinstance(act, int):
self.assertEqual(exp, act)
continue
self.assertIsInstance(exp, tuple)
self.assertIsInstance(act, tuple)
# crop comparison to the provided expected values
if len(act) > len(exp):
act = act[:len(exp)]
self.assertEqual(exp, act)

def resolveAndRemoveLabels(self, insts):
idx = 0
res = []
for item in insts:
assert isinstance(item, (self.Label, tuple))
if isinstance(item, self.Label):
item.value = idx
else:
idx += 1
res.append(item)

return res

def normalize_insts(self, insts):
""" Map labels to instruction index.
Map opcodes to opnames.
"""
insts = self.resolveAndRemoveLabels(insts)
res = []
for item in insts:
assert isinstance(item, tuple)
opcode, oparg, *loc = item
opcode = dis.opmap.get(opcode, opcode)
if isinstance(oparg, self.Label):
arg = oparg.value
else:
arg = oparg if opcode in self.HAS_ARG else None
opcode = dis.opname[opcode]
res.append((opcode, arg, *loc))
return res

def complete_insts_info(self, insts):
# fill in omitted fields in location, and oparg 0 for ops with no arg.
res = []
for item in insts:
assert isinstance(item, tuple)
inst = list(item)
opcode = dis.opmap[inst[0]]
oparg = inst[1]
loc = inst[2:] + [-1] * (6 - len(inst))
res.append((opcode, oparg, *loc))
return res


class CodegenTestCase(CompilationStepTestCase):

def generate_code(self, ast):
insts, _ = compiler_codegen(ast, "my_file.py", 0)
return insts


class CfgOptimizationTestCase(CompilationStepTestCase):

def get_optimized(self, insts, consts, nlocals=0):
insts = self.normalize_insts(insts)
insts = self.complete_insts_info(insts)
insts = optimize_cfg(insts, consts, nlocals)
return insts, consts

class AssemblerTestCase(CompilationStepTestCase):

def get_code_object(self, filename, insts, metadata):
co = assemble_code_object(filename, insts, metadata)
return co
28 changes: 28 additions & 0 deletions Lib/test/support/import_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ def frozen_modules(enabled=True):
_imp._override_frozen_modules_for_tests(0)


@contextlib.contextmanager
def multi_interp_extensions_check(enabled=True):
"""Force legacy modules to be allowed in subinterpreters (or not).

("legacy" == single-phase init)

This only applies to modules that haven't been imported yet.
It overrides the PyInterpreterConfig.check_multi_interp_extensions
setting (see support.run_in_subinterp_with_config() and
_xxsubinterpreters.create()).

Also see importlib.utils.allowing_all_extensions().
"""
old = _imp._override_multi_interp_extensions_check(1 if enabled else -1)
try:
yield
finally:
_imp._override_multi_interp_extensions_check(old)


def import_fresh_module(name, fresh=(), blocked=(), *,
deprecated=False,
usefrozen=False,
Expand Down Expand Up @@ -246,3 +266,11 @@ def modules_cleanup(oldmodules):
# do currently). Implicitly imported *real* modules should be left alone
# (see issue 10556).
sys.modules.update(oldmodules)


def mock_register_at_fork(func):
# bpo-30599: Mock os.register_at_fork() when importing the random module,
# since this function doesn't allow to unregister callbacks and would leak
# memory.
from unittest import mock
return mock.patch('os.register_at_fork', create=True)(func)
23 changes: 12 additions & 11 deletions Lib/test/support/interpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import time
import _xxsubinterpreters as _interpreters
import _xxinterpchannels as _channels

# aliases:
from _xxsubinterpreters import (
from _xxsubinterpreters import is_shareable, RunFailedError
from _xxinterpchannels import (
ChannelError, ChannelNotFoundError, ChannelEmptyError,
is_shareable,
)


Expand Down Expand Up @@ -102,22 +103,22 @@ def create_channel():

The channel may be used to pass data safely between interpreters.
"""
cid = _interpreters.channel_create()
cid = _channels.create()
recv, send = RecvChannel(cid), SendChannel(cid)
return recv, send


def list_all_channels():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _interpreters.channel_list_all()]
for cid in _channels.list_all()]


class _ChannelEnd:
"""The base class for RecvChannel and SendChannel."""

def __init__(self, id):
if not isinstance(id, (int, _interpreters.ChannelID)):
if not isinstance(id, (int, _channels.ChannelID)):
raise TypeError(f'id must be an int, got {id!r}')
self._id = id

Expand Down Expand Up @@ -152,10 +153,10 @@ def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds
This blocks until an object has been sent, if none have been
sent already.
"""
obj = _interpreters.channel_recv(self._id, _sentinel)
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
obj = _interpreters.channel_recv(self._id, _sentinel)
obj = _channels.recv(self._id, _sentinel)
return obj

def recv_nowait(self, default=_NOT_SET):
Expand All @@ -166,9 +167,9 @@ def recv_nowait(self, default=_NOT_SET):
is the same as recv().
"""
if default is _NOT_SET:
return _interpreters.channel_recv(self._id)
return _channels.recv(self._id)
else:
return _interpreters.channel_recv(self._id, default)
return _channels.recv(self._id, default)


class SendChannel(_ChannelEnd):
Expand All @@ -179,7 +180,7 @@ def send(self, obj):

This blocks until the object is received.
"""
_interpreters.channel_send(self._id, obj)
_channels.send(self._id, obj)
# XXX We are missing a low-level channel_send_wait().
# See bpo-32604 and gh-19829.
# Until that shows up we fake it:
Expand All @@ -194,4 +195,4 @@ def send_nowait(self, obj):
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _interpreters.channel_send(self._id, obj)
return _channels.send(self._id, obj)
48 changes: 42 additions & 6 deletions Lib/test/support/os_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
import os
import re
import stat
import string
import sys
import time
import unittest
import warnings


# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
TESTFN_ASCII = '$test'
else:
TESTFN_ASCII = '@test'
TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
Expand Down Expand Up @@ -141,6 +138,11 @@
try:
name.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
try:
name.decode(sys.getfilesystemencoding(),
sys.getfilesystemencodeerrors())
except UnicodeDecodeError:
continue
TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
break

Expand Down Expand Up @@ -567,7 +569,7 @@ def fs_is_case_insensitive(directory):


class FakePath:
"""Simple implementing of the path protocol.
"""Simple implementation of the path protocol.
"""
def __init__(self, path):
self.path = path
Expand Down Expand Up @@ -715,3 +717,37 @@ def __exit__(self, *ignore_exc):
else:
self._environ[k] = v
os.environ = self._environ


try:
import ctypes
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

ERROR_FILE_NOT_FOUND = 2
DDD_REMOVE_DEFINITION = 2
DDD_EXACT_MATCH_ON_REMOVE = 4
DDD_NO_BROADCAST_SYSTEM = 8
except (ImportError, AttributeError):
def subst_drive(path):
raise unittest.SkipTest('ctypes or kernel32 is not available')
else:
@contextlib.contextmanager
def subst_drive(path):
"""Temporarily yield a substitute drive for a given path."""
for c in reversed(string.ascii_uppercase):
drive = f'{c}:'
if (not kernel32.QueryDosDeviceW(drive, None, 0) and
ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
break
else:
raise unittest.SkipTest('no available logical drive')
if not kernel32.DefineDosDeviceW(
DDD_NO_BROADCAST_SYSTEM, drive, path):
raise ctypes.WinError(ctypes.get_last_error())
try:
yield drive
finally:
if not kernel32.DefineDosDeviceW(
DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
drive, path):
raise ctypes.WinError(ctypes.get_last_error())
Loading