diff --git a/libmat2/abstract.py b/libmat2/abstract.py index 414a68b..9b510f6 100644 --- a/libmat2/abstract.py +++ b/libmat2/abstract.py @@ -1,5 +1,6 @@ import abc import os +import re from typing import Set, Dict, Union assert Set # make pyflakes happy @@ -17,6 +18,11 @@ class AbstractParser(abc.ABC): """ :raises ValueError: Raised upon an invalid file """ + if re.search('^[a-z0-9./]', filename) is None: + # Some parsers are calling external binaries, + # this prevents shell command injections + filename = os.path.join('.', filename) + self.filename = filename fname, extension = os.path.splitext(filename) self.output_filename = fname + '.cleaned' + extension diff --git a/libmat2/exiftool.py b/libmat2/exiftool.py index 331ae0c..11dd36d 100644 --- a/libmat2/exiftool.py +++ b/libmat2/exiftool.py @@ -1,11 +1,7 @@ import json import os -import re -import shutil import subprocess -import tempfile - -from typing import Dict, Union, Set, Callable, Any +from typing import Dict, Union, Set from . import abstract @@ -20,23 +16,8 @@ class ExiftoolParser(abstract.AbstractParser): """ meta_whitelist = set() # type: Set[str] - def _handle_problematic_filename(self, callback: Callable[[str], Any]) -> bytes: - """ This method takes a filename with a potentially problematic name, - and safely applies a `callback` to it. - """ - if re.search('^[a-z0-9/]', self.filename) is not None: - return callback(self.filename) - - tmpdirname = tempfile.mkdtemp() - fname = os.path.join(tmpdirname, "temp_file") - shutil.copy(self.filename, fname) - out = callback(fname) - shutil.rmtree(tmpdirname) - return out - def get_meta(self) -> Dict[str, Union[str, dict]]: - fun = lambda f: subprocess.check_output([_get_exiftool_path(), '-json', f]) - out = self._handle_problematic_filename(fun) + out = subprocess.check_output([_get_exiftool_path(), '-json', self.filename]) meta = json.loads(out.decode('utf-8'))[0] for key in self.meta_whitelist: meta.pop(key, None) diff --git a/libmat2/video.py b/libmat2/video.py index 2fa65e8..fe2a1af 100644 --- a/libmat2/video.py +++ b/libmat2/video.py @@ -24,10 +24,9 @@ class AVIParser(exiftool.ExiftoolParser): 'SampleRate', 'AvgBytesPerSec', 'BitsPerSample', 'Duration', 'ImageSize', 'Megapixels'} - - def __remove_all_internal(self, filename: str): + def remove_all(self): cmd = [_get_ffmpeg_path(), - '-i', filename, # input file + '-i', self.filename, # input file '-y', # overwrite existing output file '-loglevel', 'panic', # Don't show log '-hide_banner', # hide the banner @@ -38,11 +37,8 @@ class AVIParser(exiftool.ExiftoolParser): '-flags:v', '+bitexact', # don't add any metadata '-flags:a', '+bitexact', # don't add any metadata self.output_filename] - subprocess.check_call(cmd) - - def remove_all(self) -> bool: try: - self._handle_problematic_filename(self.__remove_all_internal) + subprocess.check_call(cmd) except subprocess.CalledProcessError as e: logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/tests/test_corrupted_files.py b/tests/test_corrupted_files.py index 490ea42..82c6c3b 100644 --- a/tests/test_corrupted_files.py +++ b/tests/test_corrupted_files.py @@ -204,3 +204,14 @@ class TestCorruptedFiles(unittest.TestCase): p = video.AVIParser('./tests/data/clean.avi') self.assertFalse(p.remove_all()) os.remove('./tests/data/clean.avi') + + def test_avi_injection(self): + try: + video._get_ffmpeg_path() + except RuntimeError: + raise unittest.SkipTest + + shutil.copy('./tests/data/dirty.torrent', './tests/data/--output.avi') + p = video.AVIParser('./tests/data/--output.avi') + self.assertFalse(p.remove_all()) + os.remove('./tests/data/--output.avi') diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index 241c6eb..f5fc9e8 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py @@ -46,10 +46,23 @@ class TestParameterInjection(unittest.TestCase): shutil.copy('./tests/data/dirty.avi', './--output') p = video.AVIParser('--output') meta = p.get_meta() - print(meta) self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1') os.remove('--output') + def test_ffmpeg_injection_complete_path(self): + try: + video._get_ffmpeg_path() + except RuntimeError: + raise unittest.SkipTest + + shutil.copy('./tests/data/dirty.avi', './tests/data/ --output.avi') + p = video.AVIParser('./tests/data/ --output.avi') + meta = p.get_meta() + self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1') + self.assertTrue(p.remove_all()) + os.remove('./tests/data/ --output.avi') + os.remove('./tests/data/ --output.cleaned.avi') + class TestUnsupportedEmbeddedFiles(unittest.TestCase): def test_odt_with_svg(self):