From e3d817f57e77676c30fbfa05ed08deee7918b238 Mon Sep 17 00:00:00 2001 From: jvoisin Date: Thu, 6 Sep 2018 11:32:45 +0200 Subject: [PATCH] Split office and archives --- libmat2/archive.py | 127 +++++++++++++++++++++++++++++++++++++++++++++ libmat2/office.py | 118 +---------------------------------------- 2 files changed, 128 insertions(+), 117 deletions(-) create mode 100644 libmat2/archive.py diff --git a/libmat2/archive.py b/libmat2/archive.py new file mode 100644 index 0000000..d8f9007 --- /dev/null +++ b/libmat2/archive.py @@ -0,0 +1,127 @@ +import zipfile +import datetime +import tempfile +import os +import logging +import shutil +from typing import Dict, Set, Pattern + +from . import abstract, UnknownMemberPolicy, parser_factory + +# Make pyflakes happy +assert Set +assert Pattern + + +class ArchiveBasedAbstractParser(abstract.AbstractParser): + """ Office files (.docx, .odt, …) are zipped files. """ + # Those are the files that have a format that _isn't_ + # supported by MAT2, but that we want to keep anyway. + files_to_keep = set() # type: Set[str] + + # Those are the files that we _do not_ want to keep, + # no matter if they are supported or not. + files_to_omit = set() # type: Set[Pattern] + + # what should the parser do if it encounters an unknown file in + # the archive? + unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy + + def __init__(self, filename): + super().__init__(filename) + try: # better fail here than later + zipfile.ZipFile(self.filename) + except zipfile.BadZipFile: + raise ValueError + + def _specific_cleanup(self, full_path: str) -> bool: + """ This method can be used to apply specific treatment + to files present in the archive.""" + # pylint: disable=unused-argument,no-self-use + return True # pragma: no cover + + @staticmethod + def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: + zipinfo.create_system = 3 # Linux + zipinfo.comment = b'' + zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be + return zipinfo + + @staticmethod + def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]: + metadata = {} + if zipinfo.create_system == 3: # this is Linux + pass + elif zipinfo.create_system == 2: + metadata['create_system'] = 'Windows' + else: + metadata['create_system'] = 'Weird' + + if zipinfo.comment: + metadata['comment'] = zipinfo.comment # type: ignore + + if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): + metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time)) + + return metadata + + def remove_all(self) -> bool: + # pylint: disable=too-many-branches + + with zipfile.ZipFile(self.filename) as zin,\ + zipfile.ZipFile(self.output_filename, 'w') as zout: + + temp_folder = tempfile.mkdtemp() + abort = False + + for item in zin.infolist(): + if item.filename[-1] == '/': # `is_dir` is added in Python3.6 + continue # don't keep empty folders + + zin.extract(member=item, path=temp_folder) + full_path = os.path.join(temp_folder, item.filename) + + if self._specific_cleanup(full_path) is False: + logging.warning("Something went wrong during deep cleaning of %s", + item.filename) + abort = True + continue + + if item.filename in self.files_to_keep: + # those files aren't supported, but we want to add them anyway + pass + elif any(map(lambda r: r.search(item.filename), self.files_to_omit)): + continue + else: + # supported files that we want to clean then add + tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore + if not tmp_parser: + if self.unknown_member_policy == UnknownMemberPolicy.OMIT: + logging.warning("In file %s, omitting unknown element %s (format: %s)", + self.filename, item.filename, mtype) + continue + elif self.unknown_member_policy == UnknownMemberPolicy.KEEP: + logging.warning("In file %s, keeping unknown element %s (format: %s)", + self.filename, item.filename, mtype) + else: + logging.error("In file %s, element %s's format (%s) " + + "isn't supported", + self.filename, item.filename, mtype) + abort = True + continue + if tmp_parser: + tmp_parser.remove_all() + os.rename(tmp_parser.output_filename, full_path) + + zinfo = zipfile.ZipInfo(item.filename) # type: ignore + clean_zinfo = self._clean_zipinfo(zinfo) + with open(full_path, 'rb') as f: + zout.writestr(clean_zinfo, f.read()) + + shutil.rmtree(temp_folder) + if abort: + os.remove(self.output_filename) + return False + return True + + diff --git a/libmat2/office.py b/libmat2/office.py index 60c5478..50b776e 100644 --- a/libmat2/office.py +++ b/libmat2/office.py @@ -1,15 +1,11 @@ import os import re -import shutil -import tempfile -import datetime import zipfile -import logging from typing import Dict, Set, Pattern import xml.etree.ElementTree as ET # type: ignore -from . import abstract, parser_factory, UnknownMemberPolicy +from .archive import ArchiveBasedAbstractParser # Make pyflakes happy assert Set @@ -26,118 +22,6 @@ def _parse_xml(full_path: str): return ET.parse(full_path), namespace_map -class ArchiveBasedAbstractParser(abstract.AbstractParser): - """ Office files (.docx, .odt, …) are zipped files. """ - # Those are the files that have a format that _isn't_ - # supported by MAT2, but that we want to keep anyway. - files_to_keep = set() # type: Set[str] - - # Those are the files that we _do not_ want to keep, - # no matter if they are supported or not. - files_to_omit = set() # type: Set[Pattern] - - # what should the parser do if it encounters an unknown file in - # the archive? - unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy - - def __init__(self, filename): - super().__init__(filename) - try: # better fail here than later - zipfile.ZipFile(self.filename) - except zipfile.BadZipFile: - raise ValueError - - def _specific_cleanup(self, full_path: str) -> bool: - """ This method can be used to apply specific treatment - to files present in the archive.""" - # pylint: disable=unused-argument,no-self-use - return True # pragma: no cover - - @staticmethod - def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: - zipinfo.create_system = 3 # Linux - zipinfo.comment = b'' - zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be - return zipinfo - - @staticmethod - def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]: - metadata = {} - if zipinfo.create_system == 3: # this is Linux - pass - elif zipinfo.create_system == 2: - metadata['create_system'] = 'Windows' - else: - metadata['create_system'] = 'Weird' - - if zipinfo.comment: - metadata['comment'] = zipinfo.comment # type: ignore - - if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): - metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time)) - - return metadata - - def remove_all(self) -> bool: - # pylint: disable=too-many-branches - - with zipfile.ZipFile(self.filename) as zin,\ - zipfile.ZipFile(self.output_filename, 'w') as zout: - - temp_folder = tempfile.mkdtemp() - abort = False - - for item in zin.infolist(): - if item.filename[-1] == '/': # `is_dir` is added in Python3.6 - continue # don't keep empty folders - - zin.extract(member=item, path=temp_folder) - full_path = os.path.join(temp_folder, item.filename) - - if self._specific_cleanup(full_path) is False: - logging.warning("Something went wrong during deep cleaning of %s", - item.filename) - abort = True - continue - - if item.filename in self.files_to_keep: - # those files aren't supported, but we want to add them anyway - pass - elif any(map(lambda r: r.search(item.filename), self.files_to_omit)): - continue - else: - # supported files that we want to clean then add - tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore - if not tmp_parser: - if self.unknown_member_policy == UnknownMemberPolicy.OMIT: - logging.warning("In file %s, omitting unknown element %s (format: %s)", - self.filename, item.filename, mtype) - continue - elif self.unknown_member_policy == UnknownMemberPolicy.KEEP: - logging.warning("In file %s, keeping unknown element %s (format: %s)", - self.filename, item.filename, mtype) - else: - logging.error("In file %s, element %s's format (%s) " + - "isn't supported", - self.filename, item.filename, mtype) - abort = True - continue - if tmp_parser: - tmp_parser.remove_all() - os.rename(tmp_parser.output_filename, full_path) - - zinfo = zipfile.ZipInfo(item.filename) # type: ignore - clean_zinfo = self._clean_zipinfo(zinfo) - with open(full_path, 'rb') as f: - zout.writestr(clean_zinfo, f.read()) - - shutil.rmtree(temp_folder) - if abort: - os.remove(self.output_filename) - return False - return True - - class MSOfficeParser(ArchiveBasedAbstractParser): mimetypes = { 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',