diff --git a/libmat2/abstract.py b/libmat2/abstract.py
index aaf00d7..a7c5fa5 100644
--- a/libmat2/abstract.py
+++ b/libmat2/abstract.py
@@ -25,6 +25,12 @@ class AbstractParser(abc.ABC):
 
         self.filename = filename
         fname, extension = os.path.splitext(filename)
+
+        # Special case for tar.gz, tar.bz2, … files: keep the double
+        # extension together, whatever the case of its `.tar` part.
+        if fname.lower().endswith('.tar') and len(fname) > 4:
+            fname, extension = fname[:-4], fname[-4:] + extension
+
         self.output_filename = fname + '.cleaned' + extension
         self.lightweight_cleaning = False
 
diff --git a/libmat2/archive.py b/libmat2/archive.py
index 2936f39..d295afe 100644
--- a/libmat2/archive.py
+++ b/libmat2/archive.py
@@ -40,6 +40,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
           the Union[A, B] constrain, hence the weird `# type: ignore`
           annotations.
     """
+    # Tarfiles can optionally support compression
+    # https://docs.python.org/3/library/tarfile.html#tarfile.open
+    compression = ''
+
     def __init__(self, filename):
         super().__init__(filename)
         self.archive_class = None  # type: Optional[ArchiveClass]
@@ -134,7 +138,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
         # pylint: disable=too-many-branches
 
         with self.archive_class(self.filename) as zin,\
-             self.archive_class(self.output_filename, 'w') as zout:
+             self.archive_class(self.output_filename, 'w' + self.compression) as zout:
 
             temp_folder = tempfile.mkdtemp()
             abort = False
@@ -212,7 +216,11 @@ class TarParser(ArchiveBasedAbstractParser):
     mimetypes = {'application/x-tar'}
     def __init__(self, filename):
         super().__init__(filename)
-        self.archive_class = tarfile.TarFile
+        # yes, it's tarfile.TarFile.open and not tarfile.TarFile,
+        # as stated in the documentation:
+        # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile
+        # This is required to support compressed archives.
+        self.archive_class = tarfile.TarFile.open
         self.member_class = tarfile.TarInfo
 
     def is_archive_valid(self):
@@ -259,6 +267,25 @@ class TarParser(ArchiveBasedAbstractParser):
         assert isinstance(member, tarfile.TarInfo)  # please mypy
         return member.name
 
+
+class TarGzParser(TarParser):
+    """ Tar archives whose cleaned copy is written with the `w:gz` mode """
+    compression = ':gz'
+    mimetypes = {'application/x-tar+gz'}
+
+
+class TarBz2Parser(TarParser):
+    """ Tar archives whose cleaned copy is written with the `w:bz2` mode """
+    compression = ':bz2'
+    mimetypes = {'application/x-tar+bz2'}
+
+
+class TarXzParser(TarParser):
+    """ Tar archives whose cleaned copy is written with the `w:xz` mode """
+    compression = ':xz'
+    mimetypes = {'application/x-tar+xz'}
+
+
 class ZipParser(ArchiveBasedAbstractParser):
     mimetypes = {'application/zip'}
     def __init__(self, filename):
diff --git a/libmat2/parser_factory.py b/libmat2/parser_factory.py
index e93ee4f..3931903 100644
--- a/libmat2/parser_factory.py
+++ b/libmat2/parser_factory.py
@@ -50,6 +50,12 @@ def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
     if extension.lower() in UNSUPPORTED_EXTENSIONS:
         return None, mtype
 
+    # Compressed tarballs get a pseudo-mimetype (eg. application/x-tar+gz),
+    # so that the matching TarParser subclass is picked in the loop below.
+    if mtype == 'application/x-tar':
+        if extension[1:].lower() in ('bz2', 'gz', 'xz'):
+            mtype = mtype + '+' + extension[1:].lower()
+
     for parser_class in _get_parsers():  # type: ignore
         if mtype in parser_class.mimetypes:
             try:
diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py
index 1d2a22a..4f562e6 100644
--- a/tests/test_libmat2.py
+++ b/tests/test_libmat2.py
@@ -30,6 +30,15 @@ class TestParserFactory(unittest.TestCase):
         self.assertEqual(mimetype, 'audio/mpeg')
         self.assertEqual(parser.__class__, audio.MP3Parser)
 
+    def test_tarfile_double_extension_handling(self):
+        """ Test that our module auto-detection is handling sub-sub-classes """
+        with tarfile.TarFile.open('./tests/data/dirty.tar.bz2', 'w:bz2') as zout:
+            zout.add('./tests/data/dirty.jpg')
+        parser, mimetype = parser_factory.get_parser('./tests/data/dirty.tar.bz2')
+        self.assertEqual(mimetype, 'application/x-tar+bz2')
+        self.assertEqual(parser.__class__, archive.TarBz2Parser)
+        os.remove('./tests/data/dirty.tar.bz2')
+
 
 class TestParameterInjection(unittest.TestCase):
     def test_ver_injection(self):
@@ -719,7 +728,7 @@ class TestCleaning(unittest.TestCase):
         os.remove('./tests/data/clean.cleaned.cleaned.css')
 
     def test_tar(self):
-        with tarfile.TarFile('./tests/data/dirty.tar', 'w') as zout:
+        with tarfile.TarFile.open('./tests/data/dirty.tar', 'w') as zout:
             zout.add('./tests/data/dirty.flac')
             zout.add('./tests/data/dirty.docx')
             zout.add('./tests/data/dirty.jpg')
@@ -752,3 +761,108 @@ class TestCleaning(unittest.TestCase):
         os.remove('./tests/data/dirty.tar')
         os.remove('./tests/data/dirty.cleaned.tar')
         os.remove('./tests/data/dirty.cleaned.cleaned.tar')
+
+    def test_targz(self):
+        """ Clean a gzip-compressed tarball and check the output is still gzip """
+        with tarfile.TarFile.open('./tests/data/dirty.tar.gz', 'w:gz') as zout:
+            zout.add('./tests/data/dirty.flac')
+            zout.add('./tests/data/dirty.docx')
+            zout.add('./tests/data/dirty.jpg')
+        p = archive.TarGzParser('./tests/data/dirty.tar.gz')
+        meta = p.get_meta()
+        self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
+
+        ret = p.remove_all()
+        self.assertTrue(ret)
+
+        p = archive.TarGzParser('./tests/data/dirty.cleaned.tar.gz')
+        self.assertEqual(p.get_meta(), {})
+        self.assertTrue(p.remove_all())
+
+        tmp_dir = tempfile.mkdtemp()
+        with tarfile.open('./tests/data/dirty.cleaned.tar.gz', 'r:gz') as zout:
+            zout.extractall(path=tmp_dir)
+
+        number_of_files = 0
+        for root, _, fnames in os.walk(tmp_dir):
+            for f in fnames:
+                complete_path = os.path.join(root, f)
+                p, _ = parser_factory.get_parser(complete_path)
+                self.assertIsNotNone(p)
+                self.assertEqual(p.get_meta(), {})
+                number_of_files += 1
+        self.assertEqual(number_of_files, 3)
+
+        os.remove('./tests/data/dirty.tar.gz')
+        os.remove('./tests/data/dirty.cleaned.tar.gz')
+        os.remove('./tests/data/dirty.cleaned.cleaned.tar.gz')
+
+    def test_tarbz2(self):
+        """ Clean a bzip2-compressed tarball and check the output is still bzip2 """
+        with tarfile.TarFile.open('./tests/data/dirty.tar.bz2', 'w:bz2') as zout:
+            zout.add('./tests/data/dirty.flac')
+            zout.add('./tests/data/dirty.docx')
+            zout.add('./tests/data/dirty.jpg')
+        p = archive.TarBz2Parser('./tests/data/dirty.tar.bz2')
+        meta = p.get_meta()
+        self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
+
+        ret = p.remove_all()
+        self.assertTrue(ret)
+
+        p = archive.TarBz2Parser('./tests/data/dirty.cleaned.tar.bz2')
+        self.assertEqual(p.get_meta(), {})
+        self.assertTrue(p.remove_all())
+
+        tmp_dir = tempfile.mkdtemp()
+        with tarfile.open('./tests/data/dirty.cleaned.tar.bz2', 'r:bz2') as zout:
+            zout.extractall(path=tmp_dir)
+
+        number_of_files = 0
+        for root, _, fnames in os.walk(tmp_dir):
+            for f in fnames:
+                complete_path = os.path.join(root, f)
+                p, _ = parser_factory.get_parser(complete_path)
+                self.assertIsNotNone(p)
+                self.assertEqual(p.get_meta(), {})
+                number_of_files += 1
+        self.assertEqual(number_of_files, 3)
+
+        os.remove('./tests/data/dirty.tar.bz2')
+        os.remove('./tests/data/dirty.cleaned.tar.bz2')
+        os.remove('./tests/data/dirty.cleaned.cleaned.tar.bz2')
+
+    def test_tarxz(self):
+        """ Clean an xz-compressed tarball and check the output is still xz """
+        with tarfile.TarFile.open('./tests/data/dirty.tar.xz', 'w:xz') as zout:
+            zout.add('./tests/data/dirty.flac')
+            zout.add('./tests/data/dirty.docx')
+            zout.add('./tests/data/dirty.jpg')
+        p = archive.TarXzParser('./tests/data/dirty.tar.xz')
+        meta = p.get_meta()
+        self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
+
+        ret = p.remove_all()
+        self.assertTrue(ret)
+
+        p = archive.TarXzParser('./tests/data/dirty.cleaned.tar.xz')
+        self.assertEqual(p.get_meta(), {})
+        self.assertTrue(p.remove_all())
+
+        tmp_dir = tempfile.mkdtemp()
+        with tarfile.open('./tests/data/dirty.cleaned.tar.xz', 'r:xz') as zout:
+            zout.extractall(path=tmp_dir)
+
+        number_of_files = 0
+        for root, _, fnames in os.walk(tmp_dir):
+            for f in fnames:
+                complete_path = os.path.join(root, f)
+                p, _ = parser_factory.get_parser(complete_path)
+                self.assertIsNotNone(p)
+                self.assertEqual(p.get_meta(), {})
+                number_of_files += 1
+        self.assertEqual(number_of_files, 3)
+
+        os.remove('./tests/data/dirty.tar.xz')
+        os.remove('./tests/data/dirty.cleaned.tar.xz')
+        os.remove('./tests/data/dirty.cleaned.cleaned.tar.xz')