Unknown Members: make policy use an Enum
Closes #60. Note: this changeset also ensures that clean.cleaned.docx is cleaned up after the pytest run is over.
This commit is contained in:
parent
2d9ba81a84
commit
f3cef319b9
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import collections
|
import collections
|
||||||
|
from enum import Enum
|
||||||
import importlib
|
import importlib
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
@ -62,3 +63,8 @@ def check_dependencies() -> dict:
|
|||||||
ret[value] = False # pragma: no cover
|
ret[value] = False # pragma: no cover
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
class UnknownMemberPolicy(Enum):
|
||||||
|
ABORT = 'abort'
|
||||||
|
OMIT = 'omit'
|
||||||
|
KEEP = 'keep'
|
||||||
|
@ -9,7 +9,7 @@ from typing import Dict, Set, Pattern
|
|||||||
|
|
||||||
import xml.etree.ElementTree as ET # type: ignore
|
import xml.etree.ElementTree as ET # type: ignore
|
||||||
|
|
||||||
from . import abstract, parser_factory
|
from . import abstract, parser_factory, UnknownMemberPolicy
|
||||||
|
|
||||||
# Make pyflakes happy
|
# Make pyflakes happy
|
||||||
assert Set
|
assert Set
|
||||||
@ -37,8 +37,8 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
|||||||
files_to_omit = set() # type: Set[Pattern]
|
files_to_omit = set() # type: Set[Pattern]
|
||||||
|
|
||||||
# what should the parser do if it encounters an unknown file in
|
# what should the parser do if it encounters an unknown file in
|
||||||
# the archive? valid policies are 'abort', 'omit', 'keep'
|
# the archive?
|
||||||
unknown_member_policy = 'abort' # type: str
|
unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
|
||||||
|
|
||||||
def __init__(self, filename):
|
def __init__(self, filename):
|
||||||
super().__init__(filename)
|
super().__init__(filename)
|
||||||
@ -81,10 +81,6 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
|||||||
def remove_all(self) -> bool:
|
def remove_all(self) -> bool:
|
||||||
# pylint: disable=too-many-branches
|
# pylint: disable=too-many-branches
|
||||||
|
|
||||||
if self.unknown_member_policy not in ['omit', 'keep', 'abort']:
|
|
||||||
logging.error("The policy %s is invalid.", self.unknown_member_policy)
|
|
||||||
raise ValueError
|
|
||||||
|
|
||||||
with zipfile.ZipFile(self.filename) as zin,\
|
with zipfile.ZipFile(self.filename) as zin,\
|
||||||
zipfile.ZipFile(self.output_filename, 'w') as zout:
|
zipfile.ZipFile(self.output_filename, 'w') as zout:
|
||||||
|
|
||||||
@ -113,11 +109,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
|||||||
# supported files that we want to clean then add
|
# supported files that we want to clean then add
|
||||||
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
|
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
|
||||||
if not tmp_parser:
|
if not tmp_parser:
|
||||||
if self.unknown_member_policy == 'omit':
|
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
|
||||||
logging.warning("In file %s, omitting unknown element %s (format: %s)",
|
logging.warning("In file %s, omitting unknown element %s (format: %s)",
|
||||||
self.filename, item.filename, mtype)
|
self.filename, item.filename, mtype)
|
||||||
continue
|
continue
|
||||||
elif self.unknown_member_policy == 'keep':
|
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
|
||||||
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
||||||
self.filename, item.filename, mtype)
|
self.filename, item.filename, mtype)
|
||||||
else:
|
else:
|
||||||
|
17
mat2
17
mat2
@ -10,7 +10,8 @@ import multiprocessing
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS, check_dependencies
|
from libmat2 import (parser_factory, UNSUPPORTED_EXTENSIONS, check_dependencies,
|
||||||
|
UnknownMemberPolicy)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
print(e)
|
print(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@ -42,8 +43,8 @@ def create_arg_parser():
|
|||||||
parser.add_argument('-V', '--verbose', action='store_true',
|
parser.add_argument('-V', '--verbose', action='store_true',
|
||||||
help='show more verbose status information')
|
help='show more verbose status information')
|
||||||
parser.add_argument('--unknown-members', metavar='policy', default='abort',
|
parser.add_argument('--unknown-members', metavar='policy', default='abort',
|
||||||
help='how to handle unknown members of archive-style files ' +
|
help='how to handle unknown members of archive-style files (policy should' +
|
||||||
'(policy should be abort, omit, or keep)')
|
' be one of: ' + ', '.join([x.value for x in UnknownMemberPolicy]) + ')')
|
||||||
|
|
||||||
|
|
||||||
info = parser.add_mutually_exclusive_group()
|
info = parser.add_mutually_exclusive_group()
|
||||||
@ -70,7 +71,7 @@ def show_meta(filename: str):
|
|||||||
except UnicodeEncodeError:
|
except UnicodeEncodeError:
|
||||||
print(" %s: harmful content" % k)
|
print(" %s: harmful content" % k)
|
||||||
|
|
||||||
def clean_meta(params: Tuple[str, bool, str]) -> bool:
|
def clean_meta(params: Tuple[str, bool, UnknownMemberPolicy]) -> bool:
|
||||||
filename, is_lightweight, unknown_member_policy = params
|
filename, is_lightweight, unknown_member_policy = params
|
||||||
if not __check_file(filename, os.R_OK|os.W_OK):
|
if not __check_file(filename, os.R_OK|os.W_OK):
|
||||||
return False
|
return False
|
||||||
@ -137,15 +138,13 @@ def main():
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if args.unknown_members == 'keep':
|
unknown_member_policy = UnknownMemberPolicy(args.unknown_members)
|
||||||
|
if unknown_member_policy == UnknownMemberPolicy.KEEP:
|
||||||
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
|
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
|
||||||
elif args.unknown_members not in ['omit', 'abort']:
|
|
||||||
logging.warning('Undefined policy for handling unknown member files: "%s"',
|
|
||||||
args.unknown_members)
|
|
||||||
p = multiprocessing.Pool()
|
p = multiprocessing.Pool()
|
||||||
mode = (args.lightweight is True)
|
mode = (args.lightweight is True)
|
||||||
l = zip(__get_files_recursively(args.files), itertools.repeat(mode),
|
l = zip(__get_files_recursively(args.files), itertools.repeat(mode),
|
||||||
itertools.repeat(args.unknown_members))
|
itertools.repeat(unknown_member_policy))
|
||||||
|
|
||||||
ret = list(p.imap_unordered(clean_meta, list(l)))
|
ret = list(p.imap_unordered(clean_meta, list(l)))
|
||||||
return 0 if all(ret) else -1
|
return 0 if all(ret) else -1
|
||||||
|
@ -4,28 +4,29 @@ import unittest
|
|||||||
import shutil
|
import shutil
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from libmat2 import office
|
from libmat2 import office, UnknownMemberPolicy
|
||||||
|
|
||||||
class TestPolicy(unittest.TestCase):
|
class TestPolicy(unittest.TestCase):
|
||||||
def test_policy_omit(self):
|
def test_policy_omit(self):
|
||||||
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
||||||
p = office.MSOfficeParser('./tests/data/clean.docx')
|
p = office.MSOfficeParser('./tests/data/clean.docx')
|
||||||
p.unknown_member_policy = 'omit'
|
p.unknown_member_policy = UnknownMemberPolicy.OMIT
|
||||||
self.assertTrue(p.remove_all())
|
self.assertTrue(p.remove_all())
|
||||||
os.remove('./tests/data/clean.docx')
|
os.remove('./tests/data/clean.docx')
|
||||||
|
os.remove('./tests/data/clean.cleaned.docx')
|
||||||
|
|
||||||
def test_policy_keep(self):
|
def test_policy_keep(self):
|
||||||
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
||||||
p = office.MSOfficeParser('./tests/data/clean.docx')
|
p = office.MSOfficeParser('./tests/data/clean.docx')
|
||||||
p.unknown_member_policy = 'keep'
|
p.unknown_member_policy = UnknownMemberPolicy.KEEP
|
||||||
self.assertTrue(p.remove_all())
|
self.assertTrue(p.remove_all())
|
||||||
os.remove('./tests/data/clean.docx')
|
os.remove('./tests/data/clean.docx')
|
||||||
|
os.remove('./tests/data/clean.cleaned.docx')
|
||||||
|
|
||||||
def test_policy_unknown(self):
|
def test_policy_unknown(self):
|
||||||
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
shutil.copy('./tests/data/embedded.docx', './tests/data/clean.docx')
|
||||||
p = office.MSOfficeParser('./tests/data/clean.docx')
|
p = office.MSOfficeParser('./tests/data/clean.docx')
|
||||||
p.unknown_member_policy = 'unknown_policy_name_totally_invalid'
|
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
p.remove_all()
|
p.unknown_member_policy = UnknownMemberPolicy('unknown_policy_name_totally_invalid')
|
||||||
os.remove('./tests/data/clean.docx')
|
os.remove('./tests/data/clean.docx')
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user