Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Delete pathlib._Accessor.
  • Loading branch information
barneygale committed Jan 1, 2022
commit 734940fa9ab49f4c0edd68c1f44db56a9c8ebd29
161 changes: 53 additions & 108 deletions Lib/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,93 +274,6 @@ def make_uri(self, path):
_posix_flavour = _PosixFlavour()


class _Accessor:
"""An accessor implements a particular (system-specific or not) way of
accessing paths on the filesystem."""


class _NormalAccessor(_Accessor):

stat = os.stat

open = io.open

listdir = os.listdir

scandir = os.scandir

chmod = os.chmod

mkdir = os.mkdir

unlink = os.unlink

if hasattr(os, "link"):
link = os.link
else:
def link(self, src, dst):
raise NotImplementedError("os.link() not available on this system")

rmdir = os.rmdir

rename = os.rename

replace = os.replace

if hasattr(os, "symlink"):
symlink = os.symlink
else:
def symlink(self, src, dst, target_is_directory=False):
raise NotImplementedError("os.symlink() not available on this system")

def touch(self, path, mode=0o666, exist_ok=True):
if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(path, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(path, flags, mode)
os.close(fd)

if hasattr(os, "readlink"):
readlink = os.readlink
else:
def readlink(self, path):
raise NotImplementedError("os.readlink() not available on this system")

def owner(self, path):
try:
import pwd
return pwd.getpwuid(self.stat(path).st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self, path):
try:
import grp
return grp.getgrgid(self.stat(path).st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

getcwd = os.getcwd

expanduser = staticmethod(os.path.expanduser)

realpath = staticmethod(os.path.realpath)


_normal_accessor = _NormalAccessor()


#
# Globbing helpers
#
Expand Down Expand Up @@ -948,7 +861,6 @@ class Path(PurePath):
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
"""
_accessor = _normal_accessor
__slots__ = ()

def __new__(cls, *args, **kwargs):
Expand Down Expand Up @@ -987,7 +899,7 @@ def cwd(cls):
"""Return a new path pointing to the current working directory
(as returned by os.getcwd()).
"""
return cls(cls._accessor.getcwd())
return cls(os.getcwd())

@classmethod
def home(cls):
Expand All @@ -1011,14 +923,14 @@ def iterdir(self):
"""Iterate over the files in this directory. Does not yield any
result for the special paths '.' and '..'.
"""
for name in self._accessor.listdir(self):
for name in os.listdir(self):
if name in {'.', '..'}:
# Yielding a path object for these makes little sense
continue
yield self._make_child_relpath(name)

def scandir(self):
return self._accessor.scandir(self)
return os.scandir(self)

def glob(self, pattern):
"""Iterate over this subtree and yield all existing files (of any
Expand Down Expand Up @@ -1072,7 +984,7 @@ def check_eloop(e):
raise RuntimeError("Symlink loop from %r" % e.filename)

try:
s = self._accessor.realpath(self, strict=strict)
s = os.path.realpath(self, strict=strict)
except OSError as e:
check_eloop(e)
raise
Expand All @@ -1092,19 +1004,28 @@ def stat(self, *, follow_symlinks=True):
Return the result of the stat() system call on this path, like
os.stat() does.
"""
return self._accessor.stat(self, follow_symlinks=follow_symlinks)
return os.stat(self, follow_symlinks=follow_symlinks)

def owner(self):
"""
Return the login name of the file owner.
"""
return self._accessor.owner(self)
try:
import pwd
return pwd.getpwuid(self.stat().st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self):
"""
Return the group name of the file gid.
"""
return self._accessor.group(self)

try:
import grp
return grp.getgrgid(self.stat().st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
Expand All @@ -1114,8 +1035,7 @@ def open(self, mode='r', buffering=-1, encoding=None,
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
return self._accessor.open(self, mode, buffering, encoding, errors,
newline)
return io.open(self, mode, buffering, encoding, errors, newline)

def read_bytes(self):
"""
Expand Down Expand Up @@ -1156,21 +1076,40 @@ def readlink(self):
"""
Return the path to which the symbolic link points.
"""
path = self._accessor.readlink(self)
if hasattr(os, "readlink"):
path = os.readlink(self)
else:
raise NotImplementedError("os.readlink() not available on this system")
return self._from_parts((path,))
Comment thread
barneygale marked this conversation as resolved.
Outdated

def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
self._accessor.touch(self, mode, exist_ok)

if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(self, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(self, flags, mode)
os.close(fd)

def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
try:
self._accessor.mkdir(self, mode)
os.mkdir(self, mode)
except FileNotFoundError:
if not parents or self.parent == self:
raise
Expand All @@ -1186,7 +1125,7 @@ def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
"""
self._accessor.chmod(self, mode, follow_symlinks=follow_symlinks)
os.chmod(self, mode, follow_symlinks=follow_symlinks)

def lchmod(self, mode):
"""
Expand All @@ -1201,7 +1140,7 @@ def unlink(self, missing_ok=False):
If the path is a directory, use rmdir() instead.
"""
try:
self._accessor.unlink(self)
os.unlink(self)
except FileNotFoundError:
if not missing_ok:
raise
Expand All @@ -1210,7 +1149,7 @@ def rmdir(self):
"""
Remove this directory. The directory must be empty.
"""
self._accessor.rmdir(self)
os.rmdir(self)

def lstat(self):
"""
Expand All @@ -1229,7 +1168,7 @@ def rename(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.rename(self, target)
os.rename(self, target)
return self.__class__(target)

def replace(self, target):
Expand All @@ -1242,23 +1181,29 @@ def replace(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.replace(self, target)
os.replace(self, target)
return self.__class__(target)

def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
self._accessor.symlink(target, self, target_is_directory)
if hasattr(os, "symlink"):
os.symlink(target, self, target_is_directory)
else:
raise NotImplementedError("os.symlink() not available on this system")

def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.

Note the order of arguments (self, target) is the reverse of os.link's.
"""
self._accessor.link(target, self)
if hasattr(os, "link"):
os.link(target, self)
else:
raise NotImplementedError("os.link() not available on this system")

def link_to(self, target):
"""
Expand Down Expand Up @@ -1433,7 +1378,7 @@ def expanduser(self):
"""
if (not (self._drv or self._root) and
self._parts and self._parts[0][:1] == '~'):
homedir = self._accessor.expanduser(self._parts[0])
homedir = os.path.expanduser(self._parts[0])
if homedir[:1] == "~":
raise RuntimeError("Could not determine home directory.")
return self._from_parts([homedir] + self._parts[1:])
Expand Down
7 changes: 4 additions & 3 deletions Lib/test/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,7 @@ def test_mkdir_concurrent_parent_creation(self):
p = self.cls(BASE, 'dirCPC%d' % pattern_num)
self.assertFalse(p.exists())

real_mkdir = os.mkdir
def my_mkdir(path, mode=0o777):
path = str(path)
# Emulate another process that would create the directory
Expand All @@ -2185,15 +2186,15 @@ def my_mkdir(path, mode=0o777):
# function is called at most 5 times (dirCPC/dir1/dir2,
# dirCPC/dir1, dirCPC, dirCPC/dir1, dirCPC/dir1/dir2).
if pattern.pop():
os.mkdir(path, mode) # From another process.
real_mkdir(path, mode) # From another process.
concurrently_created.add(path)
os.mkdir(path, mode) # Our real call.
real_mkdir(path, mode) # Our real call.

pattern = [bool(pattern_num & (1 << n)) for n in range(5)]
concurrently_created = set()
p12 = p / 'dir1' / 'dir2'
try:
with mock.patch("pathlib._normal_accessor.mkdir", my_mkdir):
with mock.patch("os.mkdir", my_mkdir):
p12.mkdir(parents=True, exist_ok=False)
except FileExistsError:
self.assertIn(str(p12), concurrently_created)
Expand Down