Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add _forget_extension().
  • Loading branch information
ericsnowcurrently committed Feb 17, 2023
commit 828a734364ccfe94281f1c960396cd79f417cfa9
131 changes: 87 additions & 44 deletions Lib/test/test_imp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,38 @@ def requires_load_dynamic(meth):
'imp.load_dynamic() required')(meth)


def _forget_extension(mod):
"""Clear all internally cached data for the extension.

This mostly applies only to single-phase init modules.
"""
if isinstance(mod, str):
name = mod
mod = None
else:
name = mod.__name__

try:
del sys.modules[name]
except KeyError:
pass
sys.modules[name] = None

try:
if mod is None:
fileobj, filename, _ = imp.find_module(name)
fileobj.close()
mod = imp.load_dynamic(name, filename)
else:
filename = mod.__file__
if hasattr(mod, '_clear_globals'):
mod._clear_globals()
del mod
_testinternalcapi.clear_extension(name, filename)
finally:
del sys.modules[name]


class LockTests(unittest.TestCase):

"""Very basic test of import lock functions."""
Expand Down Expand Up @@ -263,6 +295,16 @@ def test_issue16421_multiple_modules_in_one_dll(self):
with self.assertRaises(ImportError):
imp.load_dynamic('nonexistent', pathname)

@requires_load_dynamic
def test_singlephase_clears_globals(self):
import _testsinglephase
self.addCleanup(_forget_extension, _testsinglephase)

_testsinglephase._clear_globals()
init_count = _testsinglephase.initialized_count()

self.assertEqual(init_count, -1)

@requires_subinterpreters
@requires_load_dynamic
def test_singlephase_multiple_interpreters(self):
Expand All @@ -271,63 +313,65 @@ def test_singlephase_multiple_interpreters(self):
# PyModuleDef for that object, which can be a problem.

# This single-phase module has global state, which is shared
# by the interpreters.
# by all interpreters.
import _testsinglephase
name = _testsinglephase.__name__
filename = _testsinglephase.__file__

del sys.modules[name]
_testsinglephase._clear_globals()
_testinternalcapi.clear_extension(name, filename)
# Start and end clean.
_forget_extension(_testsinglephase)
import _testsinglephase
self.addCleanup(_forget_extension, _testsinglephase)

init_count = _testsinglephase.initialized_count()
assert init_count == -1, (init_count,)
lookedup = _testsinglephase.look_up_self()
_initialized = _testsinglephase._initialized
initialized = _testsinglephase.initialized()

def clean_up():
_testsinglephase._clear_globals()
_testinternalcapi.clear_extension(name, filename)
self.addCleanup(clean_up)
self.assertEqual(init_count, 1)
self.assertIs(lookedup, _testsinglephase)
self.assertEqual(_initialized, initialized)

interp1 = _interpreters.create(isolated=False)
self.addCleanup(_interpreters.destroy, interp1)
interp2 = _interpreters.create(isolated=False)
self.addCleanup(_interpreters.destroy, interp2)

script = textwrap.dedent(f'''
import _testsinglephase
with self.subTest('without resetting'):
script = textwrap.dedent(f'''
import _testsinglephase

expected = %d
init_count = _testsinglephase.initialized_count()
if init_count != expected:
raise Exception(init_count)
expected = %d
init_count = _testsinglephase.initialized_count()
if init_count != expected:
raise Exception(init_count)

lookedup = _testsinglephase.look_up_self()
if lookedup is not _testsinglephase:
raise Exception((_testsinglephase, lookedup))
lookedup = _testsinglephase.look_up_self()
if lookedup is not _testsinglephase:
raise Exception((_testsinglephase, lookedup))

# Attrs set in the module init func are in m_copy.
_initialized = _testsinglephase._initialized
initialized = _testsinglephase.initialized()
if _initialized != initialized:
raise Exception((_initialized, initialized))
# Attrs set in the module init func are in m_copy.
_initialized = _testsinglephase._initialized
initialized = _testsinglephase.initialized()
if _initialized != initialized:
raise Exception((_initialized, initialized))

# Attrs set after loading are not in m_copy.
if hasattr(_testsinglephase, 'spam'):
raise Exception(_testsinglephase.spam)
_testsinglephase.spam = expected
''')
# Attrs set after loading are not in m_copy.
if hasattr(_testsinglephase, 'spam'):
raise Exception(_testsinglephase.spam)
_testsinglephase.spam = expected
''')

# Use an interpreter that gets destroyed right away.
ret = support.run_in_subinterp(script % 1)
self.assertEqual(ret, 0)
# Use an interpreter that gets destroyed right away.
ret = support.run_in_subinterp(script % 1)
self.assertEqual(ret, 0)

# The module's init func gets run again.
# The module's globals did not get destroyed.
_interpreters.run_string(interp1, script % 2)
# The module's init func gets run again.
# The module's globals did not get destroyed.
_interpreters.run_string(interp1, script % 1)

# The module's init func is not run again.
# The second interpreter copies the module's m_copy.
# However, globals are still shared.
_interpreters.run_string(interp2, script % 2)
# The module's init func is not run again.
# The second interpreter copies the module's m_copy.
# However, globals are still shared.
_interpreters.run_string(interp2, script % 1)

@requires_load_dynamic
def test_singlephase_variants(self):
Expand All @@ -338,10 +382,9 @@ def test_singlephase_variants(self):
fileobj, pathname, _ = imp.find_module(basename)
fileobj.close()

def clean_up():
import _testsinglephase
_testsinglephase._clear_globals()
self.addCleanup(clean_up)
# Start and end clean.
_forget_extension(basename)
self.addCleanup(_forget_extension, basename)

def add_ext_cleanup(name):
def clean_up():
Expand Down