Split office and archives
This commit is contained in:
parent
2e9adab86a
commit
e3d817f57e
127
libmat2/archive.py
Normal file
127
libmat2/archive.py
Normal file
@ -0,0 +1,127 @@
|
||||
import zipfile
|
||||
import datetime
|
||||
import tempfile
|
||||
import os
|
||||
import logging
|
||||
import shutil
|
||||
from typing import Dict, Set, Pattern
|
||||
|
||||
from . import abstract, UnknownMemberPolicy, parser_factory
|
||||
|
||||
# Make pyflakes happy
|
||||
assert Set
|
||||
assert Pattern
|
||||
|
||||
|
||||
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
""" Office files (.docx, .odt, …) are zipped files. """
|
||||
# Those are the files that have a format that _isn't_
|
||||
# supported by MAT2, but that we want to keep anyway.
|
||||
files_to_keep = set() # type: Set[str]
|
||||
|
||||
# Those are the files that we _do not_ want to keep,
|
||||
# no matter if they are supported or not.
|
||||
files_to_omit = set() # type: Set[Pattern]
|
||||
|
||||
# what should the parser do if it encounters an unknown file in
|
||||
# the archive?
|
||||
unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
|
||||
|
||||
def __init__(self, filename):
|
||||
super().__init__(filename)
|
||||
try: # better fail here than later
|
||||
zipfile.ZipFile(self.filename)
|
||||
except zipfile.BadZipFile:
|
||||
raise ValueError
|
||||
|
||||
def _specific_cleanup(self, full_path: str) -> bool:
|
||||
""" This method can be used to apply specific treatment
|
||||
to files present in the archive."""
|
||||
# pylint: disable=unused-argument,no-self-use
|
||||
return True # pragma: no cover
|
||||
|
||||
@staticmethod
|
||||
def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||
zipinfo.create_system = 3 # Linux
|
||||
zipinfo.comment = b''
|
||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
|
||||
return zipinfo
|
||||
|
||||
@staticmethod
|
||||
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
|
||||
metadata = {}
|
||||
if zipinfo.create_system == 3: # this is Linux
|
||||
pass
|
||||
elif zipinfo.create_system == 2:
|
||||
metadata['create_system'] = 'Windows'
|
||||
else:
|
||||
metadata['create_system'] = 'Weird'
|
||||
|
||||
if zipinfo.comment:
|
||||
metadata['comment'] = zipinfo.comment # type: ignore
|
||||
|
||||
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
|
||||
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
|
||||
|
||||
return metadata
|
||||
|
||||
def remove_all(self) -> bool:
|
||||
# pylint: disable=too-many-branches
|
||||
|
||||
with zipfile.ZipFile(self.filename) as zin,\
|
||||
zipfile.ZipFile(self.output_filename, 'w') as zout:
|
||||
|
||||
temp_folder = tempfile.mkdtemp()
|
||||
abort = False
|
||||
|
||||
for item in zin.infolist():
|
||||
if item.filename[-1] == '/': # `is_dir` is added in Python3.6
|
||||
continue # don't keep empty folders
|
||||
|
||||
zin.extract(member=item, path=temp_folder)
|
||||
full_path = os.path.join(temp_folder, item.filename)
|
||||
|
||||
if self._specific_cleanup(full_path) is False:
|
||||
logging.warning("Something went wrong during deep cleaning of %s",
|
||||
item.filename)
|
||||
abort = True
|
||||
continue
|
||||
|
||||
if item.filename in self.files_to_keep:
|
||||
# those files aren't supported, but we want to add them anyway
|
||||
pass
|
||||
elif any(map(lambda r: r.search(item.filename), self.files_to_omit)):
|
||||
continue
|
||||
else:
|
||||
# supported files that we want to clean then add
|
||||
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
|
||||
if not tmp_parser:
|
||||
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
|
||||
logging.warning("In file %s, omitting unknown element %s (format: %s)",
|
||||
self.filename, item.filename, mtype)
|
||||
continue
|
||||
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
|
||||
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
||||
self.filename, item.filename, mtype)
|
||||
else:
|
||||
logging.error("In file %s, element %s's format (%s) " +
|
||||
"isn't supported",
|
||||
self.filename, item.filename, mtype)
|
||||
abort = True
|
||||
continue
|
||||
if tmp_parser:
|
||||
tmp_parser.remove_all()
|
||||
os.rename(tmp_parser.output_filename, full_path)
|
||||
|
||||
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
|
||||
clean_zinfo = self._clean_zipinfo(zinfo)
|
||||
with open(full_path, 'rb') as f:
|
||||
zout.writestr(clean_zinfo, f.read())
|
||||
|
||||
shutil.rmtree(temp_folder)
|
||||
if abort:
|
||||
os.remove(self.output_filename)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
@ -1,15 +1,11 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
import datetime
|
||||
import zipfile
|
||||
import logging
|
||||
from typing import Dict, Set, Pattern
|
||||
|
||||
import xml.etree.ElementTree as ET # type: ignore
|
||||
|
||||
from . import abstract, parser_factory, UnknownMemberPolicy
|
||||
from .archive import ArchiveBasedAbstractParser
|
||||
|
||||
# Make pyflakes happy
|
||||
assert Set
|
||||
@ -26,118 +22,6 @@ def _parse_xml(full_path: str):
|
||||
return ET.parse(full_path), namespace_map
|
||||
|
||||
|
||||
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
""" Office files (.docx, .odt, …) are zipped files. """
|
||||
# Those are the files that have a format that _isn't_
|
||||
# supported by MAT2, but that we want to keep anyway.
|
||||
files_to_keep = set() # type: Set[str]
|
||||
|
||||
# Those are the files that we _do not_ want to keep,
|
||||
# no matter if they are supported or not.
|
||||
files_to_omit = set() # type: Set[Pattern]
|
||||
|
||||
# what should the parser do if it encounters an unknown file in
|
||||
# the archive?
|
||||
unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
|
||||
|
||||
def __init__(self, filename):
|
||||
super().__init__(filename)
|
||||
try: # better fail here than later
|
||||
zipfile.ZipFile(self.filename)
|
||||
except zipfile.BadZipFile:
|
||||
raise ValueError
|
||||
|
||||
def _specific_cleanup(self, full_path: str) -> bool:
|
||||
""" This method can be used to apply specific treatment
|
||||
to files present in the archive."""
|
||||
# pylint: disable=unused-argument,no-self-use
|
||||
return True # pragma: no cover
|
||||
|
||||
@staticmethod
|
||||
def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||
zipinfo.create_system = 3 # Linux
|
||||
zipinfo.comment = b''
|
||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
|
||||
return zipinfo
|
||||
|
||||
@staticmethod
|
||||
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
|
||||
metadata = {}
|
||||
if zipinfo.create_system == 3: # this is Linux
|
||||
pass
|
||||
elif zipinfo.create_system == 2:
|
||||
metadata['create_system'] = 'Windows'
|
||||
else:
|
||||
metadata['create_system'] = 'Weird'
|
||||
|
||||
if zipinfo.comment:
|
||||
metadata['comment'] = zipinfo.comment # type: ignore
|
||||
|
||||
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
|
||||
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
|
||||
|
||||
return metadata
|
||||
|
||||
def remove_all(self) -> bool:
|
||||
# pylint: disable=too-many-branches
|
||||
|
||||
with zipfile.ZipFile(self.filename) as zin,\
|
||||
zipfile.ZipFile(self.output_filename, 'w') as zout:
|
||||
|
||||
temp_folder = tempfile.mkdtemp()
|
||||
abort = False
|
||||
|
||||
for item in zin.infolist():
|
||||
if item.filename[-1] == '/': # `is_dir` is added in Python3.6
|
||||
continue # don't keep empty folders
|
||||
|
||||
zin.extract(member=item, path=temp_folder)
|
||||
full_path = os.path.join(temp_folder, item.filename)
|
||||
|
||||
if self._specific_cleanup(full_path) is False:
|
||||
logging.warning("Something went wrong during deep cleaning of %s",
|
||||
item.filename)
|
||||
abort = True
|
||||
continue
|
||||
|
||||
if item.filename in self.files_to_keep:
|
||||
# those files aren't supported, but we want to add them anyway
|
||||
pass
|
||||
elif any(map(lambda r: r.search(item.filename), self.files_to_omit)):
|
||||
continue
|
||||
else:
|
||||
# supported files that we want to clean then add
|
||||
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
|
||||
if not tmp_parser:
|
||||
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
|
||||
logging.warning("In file %s, omitting unknown element %s (format: %s)",
|
||||
self.filename, item.filename, mtype)
|
||||
continue
|
||||
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
|
||||
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
||||
self.filename, item.filename, mtype)
|
||||
else:
|
||||
logging.error("In file %s, element %s's format (%s) " +
|
||||
"isn't supported",
|
||||
self.filename, item.filename, mtype)
|
||||
abort = True
|
||||
continue
|
||||
if tmp_parser:
|
||||
tmp_parser.remove_all()
|
||||
os.rename(tmp_parser.output_filename, full_path)
|
||||
|
||||
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
|
||||
clean_zinfo = self._clean_zipinfo(zinfo)
|
||||
with open(full_path, 'rb') as f:
|
||||
zout.writestr(clean_zinfo, f.read())
|
||||
|
||||
shutil.rmtree(temp_folder)
|
||||
if abort:
|
||||
os.remove(self.output_filename)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class MSOfficeParser(ArchiveBasedAbstractParser):
|
||||
mimetypes = {
|
||||
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
||||
|
Loading…
Reference in New Issue
Block a user