2018-09-06 11:32:45 +02:00
|
|
|
import zipfile
|
|
|
|
import datetime
|
|
|
|
import tempfile
|
|
|
|
import os
|
|
|
|
import logging
|
|
|
|
import shutil
|
2019-02-24 23:03:17 +01:00
|
|
|
from typing import Dict, Set, Pattern, Union, Any, List
|
2018-09-06 11:32:45 +02:00
|
|
|
|
|
|
|
from . import abstract, UnknownMemberPolicy, parser_factory
|
|
|
|
|
|
|
|
# Make pyflakes happy
|
|
|
|
assert Set
|
|
|
|
assert Pattern
|
2019-02-24 23:03:17 +01:00
|
|
|
assert List
|
2018-10-12 11:58:01 +02:00
|
|
|
assert Union
|
2018-09-06 11:32:45 +02:00
|
|
|
|
|
|
|
|
|
|
|
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
|
|
|
""" Office files (.docx, .odt, …) are zipped files. """
|
2018-10-03 15:22:36 +02:00
|
|
|
def __init__(self, filename):
|
|
|
|
super().__init__(filename)
|
2018-09-06 11:32:45 +02:00
|
|
|
|
2018-10-03 15:22:36 +02:00
|
|
|
# Those are the files that have a format that _isn't_
|
|
|
|
# supported by MAT2, but that we want to keep anyway.
|
|
|
|
self.files_to_keep = set() # type: Set[Pattern]
|
2018-09-06 11:32:45 +02:00
|
|
|
|
2018-10-03 15:22:36 +02:00
|
|
|
# Those are the files that we _do not_ want to keep,
|
|
|
|
# no matter if they are supported or not.
|
|
|
|
self.files_to_omit = set() # type: Set[Pattern]
|
|
|
|
|
|
|
|
# what should the parser do if it encounters an unknown file in
|
|
|
|
# the archive?
|
|
|
|
self.unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
|
2018-09-06 11:32:45 +02:00
|
|
|
|
|
|
|
try: # better fail here than later
|
|
|
|
zipfile.ZipFile(self.filename)
|
|
|
|
except zipfile.BadZipFile:
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
def _specific_cleanup(self, full_path: str) -> bool:
|
|
|
|
""" This method can be used to apply specific treatment
|
|
|
|
to files present in the archive."""
|
|
|
|
# pylint: disable=unused-argument,no-self-use
|
|
|
|
return True # pragma: no cover
|
|
|
|
|
2019-02-03 22:55:15 +01:00
|
|
|
def _specific_get_meta(self, full_path: str, file_path: str) -> Dict[str, Any]:
|
|
|
|
""" This method can be used to extract specific metadata
|
|
|
|
from files present in the archive."""
|
|
|
|
# pylint: disable=unused-argument,no-self-use
|
|
|
|
return {} # pragma: no cover
|
|
|
|
|
2018-09-06 11:32:45 +02:00
|
|
|
@staticmethod
|
|
|
|
def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
|
|
|
|
zipinfo.create_system = 3 # Linux
|
|
|
|
zipinfo.comment = b''
|
|
|
|
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
|
|
|
|
return zipinfo
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
|
|
|
|
metadata = {}
|
|
|
|
if zipinfo.create_system == 3: # this is Linux
|
|
|
|
pass
|
|
|
|
elif zipinfo.create_system == 2:
|
|
|
|
metadata['create_system'] = 'Windows'
|
|
|
|
else:
|
|
|
|
metadata['create_system'] = 'Weird'
|
|
|
|
|
|
|
|
if zipinfo.comment:
|
|
|
|
metadata['comment'] = zipinfo.comment # type: ignore
|
|
|
|
|
|
|
|
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
|
|
|
|
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
|
|
|
|
|
|
|
|
return metadata
|
|
|
|
|
2018-10-25 11:29:50 +02:00
|
|
|
def get_meta(self) -> Dict[str, Union[str, dict]]:
|
|
|
|
meta = dict() # type: Dict[str, Union[str, dict]]
|
|
|
|
|
|
|
|
with zipfile.ZipFile(self.filename) as zin:
|
|
|
|
temp_folder = tempfile.mkdtemp()
|
|
|
|
|
|
|
|
for item in zin.infolist():
|
2019-02-03 22:55:15 +01:00
|
|
|
local_meta = dict() # type: Dict[str, Union[str, Dict]]
|
|
|
|
for k, v in self._get_zipinfo_meta(item).items():
|
|
|
|
local_meta[k] = v
|
|
|
|
|
2018-10-25 11:29:50 +02:00
|
|
|
if item.filename[-1] == '/': # pragma: no cover
|
|
|
|
# `is_dir` is added in Python3.6
|
|
|
|
continue # don't keep empty folders
|
|
|
|
|
|
|
|
zin.extract(member=item, path=temp_folder)
|
|
|
|
full_path = os.path.join(temp_folder, item.filename)
|
|
|
|
|
2019-02-03 22:55:15 +01:00
|
|
|
specific_meta = self._specific_get_meta(full_path, item.filename)
|
|
|
|
for (k, v) in specific_meta.items():
|
|
|
|
local_meta[k] = v
|
|
|
|
|
2018-10-25 11:29:50 +02:00
|
|
|
tmp_parser, _ = parser_factory.get_parser(full_path) # type: ignore
|
2019-02-03 22:55:15 +01:00
|
|
|
if tmp_parser:
|
|
|
|
for k, v in tmp_parser.get_meta().items():
|
|
|
|
local_meta[k] = v
|
2018-10-25 11:29:50 +02:00
|
|
|
|
|
|
|
if local_meta:
|
|
|
|
meta[item.filename] = local_meta
|
|
|
|
|
|
|
|
shutil.rmtree(temp_folder)
|
|
|
|
return meta
|
|
|
|
|
2018-09-06 11:32:45 +02:00
|
|
|
def remove_all(self) -> bool:
|
|
|
|
# pylint: disable=too-many-branches
|
|
|
|
|
|
|
|
with zipfile.ZipFile(self.filename) as zin,\
|
|
|
|
zipfile.ZipFile(self.output_filename, 'w') as zout:
|
|
|
|
|
|
|
|
temp_folder = tempfile.mkdtemp()
|
|
|
|
abort = False
|
|
|
|
|
2019-02-24 23:03:17 +01:00
|
|
|
items = list() # type: List[zipfile.ZipInfo]
|
|
|
|
for item in sorted(zin.infolist(), key=lambda z: z.filename):
|
2019-02-25 15:37:44 +01:00
|
|
|
# Some fileformats do require to have the `mimetype` file
|
|
|
|
# as the first file in the archive.
|
2019-02-24 23:03:17 +01:00
|
|
|
if item.filename == 'mimetype':
|
|
|
|
items = [item] + items
|
|
|
|
else:
|
|
|
|
items.append(item)
|
|
|
|
|
2018-09-18 22:44:21 +02:00
|
|
|
# Since files order is a fingerprint factor,
|
|
|
|
# we're iterating (and thus inserting) them in lexicographic order.
|
2019-02-24 23:03:17 +01:00
|
|
|
for item in items:
|
2018-09-06 11:32:45 +02:00
|
|
|
if item.filename[-1] == '/': # `is_dir` is added in Python3.6
|
|
|
|
continue # don't keep empty folders
|
|
|
|
|
|
|
|
zin.extract(member=item, path=temp_folder)
|
|
|
|
full_path = os.path.join(temp_folder, item.filename)
|
|
|
|
|
|
|
|
if self._specific_cleanup(full_path) is False:
|
|
|
|
logging.warning("Something went wrong during deep cleaning of %s",
|
|
|
|
item.filename)
|
|
|
|
abort = True
|
|
|
|
continue
|
|
|
|
|
2018-10-01 22:26:35 +02:00
|
|
|
if any(map(lambda r: r.search(item.filename), self.files_to_keep)):
|
2018-09-06 11:32:45 +02:00
|
|
|
# those files aren't supported, but we want to add them anyway
|
|
|
|
pass
|
|
|
|
elif any(map(lambda r: r.search(item.filename), self.files_to_omit)):
|
|
|
|
continue
|
2018-09-24 20:15:07 +02:00
|
|
|
else: # supported files that we want to first clean, then add
|
2018-09-06 11:32:45 +02:00
|
|
|
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
|
|
|
|
if not tmp_parser:
|
|
|
|
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
|
|
|
|
logging.warning("In file %s, omitting unknown element %s (format: %s)",
|
|
|
|
self.filename, item.filename, mtype)
|
|
|
|
continue
|
|
|
|
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
|
|
|
|
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
|
|
|
self.filename, item.filename, mtype)
|
|
|
|
else:
|
2019-02-03 10:43:27 +01:00
|
|
|
logging.error("In file %s, element %s's format (%s) " \
|
2018-09-06 11:32:45 +02:00
|
|
|
"isn't supported",
|
|
|
|
self.filename, item.filename, mtype)
|
|
|
|
abort = True
|
|
|
|
continue
|
|
|
|
if tmp_parser:
|
2018-10-25 11:56:12 +02:00
|
|
|
if tmp_parser.remove_all() is False:
|
|
|
|
logging.warning("In file %s, something went wrong \
|
|
|
|
with the cleaning of %s \
|
|
|
|
(format: %s)",
|
|
|
|
self.filename, item.filename, mtype)
|
|
|
|
abort = True
|
|
|
|
continue
|
2018-09-06 11:32:45 +02:00
|
|
|
os.rename(tmp_parser.output_filename, full_path)
|
|
|
|
|
|
|
|
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
|
|
|
|
clean_zinfo = self._clean_zipinfo(zinfo)
|
|
|
|
with open(full_path, 'rb') as f:
|
|
|
|
zout.writestr(clean_zinfo, f.read())
|
|
|
|
|
|
|
|
shutil.rmtree(temp_folder)
|
|
|
|
if abort:
|
|
|
|
os.remove(self.output_filename)
|
|
|
|
return False
|
|
|
|
return True
|
2018-10-25 11:56:46 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ZipParser(ArchiveBasedAbstractParser):
|
|
|
|
mimetypes = {'application/zip'}
|