Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
barneygale committed Jun 24, 2024
commit bcff0539ca22cceead6c574c42f7420579f604b0
1 change: 0 additions & 1 deletion Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,6 @@ def on_error(error):
except OSError as error:
error.filename = str(self)
on_error(error)
rmtree.avoids_symlink_attacks = False

def owner(self, *, follow_symlinks=True):
"""
Expand Down
14 changes: 5 additions & 9 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
grp = None

from ._abc import UnsupportedOperation, PurePathBase, PathBase
from ._os import copyfile, rmtree as rmtree_impl
from ._os import copyfile


__all__ = [
Expand Down Expand Up @@ -831,18 +831,14 @@ def rmtree(self, ignore_errors=False, on_error=None):
*ignore_errors* nor *on_error* are set, exceptions are propagated to
the caller.
"""
if ignore_errors:
def onexc(func, filename, err):
pass
elif on_error:
if on_error:
def onexc(func, filename, err):
err.filename = filename
on_error(err)
else:
def onexc(func, filename, err):
raise err
rmtree_impl(str(self), None, onexc)
rmtree.avoids_symlink_attacks = rmtree_impl.avoids_symlink_attacks
onexc = None
import shutil
shutil.rmtree(str(self), ignore_errors, onexc=onexc)

def rename(self, target):
"""
Expand Down
156 changes: 0 additions & 156 deletions Lib/pathlib/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,159 +157,3 @@ def copyfileobj(source_f, target_f):
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)

if ({os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
os.scandir in os.supports_fd and os.stat in os.supports_follow_symlinks):

def _rmtree_safe_fd_step(stack, onexc):
# Each stack item has four elements:
# * func: The first operation to perform: os.lstat, os.close or os.rmdir.
# Walking a directory starts with an os.lstat() to detect symlinks; in
# this case, func is updated before subsequent operations and passed to
# onexc() if an error occurs.
# * dirfd: Open file descriptor, or None if we're processing the top-level
# directory given to rmtree() and the user didn't supply dir_fd.
# * path: Path of file to operate upon. This is passed to onexc() if an
# error occurs.
# * orig_entry: os.DirEntry, or None if we're processing the top-level
# directory given to rmtree(). We used the cached stat() of the entry to
# save a call to os.lstat() when walking subdirectories.
func, dirfd, path, orig_entry = stack.pop()
name = path if orig_entry is None else orig_entry.name
try:
if func is os.close:
os.close(dirfd)
return
if func is os.rmdir:
os.rmdir(name, dir_fd=dirfd)
return

# Note: To guard against symlink races, we use the standard
# lstat()/open()/fstat() trick.
assert func is os.lstat
if orig_entry is None:
orig_st = os.lstat(name, dir_fd=dirfd)
else:
orig_st = orig_entry.stat(follow_symlinks=False)

func = os.open # For error reporting.
topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd)

func = os.path.islink # For error reporting.
try:
if not os.path.samestat(orig_st, os.fstat(topfd)):
# Symlinks to directories are forbidden, see GH-46010.
raise OSError("Cannot call rmtree on a symbolic link")
stack.append((os.rmdir, dirfd, path, orig_entry))
finally:
stack.append((os.close, topfd, path, orig_entry))

func = os.scandir # For error reporting.
with os.scandir(topfd) as scandir_it:
entries = list(scandir_it)
for entry in entries:
fullname = os.path.join(path, entry.name)
try:
if entry.is_dir(follow_symlinks=False):
# Traverse into sub-directory.
stack.append((os.lstat, topfd, fullname, entry))
continue
except FileNotFoundError:
continue
except OSError:
pass
try:
os.unlink(entry.name, dir_fd=topfd)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.unlink, fullname, err)
except FileNotFoundError as err:
if orig_entry is None or func is os.close:
err.filename = path
onexc(func, path, err)
except OSError as err:
err.filename = path
onexc(func, path, err)

# Version using fd-based APIs to protect against races
def rmtree(path, dir_fd, onexc):
# While the unsafe rmtree works fine on bytes, the fd based does not.
if isinstance(path, bytes):
path = os.fsdecode(path)
stack = [(os.lstat, dir_fd, path, None)]
try:
while stack:
_rmtree_safe_fd_step(stack, onexc)
finally:
# Close any file descriptors still on the stack.
while stack:
func, fd, path, entry = stack.pop()
if func is not os.close:
continue
try:
os.close(fd)
except OSError as err:
onexc(os.close, path, err)

rmtree.avoids_symlink_attacks = True

else:
if hasattr(os.stat_result, 'st_file_attributes'):
def _rmtree_islink(st):
return (stat.S_ISLNK(st.st_mode) or
(st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
else:
def _rmtree_islink(st):
return stat.S_ISLNK(st.st_mode)

# version vulnerable to race conditions
def rmtree(path, dir_fd, onexc):
if dir_fd is not None:
raise NotImplementedError("dir_fd unavailable on this platform")
try:
st = os.lstat(path)
except OSError as err:
onexc(os.lstat, path, err)
return
try:
if _rmtree_islink(st):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
except OSError as err:
onexc(os.path.islink, path, err)
# can't continue even if onexc hook returns
return

def onerror(err):
if not isinstance(err, FileNotFoundError):
onexc(os.scandir, err.filename, err)

results = os.walk(path, topdown=False, onerror=onerror,
followlinks=os._walk_symlinks_as_files)
for dirpath, dirnames, filenames in results:
for name in dirnames:
fullname = os.path.join(dirpath, name)
try:
os.rmdir(fullname)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.rmdir, fullname, err)
for name in filenames:
fullname = os.path.join(dirpath, name)
try:
os.unlink(fullname)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.unlink, fullname, err)
try:
os.rmdir(path)
except FileNotFoundError:
pass
except OSError as err:
onexc(os.rmdir, path, err)

rmtree.avoids_symlink_attacks = False
154 changes: 152 additions & 2 deletions Lib/shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import fnmatch
import collections
import errno
from pathlib._os import rmtree as _rmtree_impl

try:
import zlib
Expand Down Expand Up @@ -596,6 +595,157 @@ def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
ignore_dangling_symlinks=ignore_dangling_symlinks,
dirs_exist_ok=dirs_exist_ok)

if hasattr(os.stat_result, 'st_file_attributes'):
def _rmtree_islink(st):
return (stat.S_ISLNK(st.st_mode) or
(st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
else:
def _rmtree_islink(st):
return stat.S_ISLNK(st.st_mode)

# version vulnerable to race conditions
def _rmtree_unsafe(path, dir_fd, onexc):
if dir_fd is not None:
raise NotImplementedError("dir_fd unavailable on this platform")
try:
st = os.lstat(path)
except OSError as err:
onexc(os.lstat, path, err)
return
try:
if _rmtree_islink(st):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
except OSError as err:
onexc(os.path.islink, path, err)
# can't continue even if onexc hook returns
return
def onerror(err):
if not isinstance(err, FileNotFoundError):
onexc(os.scandir, err.filename, err)
results = os.walk(path, topdown=False, onerror=onerror, followlinks=os._walk_symlinks_as_files)
for dirpath, dirnames, filenames in results:
for name in dirnames:
fullname = os.path.join(dirpath, name)
try:
os.rmdir(fullname)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.rmdir, fullname, err)
for name in filenames:
fullname = os.path.join(dirpath, name)
try:
os.unlink(fullname)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.unlink, fullname, err)
try:
os.rmdir(path)
except FileNotFoundError:
pass
except OSError as err:
onexc(os.rmdir, path, err)

# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(path, dir_fd, onexc):
# While the unsafe rmtree works fine on bytes, the fd based does not.
if isinstance(path, bytes):
path = os.fsdecode(path)
stack = [(os.lstat, dir_fd, path, None)]
try:
while stack:
_rmtree_safe_fd_step(stack, onexc)
finally:
# Close any file descriptors still on the stack.
while stack:
func, fd, path, entry = stack.pop()
if func is not os.close:
continue
try:
os.close(fd)
except OSError as err:
onexc(os.close, path, err)

def _rmtree_safe_fd_step(stack, onexc):
# Each stack item has four elements:
# * func: The first operation to perform: os.lstat, os.close or os.rmdir.
# Walking a directory starts with an os.lstat() to detect symlinks; in
# this case, func is updated before subsequent operations and passed to
# onexc() if an error occurs.
# * dirfd: Open file descriptor, or None if we're processing the top-level
# directory given to rmtree() and the user didn't supply dir_fd.
# * path: Path of file to operate upon. This is passed to onexc() if an
# error occurs.
# * orig_entry: os.DirEntry, or None if we're processing the top-level
# directory given to rmtree(). We used the cached stat() of the entry to
# save a call to os.lstat() when walking subdirectories.
func, dirfd, path, orig_entry = stack.pop()
name = path if orig_entry is None else orig_entry.name
try:
if func is os.close:
os.close(dirfd)
return
if func is os.rmdir:
os.rmdir(name, dir_fd=dirfd)
return

# Note: To guard against symlink races, we use the standard
# lstat()/open()/fstat() trick.
assert func is os.lstat
if orig_entry is None:
orig_st = os.lstat(name, dir_fd=dirfd)
else:
orig_st = orig_entry.stat(follow_symlinks=False)

func = os.open # For error reporting.
topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd)

func = os.path.islink # For error reporting.
try:
if not os.path.samestat(orig_st, os.fstat(topfd)):
# Symlinks to directories are forbidden, see GH-46010.
raise OSError("Cannot call rmtree on a symbolic link")
stack.append((os.rmdir, dirfd, path, orig_entry))
finally:
stack.append((os.close, topfd, path, orig_entry))

func = os.scandir # For error reporting.
with os.scandir(topfd) as scandir_it:
entries = list(scandir_it)
for entry in entries:
fullname = os.path.join(path, entry.name)
try:
if entry.is_dir(follow_symlinks=False):
# Traverse into sub-directory.
stack.append((os.lstat, topfd, fullname, entry))
continue
except FileNotFoundError:
continue
except OSError:
pass
try:
os.unlink(entry.name, dir_fd=topfd)
except FileNotFoundError:
continue
except OSError as err:
onexc(os.unlink, fullname, err)
except FileNotFoundError as err:
if orig_entry is None or func is os.close:
err.filename = path
onexc(func, path, err)
except OSError as err:
err.filename = path
onexc(func, path, err)

_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
os.supports_dir_fd and
os.scandir in os.supports_fd and
os.stat in os.supports_follow_symlinks)
_rmtree_impl = _rmtree_safe_fd if _use_fd_functions else _rmtree_unsafe

def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
"""Recursively delete a directory tree.

Expand Down Expand Up @@ -642,7 +792,7 @@ def onexc(*args):

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _rmtree_impl.avoids_symlink_attacks
rmtree.avoids_symlink_attacks = _use_fd_functions

def _basename(path):
"""A basename() variant which first strips the trailing slash, if present.
Expand Down
3 changes: 0 additions & 3 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,6 @@ def test_group_no_follow_symlinks(self):

def test_rmtree_uses_safe_fd_version_if_available(self):
if rmtree_use_fd_functions:
self.assertTrue(self.cls.rmtree.avoids_symlink_attacks)
d = self.cls(self.base, 'a')
d.mkdir()
try:
Expand All @@ -800,8 +799,6 @@ def _raiser(*args, **kwargs):
self.assertRaises(Called, d.rmtree)
finally:
os.open = real_open
else:
self.assertFalse(self.cls.rmtree.avoids_symlink_attacks)

@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
Expand Down