Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tests
  • Loading branch information
barneygale committed May 27, 2024
commit b504019e43a2cf03d115c7f730b10b37d7f4d4fa
8 changes: 7 additions & 1 deletion Lib/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,13 @@ def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks):
yield from _fwalk(dirfd, dirpath, isbytes,
topdown, onerror, follow_symlinks)
finally:
close(dirfd)
try:
close(dirfd)
except OSError as err:
err.filename = path.join(toppath, name)
if onerror is not None:
onerror(err)
continue

if not topdown:
yield toppath, dirs, nondirs, topfd
Expand Down
1 change: 1 addition & 0 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ def on_error(error):
except OSError as error:
on_error(error)
except OSError as error:
error.filename = str(self)
on_error(error)
rmtree.avoids_symlink_attacks = False

Expand Down
1 change: 1 addition & 0 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ def on_error(error):
on_error(error)
os.rmdir(path)
except OSError as error:
error.filename = path
on_error(error)
rmtree.avoids_symlink_attacks = True

Expand Down
250 changes: 250 additions & 0 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from test.support import import_helper
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
from test.support import swap_attr
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
Expand All @@ -31,6 +32,10 @@
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)

rmtree_use_fd_functions = (
{os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)

#
# Tests for the pure classes.
#
Expand Down Expand Up @@ -764,6 +769,251 @@ def test_group_no_follow_symlinks(self):
self.assertEqual(expected_gid, gid_2)
self.assertEqual(expected_name, link.group(follow_symlinks=False))

def test_rmtree_uses_safe_fd_version_if_available(self):
if rmtree_use_fd_functions:
self.assertTrue(self.cls.rmtree.avoids_symlink_attacks)
d = self.cls(self.base, 'a')
d.mkdir()
try:
real_fwalk = os.fwalk

class Called(Exception):
pass

def _raiser(*args, **kwargs):
raise Called

os.fwalk = _raiser
self.assertRaises(Called, d.rmtree)
finally:
os.fwalk = real_fwalk
else:
self.assertFalse(self.cls.rmtree.avoids_symlink_attacks)

@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_unwritable(self):
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
child_file_path = tmp / 'a'
child_dir_path = tmp / 'b'
child_file_path.write_text("")
child_dir_path.mkdir()
old_dir_mode = tmp.stat().st_mode
old_child_file_mode = child_file_path.stat().st_mode
old_child_dir_mode = child_dir_path.stat().st_mode
# Make unwritable.
new_mode = stat.S_IREAD | stat.S_IEXEC
try:
child_file_path.chmod(new_mode)
child_dir_path.chmod(new_mode)
tmp.chmod(new_mode)

errors = []
tmp.rmtree(on_error=errors.append)
# Test whether onerror has actually been called.
print(errors)
self.assertEqual(len(errors), 3)
finally:
tmp.chmod(old_dir_mode)
child_file_path.chmod(old_child_file_mode)
child_dir_path.chmod(old_child_dir_mode)

@needs_windows
def test_rmtree_inner_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir2 = dir1 / 'dir2'
dir3 = tmp / 'dir3'
for d in dir1, dir2, dir3:
d.mkdir()
file1 = tmp / 'file1'
file1.write_text('foo')
link1 = dir1 / 'link1'
_winapi.CreateJunction(dir2, link1)
link2 = dir1 / 'link2'
_winapi.CreateJunction(dir3, link2)
link3 = dir1 / 'link3'
_winapi.CreateJunction(file1, link3)
# make sure junctions are removed but not followed
dir1.rmtree()
self.assertFalse(dir1.exists())
self.assertTrue(dir3.exists())
self.assertTrue(file1.exists())

@needs_windows
def test_rmtree_outer_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
try:
src = tmp / 'cheese'
dst = tmp / 'shop'
src.mkdir()
spam = src / 'spam'
spam.write_text('')
_winapi.CreateJunction(src, dst)
self.assertRaises(OSError, dst.rmtree)
dst.rmtree(ignore_errors=True)
finally:
tmp.rmtree(ignore_errors=True)

@needs_windows
def test_rmtree_outer_junction_on_error(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir_ = tmp / 'dir'
dir_.mkdir()
link = tmp / 'link'
_winapi.CreateJunction(dir_, link)
self.addCleanup(os_helper.unlink, link)
self.assertRaises(OSError, link.rmtree)
self.assertTrue(dir_.exists())
self.assertTrue(link.exists(follow_symlinks=False))
errors = []

def on_error(error):
errors.append(error)

link.rmtree(on_error=on_error)
self.assertEqual(len(errors), 1)
self.assertIsInstance(errors[0], OSError)
self.assertEqual(errors[0].filename, str(link))

@unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
def test_rmtree_fails_on_close(self):
# Test that the error handler is called for failed os.close() and that
# os.close() is only called once for a file descriptor.
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir1.mkdir()
dir2 = dir1 / 'dir2'
dir2.mkdir()

def close(fd):
orig_close(fd)
nonlocal close_count
close_count += 1
raise OSError

close_count = 0
with swap_attr(os, 'close', close) as orig_close:
with self.assertRaises(OSError):
dir1.rmtree()
self.assertTrue(dir2.is_dir())
self.assertEqual(close_count, 2)

close_count = 0
errors = []

with swap_attr(os, 'close', close) as orig_close:
dir1.rmtree(on_error=errors.append)
print(errors)
self.assertEqual(len(errors), 2)
self.assertEqual(errors[0].filename, str(dir2))
self.assertEqual(errors[1].filename, str(dir1))
self.assertEqual(close_count, 2)

@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@unittest.skipIf(sys.platform == "vxworks",
"fifo requires special path on VxWorks")
def test_rmtree_on_named_pipe(self):
p = self.cls(self.base, 'pipe')
os.mkfifo(p)
try:
with self.assertRaises(NotADirectoryError):
p.rmtree()
self.assertTrue(p.exists())
finally:
p.unlink()

p = self.cls(self.base, 'dir')
p.mkdir()
os.mkfifo(p / 'mypipe')
p.rmtree()
self.assertFalse(p.exists())

@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_deleted_race_condition(self):
# bpo-37260
#
# Test that a file or a directory deleted after it is enumerated
# by scandir() but before unlink() or rmdr() is called doesn't
# generate any errors.
def on_error(exc):
if not isinstance(exc, PermissionError):
raise
# Make the parent and the children writeable.
for p, mode in zip(paths, old_modes):
os.chmod(p, mode)
# Remove other dirs except one.
keep = next(p for p in dirs if p != exc.filename)
for p in dirs:
if p != keep:
os.rmdir(p)
# Remove other files except one.
keep = next(p for p in files if p != exc.filename)
for p in files:
if p != keep:
os.unlink(p)

tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
dirs = paths[1::2]
files = paths[2::2]
for path in dirs:
path.mkdir()
for path in files:
path.write_text('')

old_modes = [path.stat().st_mode for path in paths]

# Make the parent and the children non-writeable.
new_mode = stat.S_IREAD | stat.S_IEXEC
for path in reversed(paths):
path.chmod(new_mode)

try:
tmp.rmtree(on_error=on_error)
except:
# Test failed, so cleanup artifacts.
for path, mode in zip(paths, old_modes):
try:
path.chmod(mode)
except OSError:
pass
tmp.rmtree()
raise

def test_rmtree_does_not_choke_on_failing_lstat(self):
try:
orig_lstat = os.lstat

def raiser(fn, *args, **kwargs):
if fn != TESTFN:
raise OSError()
else:
return orig_lstat(fn)

os.lstat = raiser

tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
foo = tmp / 'foo'
foo.write_text('')
tmp.rmtree()
finally:
os.lstat = orig_lstat

@os_helper.skip_unless_hardlink
def test_hardlink_to(self):
P = self.cls(self.base)
Expand Down
Loading