From a7ebb587e19ce1177a7ef067e2da74e4964ff19e Mon Sep 17 00:00:00 2001 From: jvoisin Date: Sat, 27 Apr 2019 22:33:54 +0200 Subject: [PATCH] Handle weird permissions in tar archives --- libmat2/archive.py | 24 +++++++++++++++++++++--- tests/test_corrupted_files.py | 26 +++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/libmat2/archive.py b/libmat2/archive.py index 7aa5cb9..969bbd8 100644 --- a/libmat2/archive.py +++ b/libmat2/archive.py @@ -1,4 +1,5 @@ import abc +import stat import zipfile import datetime import tarfile @@ -104,6 +105,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): full_path: str): """Add the file at full_path to the archive, via the given member.""" + @staticmethod + def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember: + """Set the permission of the archive member.""" + # pylint: disable=unused-argument + return member + def get_meta(self) -> Dict[str, Union[str, dict]]: meta = dict() # type: Dict[str, Union[str, dict]] @@ -120,6 +127,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): zin.extract(member=item, path=temp_folder) full_path = os.path.join(temp_folder, member_name) + os.chmod(full_path, stat.S_IRUSR) specific_meta = self._specific_get_meta(full_path, member_name) local_meta = {**local_meta, **specific_meta} @@ -164,6 +172,9 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): zin.extract(member=item, path=temp_folder) full_path = os.path.join(temp_folder, member_name) + original_permissions = os.stat(full_path).st_mode + os.chmod(full_path, original_permissions | stat.S_IWUSR | stat.S_IRUSR) + if self._specific_cleanup(full_path) is False: logging.warning("Something went wrong during deep cleaning of %s", member_name) @@ -202,6 +213,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): os.rename(member_parser.output_filename, full_path) zinfo = self.member_class(member_name) # type: ignore + zinfo = self._set_member_permissions(zinfo, original_permissions) clean_zinfo = self._clean_member(zinfo) self._add_file_to_archive(zout, clean_zinfo, full_path) @@ -216,11 +228,11 @@ class TarParser(ArchiveBasedAbstractParser): mimetypes = {'application/x-tar'} def __init__(self, filename): super().__init__(filename) - # yes, it's tarfile.TarFile.open and not tarfile.TarFile, + # yes, it's tarfile.open and not tarfile.TarFile, # as stated in the documentation: # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile # This is required to support compressed archives. - self.archive_class = tarfile.TarFile.open + self.archive_class = tarfile.open self.member_class = tarfile.TarInfo def is_archive_valid(self): @@ -239,7 +251,7 @@ class TarParser(ArchiveBasedAbstractParser): assert isinstance(member, tarfile.TarInfo) # please mypy metadata = {} if member.mtime != 0: - metadata['mtime'] = datetime.datetime.fromtimestamp(member.mtime) + metadata['mtime'] = str(datetime.datetime.fromtimestamp(member.mtime)) if member.uid != 0: metadata['uid'] = str(member.uid) if member.gid != 0: @@ -267,6 +279,12 @@ class TarParser(ArchiveBasedAbstractParser): assert isinstance(member, tarfile.TarInfo) # please mypy return member.name + @staticmethod + def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember: + assert isinstance(member, tarfile.TarInfo) # please mypy + member.mode = permissions + return member + class TarGzParser(TarParser): compression = ':gz' diff --git a/tests/test_corrupted_files.py b/tests/test_corrupted_files.py index 1331f1c..b7240fe 100644 --- a/tests/test_corrupted_files.py +++ b/tests/test_corrupted_files.py @@ -293,7 +293,7 @@ class TestCorruptedFiles(unittest.TestCase): os.remove('./tests/data/clean.epub') def test_tar(self): - with tarfile.TarFile('./tests/data/clean.tar', 'w') as zout: + with tarfile.TarFile.open('./tests/data/clean.tar', 'w') as zout: zout.add('./tests/data/dirty.flac') zout.add('./tests/data/dirty.docx') zout.add('./tests/data/dirty.jpg') @@ -302,6 +302,7 @@ class TestCorruptedFiles(unittest.TestCase): tarinfo.mtime = time.time() tarinfo.uid = 1337 tarinfo.gid = 1338 + tarinfo.size = os.stat('./tests/data/dirty.png').st_size with open('./tests/data/dirty.png', 'rb') as f: zout.addfile(tarinfo, f) p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') @@ -316,3 +317,26 @@ class TestCorruptedFiles(unittest.TestCase): with self.assertRaises(ValueError): archive.TarParser('./tests/data/clean.tar') os.remove('./tests/data/clean.tar') + +class TestReadOnlyArchiveMembers(unittest.TestCase): + def test_onlymember_tar(self): + with tarfile.open('./tests/data/clean.tar', 'w') as zout: + zout.add('./tests/data/dirty.png') + tarinfo = tarfile.TarInfo('./tests/data/dirty.jpg') + tarinfo.mtime = time.time() + tarinfo.uid = 1337 + tarinfo.mode = 0o000 + tarinfo.size = os.stat('./tests/data/dirty.jpg').st_size + with open('./tests/data/dirty.jpg', 'rb') as f: + zout.addfile(tarinfo=tarinfo, fileobj=f) + p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') + self.assertEqual(mimetype, 'application/x-tar') + meta = p.get_meta() + self.assertEqual(meta['./tests/data/dirty.jpg']['uid'], '1337') + self.assertTrue(p.remove_all()) + + p = archive.TarParser('./tests/data/clean.cleaned.tar') + self.assertEqual(p.get_meta(), {}) + os.remove('./tests/data/clean.tar') + os.remove('./tests/data/clean.cleaned.tar') +