1
0
mirror of synced 2024-11-12 20:38:53 +01:00
mat2/libmat2/archive.py

319 lines
13 KiB
Python
Raw Normal View History

2019-04-27 13:05:36 +02:00
import abc
2018-09-06 11:32:45 +02:00
import zipfile
import datetime
2019-04-27 13:05:36 +02:00
import tarfile
2018-09-06 11:32:45 +02:00
import tempfile
import os
import logging
import shutil
from typing import Dict, Set, Pattern, Union, Any, List
2018-09-06 11:32:45 +02:00
from . import abstract, UnknownMemberPolicy, parser_factory
# `Set` and `Pattern` are only referenced from `# type:` comments in this
# module; mention them in real code so pyflakes doesn't flag the imports.
assert Set and Pattern
2019-04-27 13:05:36 +02:00
# pylint: disable=not-callable,assignment-from-no-return

# Type aliases shared by the annotations of every parser in this module.
# An ArchiveClass is a class representing an archive,
# while an ArchiveMember is a class representing an element
# (usually a file) of an archive.
# Each alias covers both the zipfile and the tarfile flavour.
ArchiveClass = Union[zipfile.ZipFile, tarfile.TarFile]
ArchiveMember = Union[zipfile.ZipInfo, tarfile.TarInfo]
2018-09-06 11:32:45 +02:00
class ArchiveBasedAbstractParser(abstract.AbstractParser):
    """Base class for all archive-based formats.

    Welcome to a world of frustrating complexity and tediousness:

    - A lot of file formats (docx, odt, epubs, etc.) are archive-based,
      so we need to add callbacks everywhere to allow their respective
      parsers to apply specific cleanup to the required files.
    - Python has two different modules to deal with .tar and .zip files,
      with similar-but-yet-o-so-different API, so we need to write
      a ghetto-wrapper to avoid duplicating everything.
    - Abstract static hooks are declared by stacking @staticmethod on top
      of @abc.abstractmethod, since abc.abstractstaticmethod is deprecated.
    - Mypy is too dumb (yet) to realise that a type A is valid under
      the Union[A, B] constraint, hence the weird `# type: ignore`
      annotations.
    """

    def __init__(self, filename):
        """Set up the default policies, then validate the archive.

        Raises whatever `is_archive_valid` raises (a ValueError, per its
        contract) when `filename` isn't an acceptable archive.
        """
        super().__init__(filename)

        # The concrete parsers in this module overwrite these two
        # attributes in their own __init__.
        # NOTE(review): `Optional` is not among the names imported from
        # `typing` in the visible import line -- confirm before relying on
        # these type comments.
        self.archive_class = None  # type: Optional[ArchiveClass]
        self.member_class = None  # type: Optional[ArchiveMember]

        # Those are the files that have a format that _isn't_
        # supported by MAT2, but that we want to keep anyway.
        self.files_to_keep = set()  # type: Set[Pattern]

        # Those are the files that we _do not_ want to keep,
        # no matter if they are supported or not.
        self.files_to_omit = set()  # type: Set[Pattern]

        # what should the parser do if it encounters an unknown file in
        # the archive?
        self.unknown_member_policy = UnknownMemberPolicy.ABORT  # type: UnknownMemberPolicy

        self.is_archive_valid()

    def is_archive_valid(self):
        """Raise a ValueError if the current archive isn't a valid one.

        The base implementation performs no check at all.
        """

    def _specific_cleanup(self, full_path: str) -> bool:
        """ This method can be used to apply specific treatment
        to files present in the archive.

        `remove_all` treats a return value of `False` as a failure
        and aborts the cleaning."""
        # pylint: disable=unused-argument,no-self-use
        return True  # pragma: no cover

    def _specific_get_meta(self, full_path: str, file_path: str) -> Dict[str, Any]:
        """ This method can be used to extract specific metadata
        from files present in the archive.

        `full_path` is the extracted file on disk, `file_path` is the
        name of the member inside the archive."""
        # pylint: disable=unused-argument,no-self-use
        return {}  # pragma: no cover

    @staticmethod
    @abc.abstractmethod
    def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
        """Return all the members of the archive."""

    @staticmethod
    @abc.abstractmethod
    def _clean_member(member: ArchiveMember) -> ArchiveMember:
        """Remove all the metadata for a given member."""

    @staticmethod
    @abc.abstractmethod
    def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
        """Return all the metadata of a given member."""

    @staticmethod
    @abc.abstractmethod
    def _get_member_name(member: ArchiveMember) -> str:
        """Return the name of the given member."""

    @staticmethod
    @abc.abstractmethod
    def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                             full_path: str):
        """Add the file at full_path to the archive, via the given member."""

    def get_meta(self) -> Dict[str, Union[str, dict]]:
        """Return the metadata of every member, keyed by member name.

        For each member, the result merges (later sources win on key
        clashes) the archive-level metadata, the format-specific metadata
        from `_specific_get_meta`, and the metadata reported by the
        member's own parser, when `parser_factory` finds one.
        Members without any metadata are left out of the result.
        """
        meta = {}  # type: Dict[str, Union[str, dict]]

        temp_folder = tempfile.mkdtemp()
        try:
            with self.archive_class(self.filename) as zin:
                for item in self._get_all_members(zin):
                    local_meta = self._get_member_meta(item)
                    member_name = self._get_member_name(item)

                    # `endswith` rather than `[-1]`, so that an empty
                    # member name can't raise an IndexError here.
                    if member_name.endswith('/'):  # pragma: no cover
                        # `is_dir` is added in Python3.6
                        continue  # don't keep empty folders

                    # NOTE(review): the member name comes from the archive
                    # and is joined onto temp_folder unchecked -- confirm
                    # that untrusted archives can't escape the folder.
                    zin.extract(member=item, path=temp_folder)
                    full_path = os.path.join(temp_folder, member_name)

                    specific_meta = self._specific_get_meta(full_path, member_name)
                    local_meta = {**local_meta, **specific_meta}

                    member_parser, _ = parser_factory.get_parser(full_path)  # type: ignore
                    if member_parser:
                        local_meta = {**local_meta, **member_parser.get_meta()}

                    if local_meta:
                        meta[member_name] = local_meta
        finally:
            # Always drop the extracted files, even when something raised.
            shutil.rmtree(temp_folder)

        return meta

    def remove_all(self) -> bool:
        """Write a cleaned copy of the archive to `self.output_filename`.

        Every member is extracted, cleaned, then added back to the new
        archive, with its archive-level metadata reset by `_clean_member`.

        Returns True on success. Returns False, after removing the output
        file, if any member couldn't be cleaned or is of an unknown
        format while the policy is neither OMIT nor KEEP.
        """
        # pylint: disable=too-many-branches

        temp_folder = tempfile.mkdtemp()
        abort = False
        try:
            with self.archive_class(self.filename) as zin, \
                    self.archive_class(self.output_filename, 'w') as zout:

                # Sort the items to process, to reduce fingerprinting,
                # and keep them in the `items` variable.
                items = []  # type: List[ArchiveMember]
                for item in sorted(self._get_all_members(zin), key=self._get_member_name):
                    # Some fileformats do require to have the `mimetype` file
                    # as the first file in the archive.
                    if self._get_member_name(item) == 'mimetype':
                        items.insert(0, item)
                    else:
                        items.append(item)

                # Since files order is a fingerprint factor,
                # we're iterating (and thus inserting) them in lexicographic order.
                for item in items:
                    member_name = self._get_member_name(item)
                    # `endswith` rather than `[-1]`, so that an empty
                    # member name can't raise an IndexError here.
                    if member_name.endswith('/'):  # `is_dir` is added in Python3.6
                        continue  # don't keep empty folders

                    # NOTE(review): the member name comes from the archive
                    # and is joined onto temp_folder unchecked -- confirm
                    # that untrusted archives can't escape the folder.
                    zin.extract(member=item, path=temp_folder)
                    full_path = os.path.join(temp_folder, member_name)

                    if self._specific_cleanup(full_path) is False:
                        logging.warning("Something went wrong during deep cleaning of %s",
                                        member_name)
                        abort = True
                        continue

                    if any(r.search(member_name) for r in self.files_to_keep):
                        # those files aren't supported, but we want to add them anyway
                        pass
                    elif any(r.search(member_name) for r in self.files_to_omit):
                        continue
                    else:  # supported files that we want to first clean, then add
                        member_parser, mtype = parser_factory.get_parser(full_path)  # type: ignore
                        if not member_parser:
                            if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
                                logging.warning("In file %s, omitting unknown element %s (format: %s)",
                                                self.filename, member_name, mtype)
                                continue
                            elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
                                logging.warning("In file %s, keeping unknown element %s (format: %s)",
                                                self.filename, member_name, mtype)
                            else:
                                logging.error("In file %s, element %s's format (%s) "
                                              "isn't supported",
                                              self.filename, member_name, mtype)
                                abort = True
                                continue
                        else:
                            if member_parser.remove_all() is False:
                                # Adjacent literals instead of backslash
                                # continuations inside the string, which
                                # embedded the source indentation in the
                                # logged message.
                                logging.warning("In file %s, something went wrong "
                                                "with the cleaning of %s "
                                                "(format: %s)",
                                                self.filename, member_name, mtype)
                                abort = True
                                continue
                            # Put the cleaned version in place of the extracted one.
                            os.rename(member_parser.output_filename, full_path)

                    # A brand new member is built from the name alone,
                    # then scrubbed before the file is added.
                    zinfo = self.member_class(member_name)  # type: ignore
                    clean_zinfo = self._clean_member(zinfo)
                    self._add_file_to_archive(zout, clean_zinfo, full_path)
        finally:
            # Always drop the extracted files, even when something raised.
            shutil.rmtree(temp_folder)

        if abort:
            # The output archive is closed at this point; don't leave a
            # partially-cleaned file behind.
            os.remove(self.output_filename)
            return False
        return True
