Do a pylint pass
This commit is contained in:
parent
0354c3b7e3
commit
fa7d18784c
16
main.py
16
main.py
@ -12,7 +12,7 @@ from src import parser_factory, unsupported_extensions
|
|||||||
|
|
||||||
__version__ = '0.1.0'
|
__version__ = '0.1.0'
|
||||||
|
|
||||||
def __check_file(filename:str, mode:int = os.R_OK) -> bool:
|
def __check_file(filename: str, mode: int = os.R_OK) -> bool:
|
||||||
if not os.path.isfile(filename):
|
if not os.path.isfile(filename):
|
||||||
print("[-] %s is not a regular file." % filename)
|
print("[-] %s is not a regular file." % filename)
|
||||||
return False
|
return False
|
||||||
@ -40,7 +40,7 @@ def create_arg_parser():
|
|||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
def show_meta(filename:str):
|
def show_meta(filename: str):
|
||||||
if not __check_file(filename):
|
if not __check_file(filename):
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -48,18 +48,18 @@ def show_meta(filename:str):
|
|||||||
if p is None:
|
if p is None:
|
||||||
print("[-] %s's format (%s) is not supported" % (filename, mtype))
|
print("[-] %s's format (%s) is not supported" % (filename, mtype))
|
||||||
return
|
return
|
||||||
|
|
||||||
print("[+] Metadata for %s:" % filename)
|
print("[+] Metadata for %s:" % filename)
|
||||||
for k,v in p.get_meta().items():
|
for k, v in p.get_meta().items():
|
||||||
try: # FIXME this is ugly.
|
try: # FIXME this is ugly.
|
||||||
print(" %s: %s" % (k, v))
|
print(" %s: %s" % (k, v))
|
||||||
except UnicodeEncodeError:
|
except UnicodeEncodeError:
|
||||||
print(" %s: harmful content" % k)
|
print(" %s: harmful content" % k)
|
||||||
|
|
||||||
|
def clean_meta(params: Tuple[str, bool]) -> bool:
|
||||||
def clean_meta(params:Tuple[str, bool]) -> bool:
|
|
||||||
filename, is_lightweigth = params
|
filename, is_lightweigth = params
|
||||||
if not __check_file(filename, os.R_OK|os.W_OK):
|
if not __check_file(filename, os.R_OK|os.W_OK):
|
||||||
return
|
return False
|
||||||
|
|
||||||
p, mtype = parser_factory.get_parser(filename)
|
p, mtype = parser_factory.get_parser(filename)
|
||||||
if p is None:
|
if p is None:
|
||||||
@ -102,12 +102,12 @@ def main():
|
|||||||
if not args.list:
|
if not args.list:
|
||||||
return arg_parser.print_help()
|
return arg_parser.print_help()
|
||||||
show_parsers()
|
show_parsers()
|
||||||
return
|
return 0
|
||||||
|
|
||||||
elif args.show:
|
elif args.show:
|
||||||
for f in __get_files_recursively(args.files):
|
for f in __get_files_recursively(args.files):
|
||||||
show_meta(f)
|
show_meta(f)
|
||||||
return
|
return 0
|
||||||
|
|
||||||
else:
|
else:
|
||||||
p = multiprocessing.Pool()
|
p = multiprocessing.Pool()
|
||||||
|
@ -2,4 +2,5 @@
|
|||||||
|
|
||||||
# A set of extensions that aren't supported, despite matching a supported mimetype
|
# A set of extensions that aren't supported, despite matching a supported mimetype
|
||||||
unsupported_extensions = set(['bat', 'c', 'h', 'ksh', 'pl', 'txt', 'asc',
|
unsupported_extensions = set(['bat', 'c', 'h', 'ksh', 'pl', 'txt', 'asc',
|
||||||
'text', 'pot', 'brf', 'srt', 'rdf', 'wsdl', 'xpdl', 'xsl', 'xsd'])
|
'text', 'pot', 'brf', 'srt', 'rdf', 'wsdl',
|
||||||
|
'xpdl', 'xsl', 'xsd'])
|
||||||
|
@ -9,7 +9,7 @@ class MutagenParser(abstract.AbstractParser):
|
|||||||
def get_meta(self):
|
def get_meta(self):
|
||||||
f = mutagen.File(self.filename)
|
f = mutagen.File(self.filename)
|
||||||
if f.tags:
|
if f.tags:
|
||||||
return {k:', '.join(v) for k,v in f.tags.items()}
|
return {k:', '.join(v) for k, v in f.tags.items()}
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
def remove_all(self):
|
def remove_all(self):
|
||||||
|
@ -6,6 +6,7 @@ class HarmlessParser(abstract.AbstractParser):
|
|||||||
mimetypes = {'application/xml', 'text/plain'}
|
mimetypes = {'application/xml', 'text/plain'}
|
||||||
|
|
||||||
def __init__(self, filename: str):
|
def __init__(self, filename: str):
|
||||||
|
super().__init__(filename)
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
self.output_filename = filename
|
self.output_filename = filename
|
||||||
|
|
||||||
|
@ -14,11 +14,12 @@ from . import abstract
|
|||||||
class PNGParser(abstract.AbstractParser):
|
class PNGParser(abstract.AbstractParser):
|
||||||
mimetypes = {'image/png', }
|
mimetypes = {'image/png', }
|
||||||
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
|
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
|
||||||
'Directory', 'FileSize', 'FileModifyDate', 'FileAccessDate',
|
'Directory', 'FileSize', 'FileModifyDate',
|
||||||
"FileInodeChangeDate", 'FilePermissions', 'FileType',
|
'FileAccessDate', 'FileInodeChangeDate',
|
||||||
'FileTypeExtension', 'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType',
|
'FilePermissions', 'FileType', 'FileTypeExtension',
|
||||||
'Compression', 'Filter', 'Interlace', 'BackgroundColor', 'ImageSize',
|
'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType',
|
||||||
'Megapixels', 'ImageHeight'}
|
'Compression', 'Filter', 'Interlace', 'BackgroundColor',
|
||||||
|
'ImageSize', 'Megapixels', 'ImageHeight'}
|
||||||
|
|
||||||
def __init__(self, filename):
|
def __init__(self, filename):
|
||||||
super().__init__(filename)
|
super().__init__(filename)
|
||||||
@ -63,24 +64,26 @@ class GdkPixbufAbstractParser(abstract.AbstractParser):
|
|||||||
class JPGParser(GdkPixbufAbstractParser):
|
class JPGParser(GdkPixbufAbstractParser):
|
||||||
mimetypes = {'image/jpeg'}
|
mimetypes = {'image/jpeg'}
|
||||||
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
|
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
|
||||||
'Directory', 'FileSize', 'FileModifyDate', 'FileAccessDate',
|
'Directory', 'FileSize', 'FileModifyDate',
|
||||||
"FileInodeChangeDate", 'FilePermissions', 'FileType',
|
'FileAccessDate', "FileInodeChangeDate",
|
||||||
'FileTypeExtension', 'MIMEType', 'ImageWidth',
|
'FilePermissions', 'FileType', 'FileTypeExtension',
|
||||||
'ImageSize', 'BitsPerSample', 'ColorComponents', 'EncodingProcess',
|
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
|
||||||
'JFIFVersion', 'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
|
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
|
||||||
|
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
|
||||||
'YResolution', 'Megapixels', 'ImageHeight'}
|
'YResolution', 'Megapixels', 'ImageHeight'}
|
||||||
|
|
||||||
|
|
||||||
class TiffParser(GdkPixbufAbstractParser):
|
class TiffParser(GdkPixbufAbstractParser):
|
||||||
mimetypes = {'image/tiff'}
|
mimetypes = {'image/tiff'}
|
||||||
meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
|
meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
|
||||||
'FillOrder', 'PhotometricInterpretation', 'PlanarConfiguration',
|
'FillOrder', 'PhotometricInterpretation',
|
||||||
'RowsPerStrip', 'SamplesPerPixel', 'StripByteCounts',
|
'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel',
|
||||||
'StripOffsets', 'BitsPerSample', 'Directory', 'ExifToolVersion',
|
'StripByteCounts', 'StripOffsets', 'BitsPerSample',
|
||||||
'FileAccessDate', 'FileInodeChangeDate', 'FileModifyDate',
|
'Directory', 'ExifToolVersion', 'FileAccessDate',
|
||||||
'FileName', 'FilePermissions', 'FileSize', 'FileType',
|
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
|
||||||
'FileTypeExtension', 'ImageHeight', 'ImageSize', 'ImageWidth',
|
'FilePermissions', 'FileSize', 'FileType',
|
||||||
'MIMEType', 'Megapixels', 'SourceFile'}
|
'FileTypeExtension', 'ImageHeight', 'ImageSize',
|
||||||
|
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'}
|
||||||
|
|
||||||
|
|
||||||
class BMPParser(GdkPixbufAbstractParser):
|
class BMPParser(GdkPixbufAbstractParser):
|
||||||
@ -88,11 +91,11 @@ class BMPParser(GdkPixbufAbstractParser):
|
|||||||
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
|
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
|
||||||
'FileSize', 'FileModifyDate', 'FileAccessDate',
|
'FileSize', 'FileModifyDate', 'FileAccessDate',
|
||||||
'FileInodeChangeDate', 'FilePermissions', 'FileType',
|
'FileInodeChangeDate', 'FilePermissions', 'FileType',
|
||||||
'FileTypeExtension', 'MIMEType', 'BMPVersion', 'ImageWidth',
|
'FileTypeExtension', 'MIMEType', 'BMPVersion',
|
||||||
'ImageHeight', 'Planes', 'BitDepth', 'Compression', 'ImageLength',
|
'ImageWidth', 'ImageHeight', 'Planes', 'BitDepth',
|
||||||
'PixelsPerMeterX', 'PixelsPerMeterY', 'NumColors',
|
'Compression', 'ImageLength', 'PixelsPerMeterX',
|
||||||
'NumImportantColors', 'RedMask', 'GreenMask', 'BlueMask',
|
'PixelsPerMeterY', 'NumColors', 'NumImportantColors',
|
||||||
'AlphaMask', 'ColorSpace', 'RedEndpoint', 'GreenEndpoint',
|
'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
|
||||||
'BlueEndpoint', 'GammaRed', 'GammaGreen', 'GammaBlue', 'ImageSize',
|
'ColorSpace', 'RedEndpoint', 'GreenEndpoint',
|
||||||
'Megapixels'}
|
'BlueEndpoint', 'GammaRed', 'GammaGreen', 'GammaBlue',
|
||||||
|
'ImageSize', 'Megapixels'}
|
||||||
|
@ -9,14 +9,14 @@ from . import abstract, parser_factory
|
|||||||
|
|
||||||
|
|
||||||
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||||
def _clean_zipinfo(self, zipinfo:zipfile.ZipInfo) -> zipfile.ZipInfo:
|
def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||||
zipinfo.compress_type = zipfile.ZIP_DEFLATED
|
zipinfo.compress_type = zipfile.ZIP_DEFLATED
|
||||||
zipinfo.create_system = 3 # Linux
|
zipinfo.create_system = 3 # Linux
|
||||||
zipinfo.comment = b''
|
zipinfo.comment = b''
|
||||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
|
zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
|
||||||
return zipinfo
|
return zipinfo
|
||||||
|
|
||||||
def _get_zipinfo_meta(self, zipinfo:zipfile.ZipInfo) -> dict:
|
def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> dict:
|
||||||
metadata = {}
|
metadata = {}
|
||||||
if zipinfo.create_system == 3:
|
if zipinfo.create_system == 3:
|
||||||
#metadata['create_system'] = 'Linux'
|
#metadata['create_system'] = 'Linux'
|
||||||
@ -35,7 +35,8 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
|||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
|
|
||||||
def _clean_internal_file(self, item:zipfile.ZipInfo, temp_folder:str, zin:zipfile.ZipFile, zout:zipfile.ZipFile):
|
def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str,
|
||||||
|
zin: zipfile.ZipFile, zout: zipfile.ZipFile):
|
||||||
zin.extract(member=item, path=temp_folder)
|
zin.extract(member=item, path=temp_folder)
|
||||||
tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
|
tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
|
||||||
if not tmp_parser:
|
if not tmp_parser:
|
||||||
|
@ -2,10 +2,10 @@ import os
|
|||||||
import mimetypes
|
import mimetypes
|
||||||
import importlib
|
import importlib
|
||||||
import pkgutil
|
import pkgutil
|
||||||
|
from typing import TypeVar
|
||||||
|
|
||||||
from . import abstract, unsupported_extensions
|
from . import abstract, unsupported_extensions
|
||||||
|
|
||||||
from typing import TypeVar
|
|
||||||
|
|
||||||
T = TypeVar('T', bound='abstract.AbstractParser')
|
T = TypeVar('T', bound='abstract.AbstractParser')
|
||||||
|
|
||||||
|
@ -103,7 +103,8 @@ class PDFParser(abstract.AbstractParser):
|
|||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def __remove_superficial_meta(self, in_file:str, out_file: str) -> bool:
|
@staticmethod
|
||||||
|
def __remove_superficial_meta(in_file: str, out_file: str) -> bool:
|
||||||
document = Poppler.Document.new_from_file('file://' + in_file)
|
document = Poppler.Document.new_from_file('file://' + in_file)
|
||||||
document.set_producer('')
|
document.set_producer('')
|
||||||
document.set_creator('')
|
document.set_creator('')
|
||||||
@ -112,7 +113,8 @@ class PDFParser(abstract.AbstractParser):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def __parse_metadata_field(self, data:str) -> dict:
|
@staticmethod
|
||||||
|
def __parse_metadata_field(data: str) -> dict:
|
||||||
metadata = {}
|
metadata = {}
|
||||||
for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
|
for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
|
||||||
metadata[key] = value
|
metadata[key] = value
|
||||||
|
@ -11,7 +11,7 @@ class TorrentParser(abstract.AbstractParser):
|
|||||||
d = _BencodeHandler().bdecode(f.read())
|
d = _BencodeHandler().bdecode(f.read())
|
||||||
if d is None:
|
if d is None:
|
||||||
return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
|
return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
|
||||||
for k,v in d.items():
|
for k, v in d.items():
|
||||||
if k not in self.whitelist:
|
if k not in self.whitelist:
|
||||||
metadata[k.decode('utf-8')] = v
|
metadata[k.decode('utf-8')] = v
|
||||||
return metadata
|
return metadata
|
||||||
@ -23,7 +23,7 @@ class TorrentParser(abstract.AbstractParser):
|
|||||||
d = _BencodeHandler().bdecode(f.read())
|
d = _BencodeHandler().bdecode(f.read())
|
||||||
if d is None:
|
if d is None:
|
||||||
return False
|
return False
|
||||||
for k,v in d.items():
|
for k, v in d.items():
|
||||||
if k in self.whitelist:
|
if k in self.whitelist:
|
||||||
cleaned[k] = v
|
cleaned[k] = v
|
||||||
with open(self.output_filename, 'wb') as f:
|
with open(self.output_filename, 'wb') as f:
|
||||||
@ -53,7 +53,8 @@ class _BencodeHandler(object):
|
|||||||
list: self.__encode_list,
|
list: self.__encode_list,
|
||||||
}
|
}
|
||||||
|
|
||||||
def __decode_int(self, s:str) -> (int, str):
|
@staticmethod
|
||||||
|
def __decode_int(s: str) -> (int, str):
|
||||||
s = s[1:]
|
s = s[1:]
|
||||||
next_idx = s.index(b'e')
|
next_idx = s.index(b'e')
|
||||||
if s.startswith(b'-0'):
|
if s.startswith(b'-0'):
|
||||||
@ -62,7 +63,8 @@ class _BencodeHandler(object):
|
|||||||
raise ValueError # no leading zero except for zero itself
|
raise ValueError # no leading zero except for zero itself
|
||||||
return int(s[:next_idx]), s[next_idx+1:]
|
return int(s[:next_idx]), s[next_idx+1:]
|
||||||
|
|
||||||
def __decode_string(self, s:str) -> (str, str):
|
@staticmethod
|
||||||
|
def __decode_string(s: str) -> (str, str):
|
||||||
sep = s.index(b':')
|
sep = s.index(b':')
|
||||||
str_len = int(s[:sep])
|
str_len = int(s[:sep])
|
||||||
if str_len < 0:
|
if str_len < 0:
|
||||||
@ -72,7 +74,7 @@ class _BencodeHandler(object):
|
|||||||
s = s[1:]
|
s = s[1:]
|
||||||
return s[sep:sep+str_len], s[sep+str_len:]
|
return s[sep:sep+str_len], s[sep+str_len:]
|
||||||
|
|
||||||
def __decode_list(self, s:str) -> (list, str):
|
def __decode_list(self, s: str) -> (list, str):
|
||||||
r = list()
|
r = list()
|
||||||
s = s[1:] # skip leading `l`
|
s = s[1:] # skip leading `l`
|
||||||
while s[0] != ord('e'):
|
while s[0] != ord('e'):
|
||||||
@ -80,7 +82,7 @@ class _BencodeHandler(object):
|
|||||||
r.append(v)
|
r.append(v)
|
||||||
return r, s[1:]
|
return r, s[1:]
|
||||||
|
|
||||||
def __decode_dict(self, s:str) -> (dict, str):
|
def __decode_dict(self, s: str) -> (dict, str):
|
||||||
r = dict()
|
r = dict()
|
||||||
s = s[1:] # skip leading `d`
|
s = s[1:] # skip leading `d`
|
||||||
while s[0] != ord(b'e'):
|
while s[0] != ord(b'e'):
|
||||||
@ -89,30 +91,30 @@ class _BencodeHandler(object):
|
|||||||
return r, s[1:]
|
return r, s[1:]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __encode_int(x:str) -> bytes:
|
def __encode_int(x: str) -> bytes:
|
||||||
return b'i' + bytes(str(x), 'utf-8') + b'e'
|
return b'i' + bytes(str(x), 'utf-8') + b'e'
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __encode_string(x:str) -> bytes:
|
def __encode_string(x: str) -> bytes:
|
||||||
return bytes((str(len(x))), 'utf-8') + b':' + x
|
return bytes((str(len(x))), 'utf-8') + b':' + x
|
||||||
|
|
||||||
def __encode_list(self, x:str) -> bytes:
|
def __encode_list(self, x: str) -> bytes:
|
||||||
ret = b''
|
ret = b''
|
||||||
for i in x:
|
for i in x:
|
||||||
ret += self.__encode_func[type(i)](i)
|
ret += self.__encode_func[type(i)](i)
|
||||||
return b'l' + ret + b'e'
|
return b'l' + ret + b'e'
|
||||||
|
|
||||||
def __encode_dict(self, x:str) -> bytes:
|
def __encode_dict(self, x: str) -> bytes:
|
||||||
ret = b''
|
ret = b''
|
||||||
for k, v in sorted(x.items()):
|
for k, v in sorted(x.items()):
|
||||||
ret += self.__encode_func[type(k)](k)
|
ret += self.__encode_func[type(k)](k)
|
||||||
ret += self.__encode_func[type(v)](v)
|
ret += self.__encode_func[type(v)](v)
|
||||||
return b'd' + ret + b'e'
|
return b'd' + ret + b'e'
|
||||||
|
|
||||||
def bencode(self, s:str) -> bytes:
|
def bencode(self, s: str) -> bytes:
|
||||||
return self.__encode_func[type(s)](s)
|
return self.__encode_func[type(s)](s)
|
||||||
|
|
||||||
def bdecode(self, s:str):
|
def bdecode(self, s: str):
|
||||||
try:
|
try:
|
||||||
r, l = self.__decode_func[s[0]](s)
|
r, l = self.__decode_func[s[0]](s)
|
||||||
except (IndexError, KeyError, ValueError) as e:
|
except (IndexError, KeyError, ValueError) as e:
|
||||||
|
Loading…
Reference in New Issue
Block a user