diff --git a/libmat2/archive.py b/libmat2/archive.py index aa1b24c..2936f39 100644 --- a/libmat2/archive.py +++ b/libmat2/archive.py @@ -1,5 +1,7 @@ +import abc import zipfile import datetime +import tarfile import tempfile import os import logging @@ -11,14 +13,37 @@ from . import abstract, UnknownMemberPolicy, parser_factory # Make pyflakes happy assert Set assert Pattern -assert List -assert Union + +# pylint: disable=not-callable,assignment-from-no-return + +# An ArchiveClass is a class representing an archive, +# while an ArchiveMember is a class representing an element +# (usually a file) of an archive. +ArchiveClass = Union[zipfile.ZipFile, tarfile.TarFile] +ArchiveMember = Union[zipfile.ZipInfo, tarfile.TarInfo] class ArchiveBasedAbstractParser(abstract.AbstractParser): - """ Office files (.docx, .odt, …) are zipped files. """ + """Base class for all archive-based formats. + + Welcome to a world of frustrating complexity and tediousness: + - A lot of file formats (docx, odt, epubs, …) are archive-based, + so we need to add callbacks everywhere to allow their respective + parsers to apply specific cleanup to the required files. + - Python has two different modules to deal with .tar and .zip files, + with similar-but-yet-o-so-different API, so we need to write + a ghetto-wrapper to avoid duplicating everything + - The combination of @staticmethod and @abstractstaticmethod is + required because for now, mypy doesn't know that + @abstractstaticmethod is, indeed, a static method. + - Mypy is too dumb (yet) to realise that a type A is valid under + the Union[A, B] constraint, hence the weird `# type: ignore` + annotations. + """ def __init__(self, filename): super().__init__(filename) + self.archive_class = None # type: Optional[ArchiveClass] + self.member_class = None # type: Optional[ArchiveMember] # Those are the files that have a format that _isn't_ # supported by MAT2, but that we want to keep anyway. 
@@ -32,10 +57,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): # the archive? self.unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy - try: # better fail here than later - zipfile.ZipFile(self.filename) - except zipfile.BadZipFile: - raise ValueError + self.is_archive_valid() + + def is_archive_valid(self): + """Raise a ValueError if the current archive isn't a valid one.""" def _specific_cleanup(self, full_path: str) -> bool: """ This method can be used to apply specific treatment @@ -50,59 +75,57 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): return {} # pragma: no cover @staticmethod - def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: - zipinfo.create_system = 3 # Linux - zipinfo.comment = b'' - zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be - return zipinfo + @abc.abstractstaticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + """Return all the members of the archive.""" @staticmethod - def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]: - metadata = {} - if zipinfo.create_system == 3: # this is Linux - pass - elif zipinfo.create_system == 2: - metadata['create_system'] = 'Windows' - else: - metadata['create_system'] = 'Weird' + @abc.abstractstaticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + """Remove all the metadata for a given member.""" - if zipinfo.comment: - metadata['comment'] = zipinfo.comment # type: ignore + @staticmethod + @abc.abstractstaticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + """Return all the metadata of a given member.""" - if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): - metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time)) + @staticmethod + @abc.abstractstaticmethod + def _get_member_name(member: ArchiveMember) -> str: + """Return the name of the given member.""" - return metadata + @staticmethod + 
@abc.abstractstaticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + """Add the file at full_path to the archive, via the given member.""" def get_meta(self) -> Dict[str, Union[str, dict]]: meta = dict() # type: Dict[str, Union[str, dict]] - with zipfile.ZipFile(self.filename) as zin: + with self.archive_class(self.filename) as zin: temp_folder = tempfile.mkdtemp() - for item in zin.infolist(): - local_meta = dict() # type: Dict[str, Union[str, Dict]] - for k, v in self._get_zipinfo_meta(item).items(): - local_meta[k] = v + for item in self._get_all_members(zin): + local_meta = self._get_member_meta(item) + member_name = self._get_member_name(item) - if item.filename[-1] == '/': # pragma: no cover + if member_name[-1] == '/': # pragma: no cover # `is_dir` is added in Python3.6 continue # don't keep empty folders zin.extract(member=item, path=temp_folder) - full_path = os.path.join(temp_folder, item.filename) + full_path = os.path.join(temp_folder, member_name) - specific_meta = self._specific_get_meta(full_path, item.filename) - for (k, v) in specific_meta.items(): - local_meta[k] = v + specific_meta = self._specific_get_meta(full_path, member_name) + local_meta = {**local_meta, **specific_meta} - tmp_parser, _ = parser_factory.get_parser(full_path) # type: ignore - if tmp_parser: - for k, v in tmp_parser.get_meta().items(): - local_meta[k] = v + member_parser, _ = parser_factory.get_parser(full_path) # type: ignore + if member_parser: + local_meta = {**local_meta, **member_parser.get_meta()} if local_meta: - meta[item.filename] = local_meta + meta[member_name] = local_meta shutil.rmtree(temp_folder) return meta @@ -110,17 +133,19 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): def remove_all(self) -> bool: # pylint: disable=too-many-branches - with zipfile.ZipFile(self.filename) as zin,\ - zipfile.ZipFile(self.output_filename, 'w') as zout: + with self.archive_class(self.filename) as zin,\ + 
self.archive_class(self.output_filename, 'w') as zout: temp_folder = tempfile.mkdtemp() abort = False - items = list() # type: List[zipfile.ZipInfo] - for item in sorted(zin.infolist(), key=lambda z: z.filename): + # Sort the items to process, to reduce fingerprinting, + # and keep them in the `items` variable. + items = list() # type: List[ArchiveMember] + for item in sorted(self._get_all_members(zin), key=self._get_member_name): # Some fileformats do require to have the `mimetype` file # as the first file in the archive. - if item.filename == 'mimetype': + if self._get_member_name(item) == 'mimetype': items = [item] + items else: items.append(item) @@ -128,53 +153,53 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): # Since files order is a fingerprint factor, # we're iterating (and thus inserting) them in lexicographic order. for item in items: - if item.filename[-1] == '/': # `is_dir` is added in Python3.6 + member_name = self._get_member_name(item) + if member_name[-1] == '/': # `is_dir` is added in Python3.6 continue # don't keep empty folders zin.extract(member=item, path=temp_folder) - full_path = os.path.join(temp_folder, item.filename) + full_path = os.path.join(temp_folder, member_name) if self._specific_cleanup(full_path) is False: logging.warning("Something went wrong during deep cleaning of %s", - item.filename) + member_name) abort = True continue - if any(map(lambda r: r.search(item.filename), self.files_to_keep)): + if any(map(lambda r: r.search(member_name), self.files_to_keep)): # those files aren't supported, but we want to add them anyway pass - elif any(map(lambda r: r.search(item.filename), self.files_to_omit)): + elif any(map(lambda r: r.search(member_name), self.files_to_omit)): continue else: # supported files that we want to first clean, then add - tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore - if not tmp_parser: + member_parser, mtype = parser_factory.get_parser(full_path) # type: ignore + if not 
member_parser: if self.unknown_member_policy == UnknownMemberPolicy.OMIT: logging.warning("In file %s, omitting unknown element %s (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) continue elif self.unknown_member_policy == UnknownMemberPolicy.KEEP: logging.warning("In file %s, keeping unknown element %s (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) else: logging.error("In file %s, element %s's format (%s) " \ "isn't supported", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) abort = True continue - if tmp_parser: - if tmp_parser.remove_all() is False: + else: + if member_parser.remove_all() is False: logging.warning("In file %s, something went wrong \ with the cleaning of %s \ (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) abort = True continue - os.rename(tmp_parser.output_filename, full_path) + os.rename(member_parser.output_filename, full_path) - zinfo = zipfile.ZipInfo(item.filename) # type: ignore - clean_zinfo = self._clean_zipinfo(zinfo) - with open(full_path, 'rb') as f: - zout.writestr(clean_zinfo, f.read()) + zinfo = self.member_class(member_name) # type: ignore + clean_zinfo = self._clean_member(zinfo) + self._add_file_to_archive(zout, clean_zinfo, full_path) shutil.rmtree(temp_folder) if abort: @@ -183,6 +208,111 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): return True +class TarParser(ArchiveBasedAbstractParser): + mimetypes = {'application/x-tar'} + def __init__(self, filename): + super().__init__(filename) + self.archive_class = tarfile.TarFile + self.member_class = tarfile.TarInfo + + def is_archive_valid(self): + if tarfile.is_tarfile(self.filename) is False: + raise ValueError + + @staticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + assert isinstance(member, tarfile.TarInfo) # please mypy + member.mtime = member.uid = member.gid = 0 + member.uname = 
member.gname = '' + return member + + @staticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + assert isinstance(member, tarfile.TarInfo) # please mypy + metadata = {} + if member.mtime != 0: + metadata['mtime'] = str(member.mtime) + if member.uid != 0: + metadata['uid'] = str(member.uid) + if member.gid != 0: + metadata['gid'] = str(member.gid) + if member.uname != '': + metadata['uname'] = member.uname + if member.gname != '': + metadata['gname'] = member.gname + return metadata + + @staticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + assert isinstance(member, tarfile.TarInfo) # please mypy + assert isinstance(archive, tarfile.TarFile) # please mypy + archive.add(full_path, member.name, filter=TarParser._clean_member) # type: ignore + + @staticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + assert isinstance(archive, tarfile.TarFile) # please mypy + return archive.getmembers() # type: ignore + + @staticmethod + def _get_member_name(member: ArchiveMember) -> str: + assert isinstance(member, tarfile.TarInfo) # please mypy + return member.name class ZipParser(ArchiveBasedAbstractParser): mimetypes = {'application/zip'} + def __init__(self, filename): + super().__init__(filename) + self.archive_class = zipfile.ZipFile + self.member_class = zipfile.ZipInfo + + def is_archive_valid(self): + try: + zipfile.ZipFile(self.filename) + except zipfile.BadZipFile: + raise ValueError + + @staticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + assert isinstance(member, zipfile.ZipInfo) # please mypy + member.create_system = 3 # Linux + member.comment = b'' + member.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be + return member + + @staticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + assert isinstance(member, zipfile.ZipInfo) # please mypy + metadata = {} + if member.create_system == 3: # this is 
Linux + pass + elif member.create_system == 2: + metadata['create_system'] = 'Windows' + else: + metadata['create_system'] = 'Weird' + + if member.comment: + metadata['comment'] = member.comment # type: ignore + + if member.date_time != (1980, 1, 1, 0, 0, 0): + metadata['date_time'] = str(datetime.datetime(*member.date_time)) + + return metadata + + @staticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + assert isinstance(archive, zipfile.ZipFile) # please mypy + assert isinstance(member, zipfile.ZipInfo) # please mypy + with open(full_path, 'rb') as f: + archive.writestr(member, f.read()) + + @staticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + assert isinstance(archive, zipfile.ZipFile) # please mypy + return archive.infolist() # type: ignore + + @staticmethod + def _get_member_name(member: ArchiveMember) -> str: + assert isinstance(member, zipfile.ZipInfo) # please mypy + return member.filename diff --git a/libmat2/epub.py b/libmat2/epub.py index d385465..390ee63 100644 --- a/libmat2/epub.py +++ b/libmat2/epub.py @@ -5,7 +5,7 @@ import xml.etree.ElementTree as ET # type: ignore from . 
import archive, office -class EPUBParser(archive.ArchiveBasedAbstractParser): +class EPUBParser(archive.ZipParser): mimetypes = {'application/epub+zip', } metadata_namespace = '{http://purl.org/dc/elements/1.1/}' diff --git a/libmat2/office.py b/libmat2/office.py index 2c9cbff..b769991 100644 --- a/libmat2/office.py +++ b/libmat2/office.py @@ -6,7 +6,7 @@ from typing import Dict, Set, Pattern, Tuple, Any import xml.etree.ElementTree as ET # type: ignore -from .archive import ArchiveBasedAbstractParser +from .archive import ZipParser # pylint: disable=line-too-long @@ -43,7 +43,7 @@ def _sort_xml_attributes(full_path: str) -> bool: return True -class MSOfficeParser(ArchiveBasedAbstractParser): +class MSOfficeParser(ZipParser): mimetypes = { 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', @@ -312,7 +312,7 @@ class MSOfficeParser(ArchiveBasedAbstractParser): return {file_path: 'harmful content', } -class LibreOfficeParser(ArchiveBasedAbstractParser): +class LibreOfficeParser(ZipParser): mimetypes = { 'application/vnd.oasis.opendocument.text', 'application/vnd.oasis.opendocument.spreadsheet', diff --git a/tests/test_corrupted_files.py b/tests/test_corrupted_files.py index 4a16d51..1331f1c 100644 --- a/tests/test_corrupted_files.py +++ b/tests/test_corrupted_files.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 import unittest +import time import shutil import os import logging import zipfile +import tarfile from libmat2 import pdf, images, audio, office, parser_factory, torrent -from libmat2 import harmless, video, web +from libmat2 import harmless, video, web, archive # No need to logging messages, should something go wrong, # the testsuite _will_ fail. 
@@ -278,7 +280,6 @@ class TestCorruptedFiles(unittest.TestCase): p.remove_all() os.remove('./tests/data/clean.html') - def test_epub(self): with zipfile.ZipFile('./tests/data/clean.epub', 'w') as zout: zout.write('./tests/data/dirty.jpg', 'OEBPS/content.opf') @@ -291,3 +292,27 @@ class TestCorruptedFiles(unittest.TestCase): self.assertFalse(p.remove_all()) os.remove('./tests/data/clean.epub') + def test_tar(self): + with tarfile.TarFile('./tests/data/clean.tar', 'w') as zout: + zout.add('./tests/data/dirty.flac') + zout.add('./tests/data/dirty.docx') + zout.add('./tests/data/dirty.jpg') + zout.add('./tests/data/embedded_corrupted.docx') + tarinfo = tarfile.TarInfo(name='./tests/data/dirty.png') + tarinfo.mtime = time.time() + tarinfo.uid = 1337 + tarinfo.gid = 1338 + with open('./tests/data/dirty.png', 'rb') as f: + zout.addfile(tarinfo, f) + p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') + self.assertEqual(mimetype, 'application/x-tar') + meta = p.get_meta() + self.assertEqual(meta['./tests/data/dirty.flac']['comments'], 'Thank you for using MAT !') + self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') + self.assertFalse(p.remove_all()) + os.remove('./tests/data/clean.tar') + + shutil.copy('./tests/data/dirty.png', './tests/data/clean.tar') + with self.assertRaises(ValueError): + archive.TarParser('./tests/data/clean.tar') + os.remove('./tests/data/clean.tar') diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index 397855e..1d2a22a 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py @@ -4,6 +4,8 @@ import unittest import shutil import os import re +import tarfile +import tempfile import zipfile from libmat2 import pdf, images, audio, office, parser_factory, torrent, harmless @@ -195,6 +197,19 @@ class TestGetMeta(unittest.TestCase): self.assertEqual(meta['version'], '1.0') self.assertEqual(meta['harmful data'], 'underline is cool') + def test_tar(self): + 
with tarfile.TarFile('./tests/data/dirty.tar', 'w') as tout: + tout.add('./tests/data/dirty.flac') + tout.add('./tests/data/dirty.docx') + tout.add('./tests/data/dirty.jpg') + p, mimetype = parser_factory.get_parser('./tests/data/dirty.tar') + self.assertEqual(mimetype, 'application/x-tar') + meta = p.get_meta() + self.assertEqual(meta['./tests/data/dirty.flac']['comments'], 'Thank you for using MAT !') + self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') + os.remove('./tests/data/dirty.tar') + + class TestRemovingThumbnails(unittest.TestCase): def test_odt(self): shutil.copy('./tests/data/revision.odt', './tests/data/clean.odt') @@ -702,3 +717,38 @@ class TestCleaning(unittest.TestCase): os.remove('./tests/data/clean.css') os.remove('./tests/data/clean.cleaned.css') os.remove('./tests/data/clean.cleaned.cleaned.css') + + def test_tar(self): + with tarfile.TarFile('./tests/data/dirty.tar', 'w') as zout: + zout.add('./tests/data/dirty.flac') + zout.add('./tests/data/dirty.docx') + zout.add('./tests/data/dirty.jpg') + p = archive.TarParser('./tests/data/dirty.tar') + meta = p.get_meta() + self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') + + ret = p.remove_all() + self.assertTrue(ret) + + p = archive.TarParser('./tests/data/dirty.cleaned.tar') + self.assertEqual(p.get_meta(), {}) + self.assertTrue(p.remove_all()) + + tmp_dir = tempfile.mkdtemp() + with tarfile.open('./tests/data/dirty.cleaned.tar') as zout: + zout.extractall(path=tmp_dir) + zout.close() + + number_of_files = 0 + for root, _, fnames in os.walk(tmp_dir): + for f in fnames: + complete_path = os.path.join(root, f) + p, _ = parser_factory.get_parser(complete_path) + self.assertIsNotNone(p) + self.assertEqual(p.get_meta(), {}) + number_of_files += 1 + self.assertEqual(number_of_files, 3) + + os.remove('./tests/data/dirty.tar') + 
os.remove('./tests/data/dirty.cleaned.tar') + os.remove('./tests/data/dirty.cleaned.cleaned.tar')