2018-10-25 11:56:46 +02:00
2019-04-27 13:05:36 +02:00
class TarParser(ArchiveBasedAbstractParser):
    """Archive parser for tar files, built on the `tarfile` module."""
    mimetypes = {'application/x-tar'}

    def __init__(self, filename):
        super().__init__(filename)
        self.archive_class = tarfile.TarFile
        self.member_class = tarfile.TarInfo

    def is_archive_valid(self):
        """Raise a ValueError if `tarfile` doesn't recognise the file."""
        if not tarfile.is_tarfile(self.filename):
            raise ValueError

    @staticmethod
    def _clean_member(member: ArchiveMember) -> ArchiveMember:
        """Zero the timestamp and the ownership fields of the member."""
        assert isinstance(member, tarfile.TarInfo)  # please mypy
        member.mtime = 0
        member.uid = 0
        member.gid = 0
        member.uname = ''
        member.gname = ''
        return member

    @staticmethod
    def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
        """Return the member's fields that differ from their cleaned value."""
        assert isinstance(member, tarfile.TarInfo)  # please mypy
        metadata = {}
        # Numeric fields are reported as strings, when not zero.
        for field in ('mtime', 'uid', 'gid'):
            value = getattr(member, field)
            if value != 0:
                metadata[field] = str(value)
        # Textual fields are reported as-is, when not empty.
        for field in ('uname', 'gname'):
            value = getattr(member, field)
            if value != '':
                metadata[field] = value
        return metadata

    @staticmethod
    def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                             full_path: str):
        """Add the file at full_path under the member's name, cleaning
        the resulting entry via `_clean_member`."""
        assert isinstance(member, tarfile.TarInfo)  # please mypy
        assert isinstance(archive, tarfile.TarFile)  # please mypy
        archive.add(name=full_path, arcname=member.name,
                    filter=TarParser._clean_member)  # type: ignore

    @staticmethod
    def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
        """Return every member of the tar archive."""
        assert isinstance(archive, tarfile.TarFile)  # please mypy
        return archive.getmembers()  # type: ignore

    @staticmethod
    def _get_member_name(member: ArchiveMember) -> str:
        """Return the name of the member inside the archive."""
        assert isinstance(member, tarfile.TarInfo)  # please mypy
        return member.name
