Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update linecache to 3.13.3
  • Loading branch information
arihant2math committed Apr 22, 2025
commit 70f3aec5527a8e3dfb672d033eb98f55b352391f
91 changes: 70 additions & 21 deletions Lib/linecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
that name.
"""

import functools
import sys
import os
import tokenize

__all__ = ["getline", "clearcache", "checkcache", "lazycache"]


# The cache. Maps filenames to either a thunk which will provide source code,
# or a tuple (size, mtime, lines, fullname) once loaded.
cache = {}
_interactive_cache = {}


def clearcache():
Expand Down Expand Up @@ -49,28 +45,54 @@ def getlines(filename, module_globals=None):
return []


def _getline_from_code(filename, lineno):
lines = _getlines_from_code(filename)
if 1 <= lineno <= len(lines):
return lines[lineno - 1]
return ''

def _make_key(code):
return (code.co_filename, code.co_qualname, code.co_firstlineno)

def _getlines_from_code(code):
code_id = _make_key(code)
if code_id in _interactive_cache:
entry = _interactive_cache[code_id]
if len(entry) != 1:
return _interactive_cache[code_id][2]
return []


def checkcache(filename=None):
"""Discard cache entries that are out of date.
(This is not checked upon each call!)"""

if filename is None:
filenames = list(cache.keys())
elif filename in cache:
filenames = [filename]
# get keys atomically
filenames = cache.copy().keys()
else:
return
filenames = [filename]

for filename in filenames:
entry = cache[filename]
try:
entry = cache[filename]
except KeyError:
continue

if len(entry) == 1:
# lazy cache entry, leave it lazy.
continue
size, mtime, lines, fullname = entry
if mtime is None:
continue # no-op for files loaded via a __loader__
try:
# This import can fail if the interpreter is shutting down
import os
except ImportError:
return
try:
stat = os.stat(fullname)
except OSError:
except (OSError, ValueError):
cache.pop(filename, None)
continue
if size != stat.st_size or mtime != stat.st_mtime:
Expand All @@ -82,6 +104,17 @@ def updatecache(filename, module_globals=None):
If something's wrong, print a message, discard the cache entry,
and return an empty list."""

# These imports are not at top level because linecache is in the critical
# path of the interpreter startup and importing os and sys take a lot of time
# and slows down the startup sequence.
try:
import os
import sys
import tokenize
except ImportError:
# These import can fail if the interpreter is shutting down
return []

if filename in cache:
if len(cache[filename]) != 1:
cache.pop(filename, None)
Expand Down Expand Up @@ -128,16 +161,20 @@ def updatecache(filename, module_globals=None):
try:
stat = os.stat(fullname)
break
except OSError:
except (OSError, ValueError):
pass
else:
return []
except ValueError: # may be raised by os.stat()
return []
try:
with tokenize.open(fullname) as fp:
lines = fp.readlines()
except (OSError, UnicodeDecodeError, SyntaxError):
return []
if lines and not lines[-1].endswith('\n'):
if not lines:
lines = ['\n']
elif not lines[-1].endswith('\n'):
lines[-1] += '\n'
size, mtime = stat.st_size, stat.st_mtime
cache[filename] = size, mtime, lines, fullname
Expand Down Expand Up @@ -166,17 +203,29 @@ def lazycache(filename, module_globals):
return False
# Try for a __loader__, if available
if module_globals and '__name__' in module_globals:
name = module_globals['__name__']
if (loader := module_globals.get('__loader__')) is None:
if spec := module_globals.get('__spec__'):
try:
loader = spec.loader
except AttributeError:
pass
spec = module_globals.get('__spec__')
name = getattr(spec, 'name', None) or module_globals['__name__']
loader = getattr(spec, 'loader', None)
if loader is None:
loader = module_globals.get('__loader__')
get_source = getattr(loader, 'get_source', None)

