Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add and fix tests for filtererror
  • Loading branch information
mattprodani committed Dec 1, 2023
commit 191998a6a9c780e8b986a894413df36950b0a365
22 changes: 13 additions & 9 deletions Lib/tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2309,21 +2309,25 @@ def _get_extract_tarinfo(self, member, filter_function, path):
else:
tarinfo = member

filtered = None
unfiltered = tarinfo
try:
filtered = filter_function(tarinfo, path)
except (OSError, FilterError) as e:
tarinfo = filter_function(tarinfo, path)
except FilterError as e:
tarinfo = None
self._handle_fatal_error(e)
except OSError as e:
self._handle_fatal_error(e)
except ExtractError as e:
self._handle_nonfatal_error(e)
if filtered is None:
self._dbg(2, "tarfile: Excluded %r" % tarinfo.name)

if tarinfo is None:
self._dbg(2, "tarfile: Excluded %r" % unfiltered.name)
return None
# Prepare the link target for makelink().
if filtered.islnk():
filtered = copy.copy(filtered)
filtered._link_target = os.path.join(path, filtered.linkname)
return filtered
if tarinfo.islnk():
tarinfo = copy.copy(tarinfo)
tarinfo._link_target = os.path.join(path, tarinfo.linkname)
return tarinfo

def _extract_one(self, tarinfo, path, set_attrs, numeric_owner):
"""Extract from filtered tarinfo to disk"""
Expand Down
39 changes: 32 additions & 7 deletions Lib/test/test_tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3141,15 +3141,30 @@ def check_files_present(self, directory):
@contextmanager
def extract_with_none(self, *attr_names):
DIR = pathlib.Path(TEMPDIR) / "extractall_none"

self.tar.errorlevel = 0
for member in self.tar.getmembers():
for attr_name in attr_names:
setattr(member, attr_name, None)

filter_function = self.tar._get_filter_function(self.extraction_filter)

with os_helper.temp_dir(DIR):
self.tar.extractall(DIR, filter='fully_trusted')

# Apply the filter before clearing metadata, so that only members which
# would have been extracted with their original metadata are tested.
# For example, the data filter does not extract special files, but it
# needs 'mode' to identify them.
filtered_members = []

for member in self.tar.getmembers():
member = self.tar._get_extract_tarinfo(member, filter_function, DIR)
if member is not None:
for attr_name in attr_names:
setattr(member, attr_name, None)
filtered_members.append(member)
self.tar.extractall(DIR, members = filtered_members, filter=self.extraction_filter)
self.check_files_present(DIR)
yield DIR


def test_extractall_none_mtime(self):
# mtimes of extracted files should be later than 'now' -- the mtime
# of a previously created directory.
Expand Down Expand Up @@ -3478,6 +3493,14 @@ def expect_file(self, name, type=None, symlink_to=None, mode=None,
for parent in path.parents:
self.expected_paths.discard(parent)

def expect_file_skipped(self, name):
    """Check that extraction of a single file was skipped.

    A member can be skipped, for example, because a filter rejected it.

    *name* is the member's path relative to ``self.destdir``.

    If an exception was recorded in ``self.raised_exception``, it is
    re-raised instead of performing the check.  Otherwise the check fails
    (through ``self.assertNotIn``) when the normalized path is present in
    ``self.expected_paths``.
    """
    # Compare against None explicitly, as expect_exception() does: an
    # exception instance whose class defines __bool__/__len__ may be falsy
    # and must not be silently ignored.
    if self.raised_exception is not None:
        raise self.raised_exception
    # use normpath() rather than resolve() so we don't follow symlinks
    path = pathlib.Path(os.path.normpath(self.destdir / name))
    self.assertNotIn(path, self.expected_paths)

def expect_exception(self, exc_type, message_re='.'):
with self.assertRaisesRegex(exc_type, message_re):
if self.raised_exception is not None:
Expand Down Expand Up @@ -4071,9 +4094,6 @@ def valueerror_filter(tarinfo, path):
with self.check_context(arc.open(errorlevel=0), extracterror_filter):
self.expect_file('file')

with self.check_context(arc.open(errorlevel=0), filtererror_filter):
self.expect_file('file')

with self.check_context(arc.open(errorlevel=0), oserror_filter):
self.expect_file('file')

Expand All @@ -4083,6 +4103,11 @@ def valueerror_filter(tarinfo, path):
with self.check_context(arc.open(errorlevel=0), valueerror_filter):
self.expect_exception(ValueError)

# If errorlevel is 0, FilterErrors are logged and the member is skipped

with self.check_context(arc.open(errorlevel=0), filtererror_filter):
self.expect_file_skipped('file')

# If 1, all fatal errors are raised

with self.check_context(arc.open(errorlevel=1), extracterror_filter):
Expand Down