1
0
mirror of synced 2024-11-25 10:44:23 +01:00

Improve problematic filenames support

This commit is contained in:
jvoisin 2018-10-22 16:45:30 +02:00
parent 5bc88faedf
commit 44f267a596
3 changed files with 32 additions and 22 deletions

View File

@ -5,7 +5,7 @@ import shutil
import subprocess import subprocess
import tempfile import tempfile
from typing import Dict, Union, Set from typing import Dict, Union, Set, Callable, Any
from . import abstract from . import abstract
@ -20,27 +20,23 @@ class ExiftoolParser(abstract.AbstractParser):
""" """
meta_whitelist = set() # type: Set[str] meta_whitelist = set() # type: Set[str]
@staticmethod def _handle_problematic_filename(self, callback: Callable[[str], Any]) -> bytes:
def __handle_problematic_filename(filename: str, callback) -> bytes: """ This method takes a filename with a potentially problematic name,
""" This method takes a filename with a problematic name, and safely applies a `callback` to it.
and safely applies it a `callback`.""" """
if re.search('^[a-z0-9/]', self.filename) is not None:
return callback(self.filename)
tmpdirname = tempfile.mkdtemp() tmpdirname = tempfile.mkdtemp()
fname = os.path.join(tmpdirname, "temp_file") fname = os.path.join(tmpdirname, "temp_file")
shutil.copy(filename, fname) shutil.copy(self.filename, fname)
out = callback(fname) out = callback(fname)
shutil.rmtree(tmpdirname) shutil.rmtree(tmpdirname)
return out return out
def get_meta(self) -> Dict[str, Union[str, dict]]: def get_meta(self) -> Dict[str, Union[str, dict]]:
""" There is no way to escape the leading(s) dash(es) of the current
self.filename to prevent parameter injections, so we need to take care
of this.
"""
fun = lambda f: subprocess.check_output([_get_exiftool_path(), '-json', f]) fun = lambda f: subprocess.check_output([_get_exiftool_path(), '-json', f])
if re.search('^[a-z0-9/]', self.filename) is None: out = self._handle_problematic_filename(fun)
out = self.__handle_problematic_filename(self.filename, fun)
else:
out = fun(self.filename)
meta = json.loads(out.decode('utf-8'))[0] meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_whitelist: for key in self.meta_whitelist:
meta.pop(key, None) meta.pop(key, None)

View File

@ -1,5 +1,6 @@
import os import os
import subprocess import subprocess
import logging
from . import exiftool from . import exiftool
@ -23,13 +24,10 @@ class AVIParser(exiftool.ExiftoolParser):
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample', 'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'} 'Duration', 'ImageSize', 'Megapixels'}
def remove_all(self) -> bool:
""" def __remove_all_internal(self, filename: str):
TODO: handle problematic filenames starting with `-` and `--`,
check exiftool.py
"""
cmd = [_get_ffmpeg_path(), cmd = [_get_ffmpeg_path(),
'-i', self.filename, # input file '-i', filename, # input file
'-y', # overwrite existing output file '-y', # overwrite existing output file
'-loglevel', 'panic', # Don't show log '-loglevel', 'panic', # Don't show log
'-hide_banner', # hide the banner '-hide_banner', # hide the banner
@ -40,10 +38,13 @@ class AVIParser(exiftool.ExiftoolParser):
'-flags:v', '+bitexact', # don't add any metadata '-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata '-flags:a', '+bitexact', # don't add any metadata
self.output_filename] self.output_filename]
subprocess.check_call(cmd)
def remove_all(self) -> bool:
try: try:
subprocess.check_call(cmd) self._handle_problematic_filename(self.__remove_all_internal)
except subprocess.CalledProcessError: except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False return False
return True return True

View File

@ -37,6 +37,19 @@ class TestParameterInjection(unittest.TestCase):
self.assertEqual(meta['ModifyDate'], "2018:03:20 21:59:25") self.assertEqual(meta['ModifyDate'], "2018:03:20 21:59:25")
os.remove('-ver') os.remove('-ver')
def test_ffmpeg_injection(self):
try:
video._get_ffmpeg_path()
except RuntimeError:
raise unittest.SkipTest
shutil.copy('./tests/data/dirty.avi', './--output')
p = video.AVIParser('--output')
meta = p.get_meta()
print(meta)
self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1')
os.remove('--output')
class TestUnsupportedEmbeddedFiles(unittest.TestCase): class TestUnsupportedEmbeddedFiles(unittest.TestCase):
def test_odt_with_svg(self): def test_odt_with_svg(self):