diff --git a/libmat2/abstract.py b/libmat2/abstract.py index 8861966..5cfd0f2 100644 --- a/libmat2/abstract.py +++ b/libmat2/abstract.py @@ -32,6 +32,7 @@ class AbstractParser(abc.ABC): self.output_filename = fname + '.cleaned' + extension self.lightweight_cleaning = False + self.sandbox = True @abc.abstractmethod def get_meta(self) -> Dict[str, Union[str, dict]]: diff --git a/libmat2/subprocess.py b/libmat2/bubblewrap.py similarity index 100% rename from libmat2/subprocess.py rename to libmat2/bubblewrap.py diff --git a/libmat2/exiftool.py b/libmat2/exiftool.py index 024f490..89081e2 100644 --- a/libmat2/exiftool.py +++ b/libmat2/exiftool.py @@ -2,10 +2,11 @@ import functools import json import logging import os +import subprocess from typing import Dict, Union, Set from . import abstract -from . import subprocess +from . import bubblewrap # Make pyflakes happy assert Set @@ -19,9 +20,13 @@ class ExiftoolParser(abstract.AbstractParser): meta_allowlist = set() # type: Set[str] def get_meta(self) -> Dict[str, Union[str, dict]]: - out = subprocess.run([_get_exiftool_path(), '-json', self.filename], - input_filename=self.filename, - check=True, stdout=subprocess.PIPE).stdout + if self.sandbox: + out = bubblewrap.run([_get_exiftool_path(), '-json', self.filename], + input_filename=self.filename, + check=True, stdout=subprocess.PIPE).stdout + else: + out = subprocess.run([_get_exiftool_path(), '-json', self.filename], + check=True, stdout=subprocess.PIPE).stdout meta = json.loads(out.decode('utf-8'))[0] for key in self.meta_allowlist: meta.pop(key, None) @@ -48,9 +53,12 @@ class ExiftoolParser(abstract.AbstractParser): '-o', self.output_filename, self.filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: # pragma: no cover logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/libmat2/video.py b/libmat2/video.py index 1492ba1..2b33bc0 100644 --- a/libmat2/video.py +++ b/libmat2/video.py @@ -1,3 +1,4 @@ +import subprocess import functools import os import logging @@ -5,7 +6,7 @@ import logging from typing import Dict, Union from . import exiftool -from . import subprocess +from . import bubblewrap class AbstractFFmpegParser(exiftool.ExiftoolParser): @@ -33,9 +34,12 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser): '-flags:a', '+bitexact', # don't add any metadata self.output_filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/mat2 b/mat2 index b9f02f2..e67fea0 100755 --- a/mat2 +++ b/mat2 @@ -55,6 +55,8 @@ def create_arg_parser() -> argparse.ArgumentParser: ', '.join(p.value for p in UnknownMemberPolicy)) parser.add_argument('--inplace', action='store_true', help='clean in place, without backup') + parser.add_argument('--no-sandbox', dest='sandbox', action='store_true', + default=False, help='Disable bubblewrap\'s sandboxing.') excl_group = parser.add_mutually_exclusive_group() excl_group.add_argument('files', nargs='*', help='the files to process', @@ -78,7 +80,7 @@ def create_arg_parser() -> argparse.ArgumentParser: return parser -def show_meta(filename: str): +def show_meta(filename: str, sandbox: bool): if not __check_file(filename): return @@ -86,6 +88,7 @@ def show_meta(filename: str): if p is None: print("[-] %s's format (%s) is not supported" % (filename, mtype)) return + p.sandbox = sandbox __print_meta(filename, p.get_meta()) @@ -116,7 +119,7 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1): print(padding + " %s: harmful content" % k) -def clean_meta(filename: str, is_lightweight: bool, inplace: bool, +def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool, policy: UnknownMemberPolicy) -> bool: mode = (os.R_OK | os.W_OK) if inplace else os.R_OK if not __check_file(filename, mode): @@ -128,6 +131,7 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, return False p.unknown_member_policy = policy p.lightweight_cleaning = is_lightweight + p.sandbox = sandbox try: logging.debug('Cleaning %s…', filename) @@ -140,7 +144,6 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, return False - def show_parsers(): print('[+] Supported formats:') formats = set() # Set[str] @@ -171,6 +174,7 @@ def __get_files_recursively(files: List[str]) -> List[str]: ret.add(f) return list(ret) + def main() -> int: arg_parser = create_arg_parser() args = arg_parser.parse_args() @@ -193,7 +197,7 @@ def main() -> int: elif args.show: for f in __get_files_recursively(args.files): - show_meta(f) + show_meta(f, args.sandbox) return 0 else: @@ -210,7 +214,7 @@ def main() -> int: futures = list() for f in files: future = executor.submit(clean_meta, f, args.lightweight, - inplace, policy) + inplace, args.sandbox, policy) futures.append(future) for future in concurrent.futures.as_completed(futures): no_failure &= future.result() diff --git a/tests/test_climat2.py b/tests/test_climat2.py index 6cf8a39..9d816b1 100644 --- a/tests/test_climat2.py +++ b/tests/test_climat2.py @@ -20,17 +20,17 @@ class TestHelp(unittest.TestCase): def test_help(self): proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]', + self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]', stdout) - self.assertIn(b'[--check-dependencies] [-L | -s]', stdout) + self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout) self.assertIn(b'[files [files ...]]', stdout) def test_no_arg(self): proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]', + self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]', stdout) - self.assertIn(b'[--check-dependencies] [-L | -s]', stdout) + self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout) self.assertIn(b'[files [files ...]]', stdout) @@ -40,12 +40,14 @@ class TestVersion(unittest.TestCase): stdout, _ = proc.communicate() self.assertTrue(stdout.startswith(b'MAT2 ')) + class TestDependencies(unittest.TestCase): def test_dependencies(self): proc = subprocess.Popen(mat2_binary + ['--check-dependencies'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertTrue(b'MAT2' in stdout) + class TestReturnValue(unittest.TestCase): def test_nonzero(self): ret = subprocess.call(mat2_binary + ['mat2'], stdout=subprocess.DEVNULL) @@ -112,6 +114,25 @@ class TestCleanMeta(unittest.TestCase): os.remove('./tests/data/clean.jpg') + def test_jpg_nosandbox(self): + shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg') + + proc = subprocess.Popen(mat2_binary + ['--show', '--no-sandbox', './tests/data/clean.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + self.assertIn(b'Comment: Created with GIMP', stdout) + + proc = subprocess.Popen(mat2_binary + ['./tests/data/clean.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + + proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.cleaned.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + self.assertNotIn(b'Comment: Created with GIMP', stdout) + + os.remove('./tests/data/clean.jpg') + class TestIsSupported(unittest.TestCase): def test_pdf(self): @@ -181,6 +202,7 @@ class TestGetMeta(unittest.TestCase): self.assertIn(b'i am a : various comment', stdout) self.assertIn(b'artist: jvoisin', stdout) + class TestControlCharInjection(unittest.TestCase): def test_jpg(self): proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/control_chars.jpg'], @@ -242,6 +264,7 @@ class TestCommandLineParallel(unittest.TestCase): os.remove(path) os.remove('./tests/data/dirty_%d.docx' % i) + class TestInplaceCleaning(unittest.TestCase): def test_cleaning(self): shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg') diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index 13d861d..20e6a01 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py @@ -721,3 +721,43 @@ class TestCleaningArchives(unittest.TestCase): os.remove('./tests/data/dirty.tar.xz') os.remove('./tests/data/dirty.cleaned.tar.xz') os.remove('./tests/data/dirty.cleaned.cleaned.tar.xz') + +class TestNoSandbox(unittest.TestCase): + def test_avi_nosandbox(self): + shutil.copy('./tests/data/dirty.avi', './tests/data/clean.avi') + p = video.AVIParser('./tests/data/clean.avi') + p.sandbox = False + + meta = p.get_meta() + self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1') + + ret = p.remove_all() + self.assertTrue(ret) + + p = video.AVIParser('./tests/data/clean.cleaned.avi') + self.assertEqual(p.get_meta(), {}) + self.assertTrue(p.remove_all()) + + os.remove('./tests/data/clean.avi') + os.remove('./tests/data/clean.cleaned.avi') + os.remove('./tests/data/clean.cleaned.cleaned.avi') + + def test_png_nosandbox(self): + shutil.copy('./tests/data/dirty.png', './tests/data/clean.png') + p = images.PNGParser('./tests/data/clean.png') + p.sandbox = False + p.lightweight_cleaning = True + + meta = p.get_meta() + self.assertEqual(meta['Comment'], 'This is a comment, be careful!') + + ret = p.remove_all() + self.assertTrue(ret) + + p = images.PNGParser('./tests/data/clean.cleaned.png') + self.assertEqual(p.get_meta(), {}) + self.assertTrue(p.remove_all()) + + os.remove('./tests/data/clean.png') + os.remove('./tests/data/clean.cleaned.png') + os.remove('./tests/data/clean.cleaned.cleaned.png')