if name and get_source:
get_lines = functools.partial(get_source, name)
def get_lines(name=name, *args, **kwargs):
return get_source(name, *args, **kwargs)
cache[filename] = (get_lines,)
return True
return False

def _register_code(code, string, name):
entry = (len(string),
None,
[line + '\n' for line in string.splitlines()],
name)
stack = [code]
while stack:
code = stack.pop()
for const in code.co_consts:
if isinstance(const, type(code)):
stack.append(const)
_interactive_cache[_make_key(code)] = entry
80 changes: 80 additions & 0 deletions Lib/test/test_linecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import os.path
import tempfile
import tokenize
from importlib.machinery import ModuleSpec
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok


FILENAME = linecache.__file__
Expand Down Expand Up @@ -82,6 +84,10 @@ def test_getlines(self):
class EmptyFile(GetLineTestsGoodData, unittest.TestCase):
file_list = []

def test_getlines(self):
lines = linecache.getlines(self.file_name)
self.assertEqual(lines, ['\n'])


class SingleEmptyLine(GetLineTestsGoodData, unittest.TestCase):
file_list = ['\n']
Expand All @@ -97,6 +103,16 @@ class BadUnicode_WithDeclaration(GetLineTestsBadData, unittest.TestCase):
file_byte_string = b'# coding=utf-8\n\x80abc'


class FakeLoader:
def get_source(self, fullname):
return f'source for {fullname}'


class NoSourceLoader:
def get_source(self, fullname):
return None


class LineCacheTests(unittest.TestCase):

def test_getline(self):
Expand Down Expand Up @@ -238,6 +254,70 @@ def raise_memoryerror(*args, **kwargs):
self.assertEqual(lines3, [])
self.assertEqual(linecache.getlines(FILENAME), lines)

def test_loader(self):
filename = 'scheme://path'

for loader in (None, object(), NoSourceLoader()):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': loader}
self.assertEqual(linecache.getlines(filename, module_globals), [])

linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader()}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])

for spec in (None, object(), ModuleSpec('', FakeLoader())):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader(),
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])

linecache.clearcache()
spec = ModuleSpec('x.y.z', FakeLoader())
module_globals = {'__name__': 'a.b.c', '__loader__': spec.loader,
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for x.y.z\n'])

def test_invalid_names(self):
for name, desc in [
('\x00', 'NUL bytes filename'),
(__file__ + '\x00', 'filename with embedded NUL bytes'),
# A filename with surrogate codes. A UnicodeEncodeError is raised
# by os.stat() upon querying, which is a subclass of ValueError.
("\uD834\uDD1E.py", 'surrogate codes (MUSICAL SYMBOL G CLEF)'),
# For POSIX platforms, an OSError will be raised but for Windows
# platforms, a ValueError is raised due to the path_t converter.
# See: https://github.com/python/cpython/issues/122170
('a' * 1_000_000, 'very long filename'),
]:
with self.subTest(f'updatecache: {desc}'):
linecache.clearcache()
lines = linecache.updatecache(name)
self.assertListEqual(lines, [])
self.assertNotIn(name, linecache.cache)

# hack into the cache (it shouldn't be allowed
# but we never know what people do...)
for key, fullname in [(name, 'ok'), ('key', name), (name, name)]:
with self.subTest(f'checkcache: {desc}',
key=key, fullname=fullname):
linecache.clearcache()
linecache.cache[key] = (0, 1234, [], fullname)
linecache.checkcache(key)
self.assertNotIn(key, linecache.cache)

# just to be sure that we did not mess with cache
linecache.clearcache()

def test_linecache_python_string(self):
cmdline = "import linecache;assert len(linecache.cache) == 0"
retcode, stdout, stderr = assert_python_ok('-c', cmdline)
self.assertEqual(retcode, 0)
self.assertEqual(stdout, b'')
self.assertEqual(stderr, b'')

class LineCacheInvalidationTests(unittest.TestCase):
def setUp(self):
Expand Down