Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions Lib/test/test_linecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,48 @@ def raise_memoryerror(*args, **kwargs):
self.assertEqual(linecache.getlines(FILENAME), lines)


class LineCacheInvalidationTests(unittest.TestCase):
def setUp(self):
super().setUp()
linecache.clearcache()
self.deleted_file = os_helper.TESTFN + '.1'
self.modified_file = os_helper.TESTFN + '.2'
self.unchange_file = os_helper.TESTFN + '.3'
self.addCleanup(os_helper.unlink, self.deleted_file)
self.addCleanup(os_helper.unlink, self.modified_file)
self.addCleanup(os_helper.unlink, self.unchange_file)
with open(self.deleted_file, 'w', encoding='utf-8') as source:
source.write('print("will be deleted")')
with open(self.modified_file, 'w', encoding='utf-8') as source:
source.write('print("will be modified")')
with open(self.unchange_file, 'w', encoding='utf-8') as source:
source.write('print("unchange")')

linecache.getlines(self.deleted_file)
linecache.getlines(self.modified_file)
linecache.getlines(self.unchange_file)

os.remove(self.deleted_file)
with open(self.modified_file, 'w', encoding='utf-8') as source:
source.write('print("was modified")')

def test_checkcache_with_oserror(self):
Comment thread
uniocto marked this conversation as resolved.
Outdated
self.assertEqual(3, len(linecache.cache.keys()))
linecache.checkcache(self.deleted_file)
self.assertTrue(2 == len(linecache.cache.keys()) and
self.deleted_file not in linecache.cache.keys())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break this up into assertEqual and assertNotIn. See the assert methods here: https://docs.python.org/3/library/unittest.html

That way we get better error messages in the output when the assertion fails.


def test_checkcache_with_not_match_size_or_timestamp(self):
Comment thread
uniocto marked this conversation as resolved.
Outdated
self.assertEqual(3, len(linecache.cache.keys()))
linecache.checkcache(self.modified_file)
self.assertTrue(2 == len(linecache.cache.keys()) and
self.modified_file not in linecache.cache.keys())

def test_checkcache_with_no_parameters(self):
self.assertEqual(3, len(linecache.cache.keys()))
linecache.checkcache()
self.assertTrue([self.unchange_file] == list(linecache.cache.keys()))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEqual



if __name__ == "__main__":
unittest.main()
19 changes: 19 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import subprocess
import signal
import textwrap
import traceback

from unittest import mock
from test import lock_tests
Expand Down Expand Up @@ -1338,6 +1339,24 @@ def run(self):
# explicitly break the reference cycle to not leak a dangling thread
thread.exc = None

def test_multithread_modify_file_noerror(self):
Comment thread
uniocto marked this conversation as resolved.
def modify_file():
with open(__file__, 'a') as fp:
Comment thread
iritkatriel marked this conversation as resolved.
Outdated
fp.write(' ')
Comment thread
iritkatriel marked this conversation as resolved.
traceback.format_stack()

threads = [
threading.Thread(target=modify_file)
for i in range(100)
]
try:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which exceptions are you trying to ignore here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing, sorry. I removed try.

for t in threads:
t.start()
for t in threads:
t.join()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you don't do

for t in threads:
    t.start()
    t.join()

?

Copy link
Copy Markdown
Contributor Author

@uniocto uniocto May 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing, sorry.
So I fixed it to your feedback.

finally:
pass


class ThreadRunFail(threading.Thread):
def run(self):
Expand Down