Add support for compressed tar files
This commit is contained in:
parent
82cc822a1d
commit
8e41b098d6
@ -25,6 +25,11 @@ class AbstractParser(abc.ABC):
|
||||
|
||||
self.filename = filename
|
||||
fname, extension = os.path.splitext(filename)
|
||||
|
||||
# Special case for tar.gz, tar.bz2, … files
|
||||
if fname.endswith('.tar') and len(fname) > 4:
|
||||
fname, extension = fname[:-4], '.tar' + extension
|
||||
|
||||
self.output_filename = fname + '.cleaned' + extension
|
||||
self.lightweight_cleaning = False
|
||||
|
||||
|
@ -40,6 +40,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
the Union[A, B] constrain, hence the weird `# type: ignore`
|
||||
annotations.
|
||||
"""
|
||||
# Tarfiles can optionally support compression
|
||||
# https://docs.python.org/3/library/tarfile.html#tarfile.open
|
||||
compression = ''
|
||||
|
||||
def __init__(self, filename):
|
||||
super().__init__(filename)
|
||||
self.archive_class = None # type: Optional[ArchiveClass]
|
||||
@ -134,7 +138,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
# pylint: disable=too-many-branches
|
||||
|
||||
with self.archive_class(self.filename) as zin,\
|
||||
self.archive_class(self.output_filename, 'w') as zout:
|
||||
self.archive_class(self.output_filename, 'w' + self.compression) as zout:
|
||||
|
||||
temp_folder = tempfile.mkdtemp()
|
||||
abort = False
|
||||
@ -212,7 +216,11 @@ class TarParser(ArchiveBasedAbstractParser):
|
||||
mimetypes = {'application/x-tar'}
|
||||
def __init__(self, filename):
|
||||
super().__init__(filename)
|
||||
self.archive_class = tarfile.TarFile
|
||||
# yes, it's tarfile.TarFile.open and not tarfile.TarFile,
|
||||
# as stated in the documentation:
|
||||
# https://docs.python.org/3/library/tarfile.html#tarfile.TarFile
|
||||
# This is required to support compressed archives.
|
||||
self.archive_class = tarfile.TarFile.open
|
||||
self.member_class = tarfile.TarInfo
|
||||
|
||||
def is_archive_valid(self):
|
||||
@ -259,6 +267,22 @@ class TarParser(ArchiveBasedAbstractParser):
|
||||
assert isinstance(member, tarfile.TarInfo) # please mypy
|
||||
return member.name
|
||||
|
||||
|
||||
class TarGzParser(TarParser):
|
||||
compression = ':gz'
|
||||
mimetypes = {'application/x-tar+gz'}
|
||||
|
||||
|
||||
class TarBz2Parser(TarParser):
|
||||
compression = ':bz2'
|
||||
mimetypes = {'application/x-tar+bz2'}
|
||||
|
||||
|
||||
class TarXzParser(TarParser):
|
||||
compression = ':xz'
|
||||
mimetypes = {'application/x-tar+xz'}
|
||||
|
||||
|
||||
class ZipParser(ArchiveBasedAbstractParser):
|
||||
mimetypes = {'application/zip'}
|
||||
def __init__(self, filename):
|
||||
|
@ -50,6 +50,10 @@ def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
|
||||
if extension.lower() in UNSUPPORTED_EXTENSIONS:
|
||||
return None, mtype
|
||||
|
||||
if mtype == 'application/x-tar':
|
||||
if extension[1:] in ('bz2', 'gz', 'xz'):
|
||||
mtype = mtype + '+' + extension[1:]
|
||||
|
||||
for parser_class in _get_parsers(): # type: ignore
|
||||
if mtype in parser_class.mimetypes:
|
||||
try:
|
||||
|
@ -30,6 +30,14 @@ class TestParserFactory(unittest.TestCase):
|
||||
self.assertEqual(mimetype, 'audio/mpeg')
|
||||
self.assertEqual(parser.__class__, audio.MP3Parser)
|
||||
|
||||
def test_tarfile_double_extension_handling(self):
|
||||
""" Test that our module auto-detection is handling sub-sub-classes """
|
||||
with tarfile.TarFile.open('./tests/data/dirty.tar.bz2', 'w:bz2') as zout:
|
||||
zout.add('./tests/data/dirty.jpg')
|
||||
parser, mimetype = parser_factory.get_parser('./tests/data/dirty.tar.bz2')
|
||||
self.assertEqual(mimetype, 'application/x-tar+bz2')
|
||||
os.remove('./tests/data/dirty.tar.bz2')
|
||||
|
||||
|
||||
class TestParameterInjection(unittest.TestCase):
|
||||
def test_ver_injection(self):
|
||||
@ -719,7 +727,7 @@ class TestCleaning(unittest.TestCase):
|
||||
os.remove('./tests/data/clean.cleaned.cleaned.css')
|
||||
|
||||
def test_tar(self):
|
||||
with tarfile.TarFile('./tests/data/dirty.tar', 'w') as zout:
|
||||
with tarfile.TarFile.open('./tests/data/dirty.tar', 'w') as zout:
|
||||
zout.add('./tests/data/dirty.flac')
|
||||
zout.add('./tests/data/dirty.docx')
|
||||
zout.add('./tests/data/dirty.jpg')
|
||||
@ -752,3 +760,108 @@ class TestCleaning(unittest.TestCase):
|
||||
os.remove('./tests/data/dirty.tar')
|
||||
os.remove('./tests/data/dirty.cleaned.tar')
|
||||
os.remove('./tests/data/dirty.cleaned.cleaned.tar')
|
||||
|
||||
def test_targz(self):
|
||||
with tarfile.TarFile.open('./tests/data/dirty.tar.gz', 'w:gz') as zout:
|
||||
zout.add('./tests/data/dirty.flac')
|
||||
zout.add('./tests/data/dirty.docx')
|
||||
zout.add('./tests/data/dirty.jpg')
|
||||
p = archive.TarParser('./tests/data/dirty.tar.gz')
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
|
||||
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = archive.TarParser('./tests/data/dirty.cleaned.tar.gz')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
self.assertTrue(p.remove_all())
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
with tarfile.open('./tests/data/dirty.cleaned.tar.gz') as zout:
|
||||
zout.extractall(path=tmp_dir)
|
||||
zout.close()
|
||||
|
||||
number_of_files = 0
|
||||
for root, _, fnames in os.walk(tmp_dir):
|
||||
for f in fnames:
|
||||
complete_path = os.path.join(root, f)
|
||||
p, _ = parser_factory.get_parser(complete_path)
|
||||
self.assertIsNotNone(p)
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
number_of_files += 1
|
||||
self.assertEqual(number_of_files, 3)
|
||||
|
||||
os.remove('./tests/data/dirty.tar.gz')
|
||||
os.remove('./tests/data/dirty.cleaned.tar.gz')
|
||||
os.remove('./tests/data/dirty.cleaned.cleaned.tar.gz')
|
||||
|
||||
def test_tarbz2(self):
|
||||
with tarfile.TarFile.open('./tests/data/dirty.tar.bz2', 'w:bz2') as zout:
|
||||
zout.add('./tests/data/dirty.flac')
|
||||
zout.add('./tests/data/dirty.docx')
|
||||
zout.add('./tests/data/dirty.jpg')
|
||||
p = archive.TarParser('./tests/data/dirty.tar.bz2')
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
|
||||
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = archive.TarParser('./tests/data/dirty.cleaned.tar.bz2')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
self.assertTrue(p.remove_all())
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
with tarfile.open('./tests/data/dirty.cleaned.tar.bz2') as zout:
|
||||
zout.extractall(path=tmp_dir)
|
||||
zout.close()
|
||||
|
||||
number_of_files = 0
|
||||
for root, _, fnames in os.walk(tmp_dir):
|
||||
for f in fnames:
|
||||
complete_path = os.path.join(root, f)
|
||||
p, _ = parser_factory.get_parser(complete_path)
|
||||
self.assertIsNotNone(p)
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
number_of_files += 1
|
||||
self.assertEqual(number_of_files, 3)
|
||||
|
||||
os.remove('./tests/data/dirty.tar.bz2')
|
||||
os.remove('./tests/data/dirty.cleaned.tar.bz2')
|
||||
os.remove('./tests/data/dirty.cleaned.cleaned.tar.bz2')
|
||||
|
||||
def test_tarxz(self):
|
||||
with tarfile.TarFile.open('./tests/data/dirty.tar.xz', 'w:xz') as zout:
|
||||
zout.add('./tests/data/dirty.flac')
|
||||
zout.add('./tests/data/dirty.docx')
|
||||
zout.add('./tests/data/dirty.jpg')
|
||||
p = archive.TarParser('./tests/data/dirty.tar.xz')
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
|
||||
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = archive.TarParser('./tests/data/dirty.cleaned.tar.xz')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
self.assertTrue(p.remove_all())
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
with tarfile.open('./tests/data/dirty.cleaned.tar.xz') as zout:
|
||||
zout.extractall(path=tmp_dir)
|
||||
zout.close()
|
||||
|
||||
number_of_files = 0
|
||||
for root, _, fnames in os.walk(tmp_dir):
|
||||
for f in fnames:
|
||||
complete_path = os.path.join(root, f)
|
||||
p, _ = parser_factory.get_parser(complete_path)
|
||||
self.assertIsNotNone(p)
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
number_of_files += 1
|
||||
self.assertEqual(number_of_files, 3)
|
||||
|
||||
os.remove('./tests/data/dirty.tar.xz')
|
||||
os.remove('./tests/data/dirty.cleaned.tar.xz')
|
||||
os.remove('./tests/data/dirty.cleaned.cleaned.tar.xz')
|
||||
|
Loading…
Reference in New Issue
Block a user