Add a way to disable the sandbox
Due to bubblewrap's pickiness, mat2 can now be run without a sandbox, even if bubblewrap is installed.
This commit is contained in:
parent
3cef7fe7fc
commit
5f0b3beb46
@ -32,6 +32,7 @@ class AbstractParser(abc.ABC):
|
||||
|
||||
self.output_filename = fname + '.cleaned' + extension
|
||||
self.lightweight_cleaning = False
|
||||
self.sandbox = True
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_meta(self) -> Dict[str, Union[str, dict]]:
|
||||
|
@ -2,10 +2,11 @@ import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Dict, Union, Set
|
||||
|
||||
from . import abstract
|
||||
from . import subprocess
|
||||
from . import bubblewrap
|
||||
|
||||
# Make pyflakes happy
|
||||
assert Set
|
||||
@ -19,9 +20,13 @@ class ExiftoolParser(abstract.AbstractParser):
|
||||
meta_allowlist = set() # type: Set[str]
|
||||
|
||||
def get_meta(self) -> Dict[str, Union[str, dict]]:
|
||||
out = subprocess.run([_get_exiftool_path(), '-json', self.filename],
|
||||
input_filename=self.filename,
|
||||
check=True, stdout=subprocess.PIPE).stdout
|
||||
if self.sandbox:
|
||||
out = bubblewrap.run([_get_exiftool_path(), '-json', self.filename],
|
||||
input_filename=self.filename,
|
||||
check=True, stdout=subprocess.PIPE).stdout
|
||||
else:
|
||||
out = subprocess.run([_get_exiftool_path(), '-json', self.filename],
|
||||
check=True, stdout=subprocess.PIPE).stdout
|
||||
meta = json.loads(out.decode('utf-8'))[0]
|
||||
for key in self.meta_allowlist:
|
||||
meta.pop(key, None)
|
||||
@ -48,9 +53,12 @@ class ExiftoolParser(abstract.AbstractParser):
|
||||
'-o', self.output_filename,
|
||||
self.filename]
|
||||
try:
|
||||
subprocess.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
if self.sandbox:
|
||||
bubblewrap.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
else:
|
||||
subprocess.run(cmd, check=True)
|
||||
except subprocess.CalledProcessError as e: # pragma: no cover
|
||||
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
|
||||
return False
|
||||
|
@ -1,3 +1,4 @@
|
||||
import subprocess
|
||||
import functools
|
||||
import os
|
||||
import logging
|
||||
@ -5,7 +6,7 @@ import logging
|
||||
from typing import Dict, Union
|
||||
|
||||
from . import exiftool
|
||||
from . import subprocess
|
||||
from . import bubblewrap
|
||||
|
||||
|
||||
class AbstractFFmpegParser(exiftool.ExiftoolParser):
|
||||
@ -33,9 +34,12 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser):
|
||||
'-flags:a', '+bitexact', # don't add any metadata
|
||||
self.output_filename]
|
||||
try:
|
||||
subprocess.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
if self.sandbox:
|
||||
bubblewrap.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
else:
|
||||
subprocess.run(cmd, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
|
||||
return False
|
||||
|
14
mat2
14
mat2
@ -55,6 +55,8 @@ def create_arg_parser() -> argparse.ArgumentParser:
|
||||
', '.join(p.value for p in UnknownMemberPolicy))
|
||||
parser.add_argument('--inplace', action='store_true',
|
||||
help='clean in place, without backup')
|
||||
parser.add_argument('--no-sandbox', dest='sandbox', action='store_true',
|
||||
default=False, help='Disable bubblewrap\'s sandboxing.')
|
||||
|
||||
excl_group = parser.add_mutually_exclusive_group()
|
||||
excl_group.add_argument('files', nargs='*', help='the files to process',
|
||||
@ -78,7 +80,7 @@ def create_arg_parser() -> argparse.ArgumentParser:
|
||||
return parser
|
||||
|
||||
|
||||
def show_meta(filename: str):
|
||||
def show_meta(filename: str, sandbox: bool):
|
||||
if not __check_file(filename):
|
||||
return
|
||||
|
||||
@ -86,6 +88,7 @@ def show_meta(filename: str):
|
||||
if p is None:
|
||||
print("[-] %s's format (%s) is not supported" % (filename, mtype))
|
||||
return
|
||||
p.sandbox = sandbox
|
||||
__print_meta(filename, p.get_meta())
|
||||
|
||||
|
||||
@ -116,7 +119,7 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1):
|
||||
print(padding + " %s: harmful content" % k)
|
||||
|
||||
|
||||
def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
|
||||
def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
|
||||
policy: UnknownMemberPolicy) -> bool:
|
||||
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
|
||||
if not __check_file(filename, mode):
|
||||
@ -128,6 +131,7 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
|
||||
return False
|
||||
p.unknown_member_policy = policy
|
||||
p.lightweight_cleaning = is_lightweight
|
||||
p.sandbox = sandbox
|
||||
|
||||
try:
|
||||
logging.debug('Cleaning %s…', filename)
|
||||
@ -140,7 +144,6 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
|
||||
return False
|
||||
|
||||
|
||||
|
||||
def show_parsers():
|
||||
print('[+] Supported formats:')
|
||||
formats = set() # Set[str]
|
||||
@ -171,6 +174,7 @@ def __get_files_recursively(files: List[str]) -> List[str]:
|
||||
ret.add(f)
|
||||
return list(ret)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
arg_parser = create_arg_parser()
|
||||
args = arg_parser.parse_args()
|
||||
@ -193,7 +197,7 @@ def main() -> int:
|
||||
|
||||
elif args.show:
|
||||
for f in __get_files_recursively(args.files):
|
||||
show_meta(f)
|
||||
show_meta(f, args.sandbox)
|
||||
return 0
|
||||
|
||||
else:
|
||||
@ -210,7 +214,7 @@ def main() -> int:
|
||||
futures = list()
|
||||
for f in files:
|
||||
future = executor.submit(clean_meta, f, args.lightweight,
|
||||
inplace, policy)
|
||||
inplace, args.sandbox, policy)
|
||||
futures.append(future)
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
no_failure &= future.result()
|
||||
|
@ -20,17 +20,17 @@ class TestHelp(unittest.TestCase):
|
||||
def test_help(self):
|
||||
proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
|
||||
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]',
|
||||
stdout)
|
||||
self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
|
||||
self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout)
|
||||
self.assertIn(b'[files [files ...]]', stdout)
|
||||
|
||||
def test_no_arg(self):
|
||||
proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
|
||||
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]',
|
||||
stdout)
|
||||
self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
|
||||
self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout)
|
||||
self.assertIn(b'[files [files ...]]', stdout)
|
||||
|
||||
|
||||
@ -40,12 +40,14 @@ class TestVersion(unittest.TestCase):
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertTrue(stdout.startswith(b'MAT2 '))
|
||||
|
||||
|
||||
class TestDependencies(unittest.TestCase):
|
||||
def test_dependencies(self):
|
||||
proc = subprocess.Popen(mat2_binary + ['--check-dependencies'], stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertTrue(b'MAT2' in stdout)
|
||||
|
||||
|
||||
class TestReturnValue(unittest.TestCase):
|
||||
def test_nonzero(self):
|
||||
ret = subprocess.call(mat2_binary + ['mat2'], stdout=subprocess.DEVNULL)
|
||||
@ -112,6 +114,25 @@ class TestCleanMeta(unittest.TestCase):
|
||||
|
||||
os.remove('./tests/data/clean.jpg')
|
||||
|
||||
def test_jpg_nosandbox(self):
|
||||
shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
|
||||
|
||||
proc = subprocess.Popen(mat2_binary + ['--show', '--no-sandbox', './tests/data/clean.jpg'],
|
||||
stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertIn(b'Comment: Created with GIMP', stdout)
|
||||
|
||||
proc = subprocess.Popen(mat2_binary + ['./tests/data/clean.jpg'],
|
||||
stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
|
||||
proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.cleaned.jpg'],
|
||||
stdout=subprocess.PIPE)
|
||||
stdout, _ = proc.communicate()
|
||||
self.assertNotIn(b'Comment: Created with GIMP', stdout)
|
||||
|
||||
os.remove('./tests/data/clean.jpg')
|
||||
|
||||
|
||||
class TestIsSupported(unittest.TestCase):
|
||||
def test_pdf(self):
|
||||
@ -181,6 +202,7 @@ class TestGetMeta(unittest.TestCase):
|
||||
self.assertIn(b'i am a : various comment', stdout)
|
||||
self.assertIn(b'artist: jvoisin', stdout)
|
||||
|
||||
|
||||
class TestControlCharInjection(unittest.TestCase):
|
||||
def test_jpg(self):
|
||||
proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/control_chars.jpg'],
|
||||
@ -242,6 +264,7 @@ class TestCommandLineParallel(unittest.TestCase):
|
||||
os.remove(path)
|
||||
os.remove('./tests/data/dirty_%d.docx' % i)
|
||||
|
||||
|
||||
class TestInplaceCleaning(unittest.TestCase):
|
||||
def test_cleaning(self):
|
||||
shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
|
||||
|
@ -721,3 +721,43 @@ class TestCleaningArchives(unittest.TestCase):
|
||||
os.remove('./tests/data/dirty.tar.xz')
|
||||
os.remove('./tests/data/dirty.cleaned.tar.xz')
|
||||
os.remove('./tests/data/dirty.cleaned.cleaned.tar.xz')
|
||||
|
||||
class TestNoSandbox(unittest.TestCase):
|
||||
def test_avi_nosandbox(self):
|
||||
shutil.copy('./tests/data/dirty.avi', './tests/data/clean.avi')
|
||||
p = video.AVIParser('./tests/data/clean.avi')
|
||||
p.sandbox = False
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1')
|
||||
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = video.AVIParser('./tests/data/clean.cleaned.avi')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
self.assertTrue(p.remove_all())
|
||||
|
||||
os.remove('./tests/data/clean.avi')
|
||||
os.remove('./tests/data/clean.cleaned.avi')
|
||||
os.remove('./tests/data/clean.cleaned.cleaned.avi')
|
||||
|
||||
def test_png_nosandbox(self):
|
||||
shutil.copy('./tests/data/dirty.png', './tests/data/clean.png')
|
||||
p = images.PNGParser('./tests/data/clean.png')
|
||||
p.sandbox = False
|
||||
p.lightweight_cleaning = True
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['Comment'], 'This is a comment, be careful!')
|
||||
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = images.PNGParser('./tests/data/clean.cleaned.png')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
self.assertTrue(p.remove_all())
|
||||
|
||||
os.remove('./tests/data/clean.png')
|
||||
os.remove('./tests/data/clean.cleaned.png')
|
||||
os.remove('./tests/data/clean.cleaned.cleaned.png')
|
||||
|
Loading…
Reference in New Issue
Block a user