From 40669186c937a36fa73c16d5bba0f343005f398d Mon Sep 17 00:00:00 2001
From: jvoisin
Date: Sat, 31 Aug 2019 10:31:08 -0700
Subject: [PATCH] Add support for inplace cleaning

Add an --inplace flag. When it is set, clean_meta() checks the input
file for both read and write access and, once cleaning is done, renames
the parser's output file over the original.

The rename only happens when remove_all() reports success: a failed
cleaning must neither replace the original with an incomplete output
nor crash the worker on a missing output file.

---
Review notes:
- Line breaks, blank lines and indentation were reconstructed from the
  hunk headers and the diffstat; runs of spaces inside string literals
  may have been collapsed. Verify the context lines against the tree
  before applying.
- The post-image blob id on the mat2 index line predates the success
  check on the rename.
- test_cleaning_multiple_one_fails creates clean_9.jpg from a torrent
  but only passes clean_0..clean_8 to mat2; presumably the failing file
  was meant to be on the command line as well -- TODO confirm.

 mat2                  | 17 +++++++++++++----
 tests/test_climat2.py | 35 +++++++++++++++++++++++++++++++++--
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/mat2 b/mat2
index 70712b8..b9f02f2 100755
--- a/mat2
+++ b/mat2
@@ -53,6 +53,8 @@ def create_arg_parser() -> argparse.ArgumentParser:
                         help='how to handle unknown members of archive-style '
                         'files (policy should be one of: %s) [Default: abort]' %
                         ', '.join(p.value for p in UnknownMemberPolicy))
+    parser.add_argument('--inplace', action='store_true',
+                        help='clean in place, without backup')
 
     excl_group = parser.add_mutually_exclusive_group()
     excl_group.add_argument('files', nargs='*', help='the files to process',
@@ -114,8 +116,10 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1):
             print(padding + " %s: harmful content" % k)
 
 
-def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool:
-    if not __check_file(filename, os.R_OK):
+def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
+               policy: UnknownMemberPolicy) -> bool:
+    mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
+    if not __check_file(filename, mode):
         return False
 
     p, mtype = parser_factory.get_parser(filename)  # type: ignore
@@ -127,7 +131,10 @@ def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy)
 
     try:
         logging.debug('Cleaning %s…', filename)
-        return p.remove_all()
+        ret = p.remove_all()
+        if ret is True and inplace is True:
+            os.rename(p.output_filename, filename)
+        return ret
     except RuntimeError as e:
         print("[-] %s can't be cleaned: %s" % (filename, e))
     return False
@@ -190,6 +197,7 @@ def main() -> int:
         return 0
 
     else:
+        inplace = args.inplace
         policy = UnknownMemberPolicy(args.unknown_members)
         if policy == UnknownMemberPolicy.KEEP:
             logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
@@ -201,7 +209,8 @@ def main() -> int:
         with concurrent.futures.ProcessPoolExecutor() as executor:
             futures = list()
             for f in files:
-                future = executor.submit(clean_meta, f, args.lightweight, policy)
+                future = executor.submit(clean_meta, f, args.lightweight,
+                                         inplace, policy)
                 futures.append(future)
             for future in concurrent.futures.as_completed(futures):
                 no_failure &= future.result()
diff --git a/tests/test_climat2.py b/tests/test_climat2.py
index bbb9c06..6cf8a39 100644
--- a/tests/test_climat2.py
+++ b/tests/test_climat2.py
@@ -20,7 +20,7 @@ class TestHelp(unittest.TestCase):
     def test_help(self):
         proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE)
         stdout, _ = proc.communicate()
-        self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [-v] [-l]',
+        self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
                       stdout)
         self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
         self.assertIn(b'[files [files ...]]', stdout)
@@ -28,7 +28,7 @@ class TestHelp(unittest.TestCase):
     def test_no_arg(self):
         proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE)
         stdout, _ = proc.communicate()
-        self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [-v] [-l]',
+        self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
                       stdout)
         self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
         self.assertIn(b'[files [files ...]]', stdout)
@@ -241,3 +241,34 @@ class TestCommandLineParallel(unittest.TestCase):
             os.remove('./tests/data/dirty_%d.cleaned.jpg' % i)
             os.remove(path)
             os.remove('./tests/data/dirty_%d.docx' % i)
+
+class TestInplaceCleaning(unittest.TestCase):
+    def test_cleaning(self):
+        shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
+        proc = subprocess.Popen(mat2_binary + ['--inplace', './tests/data/clean.jpg'],
+                                stdout=subprocess.PIPE)
+        stdout, _ = proc.communicate()
+        proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.jpg'],
+                                stdout=subprocess.PIPE)
+        stdout, _ = proc.communicate()
+        self.assertIn(b' No metadata found in ./tests/data/clean.jpg.\n', stdout)
+        os.remove('./tests/data/clean.jpg')
+
+    def test_cleaning_multiple_one_fails(self):
+        files = ['./tests/data/clean_%d.jpg' % i for i in range(9)]
+        for f in files:
+            shutil.copy('./tests/data/dirty.jpg', f)
+        shutil.copy('./tests/data/dirty.torrent', './tests/data/clean_9.jpg')
+
+        proc = subprocess.Popen(mat2_binary + ['--inplace'] + files,
+                                stdout=subprocess.PIPE)
+        stdout, _ = proc.communicate()
+
+        for f in files:
+            p = images.JPGParser(f)
+            meta = p.get_meta()
+            self.assertEqual(meta, {})
+
+        for i in range(10):
+            os.remove('./tests/data/clean_%d.jpg' % i)
+