parent
8b5d0c286c
commit
e8c1bb0e3c
@ -42,6 +42,17 @@ tests:debian:
|
||||
script:
|
||||
- apt-get -qqy update
|
||||
- apt-get -qqy install --no-install-recommends python3-mutagen python3-gi-cairo gir1.2-poppler-0.18 gir1.2-gdkpixbuf-2.0 libimage-exiftool-perl python3-coverage ffmpeg
|
||||
- apt-get -qqy purge bubblewrap
|
||||
- python3-coverage run --branch -m unittest discover -s tests/
|
||||
- python3-coverage report --fail-under=90 -m --include 'libmat2/*'
|
||||
|
||||
tests:debian_with_bubblewrap:
|
||||
stage: test
|
||||
tags:
|
||||
- whitewhale
|
||||
script:
|
||||
- apt-get -qqy update
|
||||
- apt-get -qqy install --no-install-recommends python3-mutagen python3-gi-cairo gir1.2-poppler-0.18 gir1.2-gdkpixbuf-2.0 libimage-exiftool-perl python3-coverage ffmpeg bubblewrap
|
||||
- python3-coverage run --branch -m unittest discover -s tests/
|
||||
- python3-coverage report --fail-under=100 -m --include 'libmat2/*'
|
||||
|
||||
|
@ -9,9 +9,13 @@ installed like this:
|
||||
pip3 install mat2
|
||||
```
|
||||
|
||||
|
||||
# GNU/Linux
|
||||
|
||||
## Optional dependencies
|
||||
|
||||
When [bubblewrap](https://github.com/projectatomic/bubblewrap) is
|
||||
installed, MAT2 uses it to sandbox any external processes it invokes.
|
||||
|
||||
## Fedora
|
||||
|
||||
Thanks to [atenart](https://ack.tf/), there is a package available on
|
||||
|
@ -39,12 +39,11 @@ DEPENDENCIES = {
|
||||
}
|
||||
|
||||
|
||||
|
||||
def check_dependencies() -> Dict[str, bool]:
|
||||
ret = collections.defaultdict(bool) # type: Dict[str, bool]
|
||||
|
||||
ret['Exiftool'] = True if exiftool._get_exiftool_path() else False
|
||||
ret['Ffmpeg'] = True if video._get_ffmpeg_path() else False
|
||||
ret['Exiftool'] = bool(exiftool._get_exiftool_path())
|
||||
ret['Ffmpeg'] = bool(video._get_ffmpeg_path())
|
||||
|
||||
for key, value in DEPENDENCIES.items():
|
||||
ret[value] = True
|
||||
@ -55,6 +54,7 @@ def check_dependencies() -> Dict[str, bool]:
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
@enum.unique
|
||||
class UnknownMemberPolicy(enum.Enum):
|
||||
ABORT = 'abort'
|
||||
|
@ -37,4 +37,5 @@ class AbstractParser(abc.ABC):
|
||||
"""
|
||||
:raises RuntimeError: Raised if the cleaning process went wrong.
|
||||
"""
|
||||
# pylint: disable=unnecessary-pass
|
||||
pass # pragma: no cover
|
||||
|
@ -132,7 +132,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
logging.warning("In file %s, keeping unknown element %s (format: %s)",
|
||||
self.filename, item.filename, mtype)
|
||||
else:
|
||||
logging.error("In file %s, element %s's format (%s) " +
|
||||
logging.error("In file %s, element %s's format (%s) " \
|
||||
"isn't supported",
|
||||
self.filename, item.filename, mtype)
|
||||
abort = True
|
||||
|
@ -1,10 +1,10 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Dict, Union, Set
|
||||
|
||||
from . import abstract
|
||||
from . import subprocess
|
||||
|
||||
# Make pyflakes happy
|
||||
assert Set
|
||||
@ -18,7 +18,9 @@ class ExiftoolParser(abstract.AbstractParser):
|
||||
meta_whitelist = set() # type: Set[str]
|
||||
|
||||
def get_meta(self) -> Dict[str, Union[str, dict]]:
|
||||
out = subprocess.check_output([_get_exiftool_path(), '-json', self.filename])
|
||||
out = subprocess.run([_get_exiftool_path(), '-json', self.filename],
|
||||
input_filename=self.filename,
|
||||
check=True, stdout=subprocess.PIPE).stdout
|
||||
meta = json.loads(out.decode('utf-8'))[0]
|
||||
for key in self.meta_whitelist:
|
||||
meta.pop(key, None)
|
||||
@ -46,7 +48,9 @@ class ExiftoolParser(abstract.AbstractParser):
|
||||
'-o', self.output_filename,
|
||||
self.filename]
|
||||
try:
|
||||
subprocess.check_call(cmd)
|
||||
subprocess.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
except subprocess.CalledProcessError as e: # pragma: no cover
|
||||
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
|
||||
return False
|
||||
|
@ -266,7 +266,6 @@ class MSOfficeParser(ArchiveBasedAbstractParser):
|
||||
f.write(b'<cp:coreProperties xmlns:cp="http://schemas.openxmlformats.org/package/2006/metadata/core-properties">')
|
||||
f.write(b'</cp:coreProperties>')
|
||||
|
||||
|
||||
if self.__remove_rsid(full_path) is False:
|
||||
return False
|
||||
|
||||
|
@ -10,6 +10,7 @@ assert Tuple # make pyflakes happy
|
||||
|
||||
T = TypeVar('T', bound='abstract.AbstractParser')
|
||||
|
||||
|
||||
def __load_all_parsers():
|
||||
""" Loads every parser in a dynamic way """
|
||||
current_dir = os.path.dirname(__file__)
|
||||
@ -24,8 +25,10 @@ def __load_all_parsers():
|
||||
name, _ = os.path.splitext(basename)
|
||||
importlib.import_module('.' + name, package='libmat2')
|
||||
|
||||
|
||||
__load_all_parsers()
|
||||
|
||||
|
||||
def _get_parsers() -> List[T]:
|
||||
""" Get all our parsers!"""
|
||||
def __get_parsers(cls):
|
||||
|
100
libmat2/subprocess.py
Normal file
100
libmat2/subprocess.py
Normal file
@ -0,0 +1,100 @@
|
||||
"""
|
||||
Wrapper around a subset of the subprocess module,
|
||||
that uses bwrap (bubblewrap) when it is available.
|
||||
|
||||
Instead of importing subprocess, other modules should use this as follows:
|
||||
|
||||
from . import subprocess
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
__all__ = ['PIPE', 'run', 'CalledProcessError']
|
||||
PIPE = subprocess.PIPE
|
||||
CalledProcessError = subprocess.CalledProcessError
|
||||
|
||||
|
||||
def _get_bwrap_path() -> str:
|
||||
bwrap_path = '/usr/bin/bwrap'
|
||||
if os.path.isfile(bwrap_path):
|
||||
if os.access(bwrap_path, os.X_OK):
|
||||
return bwrap_path
|
||||
|
||||
raise RuntimeError("Unable to find bwrap") # pragma: no cover
|
||||
|
||||
|
||||
# pylint: disable=bad-whitespace
|
||||
def _get_bwrap_args(tempdir: str,
|
||||
input_filename: str,
|
||||
output_filename: Optional[str] = None) -> List[str]:
|
||||
cwd = os.getcwd()
|
||||
|
||||
# XXX: use --ro-bind-try once all supported platforms
|
||||
# have a bubblewrap recent enough to support it.
|
||||
ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd]
|
||||
ro_bind_args = []
|
||||
for bind_dir in ro_bind_dirs:
|
||||
if os.path.isdir(bind_dir): # pragma: no cover
|
||||
ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir])
|
||||
|
||||
args = ro_bind_args + \
|
||||
['--dev', '/dev',
|
||||
'--chdir', cwd,
|
||||
'--unshare-all',
|
||||
'--new-session',
|
||||
# XXX: enable --die-with-parent once all supported platforms have
|
||||
# a bubblewrap recent enough to support it.
|
||||
# '--die-with-parent',
|
||||
]
|
||||
|
||||
if output_filename:
|
||||
# Mount an empty temporary directory where the sandboxed
|
||||
# process will create its output file
|
||||
output_dirname = os.path.dirname(os.path.abspath(output_filename))
|
||||
args.extend(['--bind', tempdir, output_dirname])
|
||||
|
||||
absolute_input_filename = os.path.abspath(input_filename)
|
||||
args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename])
|
||||
|
||||
return args
|
||||
|
||||
|
||||
# pylint: disable=bad-whitespace
|
||||
def run(args: List[str],
|
||||
input_filename: str,
|
||||
output_filename: Optional[str] = None,
|
||||
**kwargs) -> subprocess.CompletedProcess:
|
||||
"""Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it
|
||||
is available.
|
||||
|
||||
Extra supported keyword arguments:
|
||||
|
||||
- `input_filename`, made available read-only in the sandbox
|
||||
- `output_filename`, where the file created by the sandboxed process
|
||||
is copied upon successful completion; an empty temporary directory
|
||||
is made visible as the parent directory of this file in the sandbox.
|
||||
Optional: one valid use case is to invoke an external process
|
||||
to inspect metadata present in a file.
|
||||
"""
|
||||
try:
|
||||
bwrap_path = _get_bwrap_path()
|
||||
except RuntimeError: # pragma: no cover
|
||||
# bubblewrap is not installed ⇒ short-circuit
|
||||
return subprocess.run(args, **kwargs)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
prefix_args = [bwrap_path] + \
|
||||
_get_bwrap_args(input_filename=input_filename,
|
||||
output_filename=output_filename,
|
||||
tempdir=tempdir)
|
||||
completed_process = subprocess.run(prefix_args + args, **kwargs)
|
||||
if output_filename and completed_process.returncode == 0:
|
||||
shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)),
|
||||
output_filename)
|
||||
|
||||
return completed_process
|
@ -3,6 +3,7 @@ from typing import Union, Tuple, Dict
|
||||
|
||||
from . import abstract
|
||||
|
||||
|
||||
class TorrentParser(abstract.AbstractParser):
|
||||
mimetypes = {'application/x-bittorrent', }
|
||||
whitelist = {b'announce', b'announce-list', b'info'}
|
||||
@ -32,7 +33,7 @@ class TorrentParser(abstract.AbstractParser):
|
||||
return True
|
||||
|
||||
|
||||
class _BencodeHandler(object):
|
||||
class _BencodeHandler():
|
||||
"""
|
||||
Since bencode isn't that hard to parse,
|
||||
MAT2 comes with its own parser, based on the spec
|
||||
|
@ -1,10 +1,10 @@
|
||||
import os
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
from typing import Dict, Union
|
||||
|
||||
from . import exiftool
|
||||
from . import subprocess
|
||||
|
||||
|
||||
class AbstractFFmpegParser(exiftool.ExiftoolParser):
|
||||
@ -32,7 +32,9 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser):
|
||||
'-flags:a', '+bitexact', # don't add any metadata
|
||||
self.output_filename]
|
||||
try:
|
||||
subprocess.check_call(cmd)
|
||||
subprocess.run(cmd, check=True,
|
||||
input_filename=self.filename,
|
||||
output_filename=self.output_filename)
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
|
||||
return False
|
||||
|
Loading…
Reference in New Issue
Block a user