2018-10-25 11:56:46 +02:00
class ZipParser(ArchiveBasedAbstractParser):
    """Archive parser for zip files, built on the `zipfile` module."""
    mimetypes = {'application/zip'}

    def __init__(self, filename):
        super().__init__(filename)
        self.archive_class = zipfile.ZipFile
        self.member_class = zipfile.ZipInfo

    def is_archive_valid(self):
        """Raise a ValueError if `zipfile` can't open the file."""
        try:
            # Only opening is of interest here; the `with` makes sure the
            # file handle is released right away instead of being leaked.
            with zipfile.ZipFile(self.filename):
                pass
        except zipfile.BadZipFile as exc:
            raise ValueError from exc

    @staticmethod
    def _clean_member(member: ArchiveMember) -> ArchiveMember:
        """Reset the creating system, the comment and the date of the member."""
        assert isinstance(member, zipfile.ZipInfo)  # please mypy
        member.create_system = 3  # Linux
        member.comment = b''
        member.date_time = (1980, 1, 1, 0, 0, 0)  # this is as early as a zipfile can be
        return member

    @staticmethod
    def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
        """Return the member's fields that differ from their cleaned value."""
        assert isinstance(member, zipfile.ZipInfo)  # please mypy
        metadata = {}
        if member.create_system == 3:  # this is Linux
            pass
        # NOTE(review): the zipfile documentation describes a value of 0
        # as the one used for Windows; confirm whether 2 is the intended
        # value here. Behaviour is deliberately left unchanged.
        elif member.create_system == 2:
            metadata['create_system'] = 'Windows'
        else:
            metadata['create_system'] = 'Weird'

        if member.comment:
            metadata['comment'] = member.comment  # type: ignore

        if member.date_time != (1980, 1, 1, 0, 0, 0):
            try:
                metadata['date_time'] = str(datetime.datetime(*member.date_time))
            except ValueError:
                # Out-of-range fields (e.g. a month or a day of 0) can't be
                # turned into a datetime: report the raw value instead
                # of crashing the whole metadata listing.
                metadata['date_time'] = str(member.date_time)

        return metadata

    @staticmethod
    def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                             full_path: str):
        """Store the content of the file at full_path under the given member."""
        assert isinstance(archive, zipfile.ZipFile)  # please mypy
        assert isinstance(member, zipfile.ZipInfo)  # please mypy
        with open(full_path, 'rb') as f:
            archive.writestr(member, f.read())

    @staticmethod
    def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
        """Return every member of the zip archive."""
        assert isinstance(archive, zipfile.ZipFile)  # please mypy
        return archive.infolist()  # type: ignore

    @staticmethod
    def _get_member_name(member: ArchiveMember) -> str:
        """Return the name of the member inside the archive."""
        assert isinstance(member, zipfile.ZipInfo)  # please mypy
        return member.filename