Rename some files to simplify packaging
- the `src` folder is now `libmat2`
- the `main.py` script is now `mat2.py`
parent 57d5cd0428
commit 38fae60b8b
14 changed files with 31 additions and 31 deletions
libmat2/__init__.py (new file, 6 additions)
@@ -0,0 +1,6 @@
#!/bin/env python3

# A set of extensions that aren't supported, despite matching a supported mimetype
unsupported_extensions = set(['bat', 'c', 'h', 'ksh', 'pl', 'txt', 'asc',
                              'text', 'pot', 'brf', 'srt', 'rdf', 'wsdl',
                              'xpdl', 'xsl', 'xsd'])
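Most of these extensions resolve to a mimetype that one of the parsers below does claim (typically text/plain, handled by HarmlessParser), which is why an explicit deny-list is needed; a quick check of that assumption:

    import mimetypes

    # On most platforms Python maps '.bat' to a "supported" mimetype,
    # even though mat2 cannot meaningfully clean a batch script.
    print(mimetypes.guess_type('script.bat'))  # ('text/plain', None)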
libmat2/abstract.py (new file, 24 additions)
@@ -0,0 +1,24 @@
import abc
import os


class AbstractParser(abc.ABC):
    meta_list = set()
    mimetypes = set()

    def __init__(self, filename: str):
        self.filename = filename
        fname, extension = os.path.splitext(filename)
        self.output_filename = fname + '.cleaned' + extension

    @abc.abstractmethod
    def get_meta(self) -> dict:
        pass

    @abc.abstractmethod
    def remove_all(self) -> bool:
        pass

    def remove_all_lightweight(self) -> bool:
        """ Remove _SOME_ metadata. """
        return self.remove_all()
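For context, supporting a new format only takes subclassing AbstractParser and implementing the two abstract methods; a minimal sketch with a hypothetical parser (not part of this commit):

    import shutil

    from libmat2 import abstract


    class CopyParser(abstract.AbstractParser):
        """Hypothetical no-op parser, showing the minimal interface."""
        mimetypes = {'application/x-example', }

        def get_meta(self) -> dict:
            return {}  # a real parser would report the file's metadata here

        def remove_all(self) -> bool:
            # a real parser would write a *cleaned* copy instead
            shutil.copy(self.filename, self.output_filename)
            return True

parser_factory.py below discovers such subclasses automatically, so no registration step is needed.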
libmat2/audio.py (new file, 39 additions)
@@ -0,0 +1,39 @@
import shutil

import mutagen

from . import abstract


class MutagenParser(abstract.AbstractParser):
    def get_meta(self):
        f = mutagen.File(self.filename)
        if f.tags:
            return {k: ', '.join(v) for k, v in f.tags.items()}
        return {}

    def remove_all(self):
        shutil.copy(self.filename, self.output_filename)
        f = mutagen.File(self.output_filename)
        f.delete()
        f.save()
        return True


class MP3Parser(MutagenParser):
    mimetypes = {'audio/mpeg', }

    def get_meta(self):
        metadata = {}
        meta = mutagen.File(self.filename).tags
        for key in meta:
            metadata[key.rstrip(' \t\r\n\0')] = ', '.join(map(str, meta[key].text))
        return metadata


class OGGParser(MutagenParser):
    mimetypes = {'audio/ogg', }


class FLACParser(MutagenParser):
    mimetypes = {'audio/flac', }
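A usage sketch, assuming mutagen is installed and song.mp3 is a hypothetical input file:

    from libmat2 import audio

    parser = audio.MP3Parser('song.mp3')  # hypothetical input file
    print(parser.get_meta())   # ID3 frames, e.g. {'TPE1': 'Some Artist'}
    parser.remove_all()        # copies the file, then strips its tags: song.cleaned.mp3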
libmat2/harmless.py (new file, 17 additions)
@@ -0,0 +1,17 @@
from . import abstract


class HarmlessParser(abstract.AbstractParser):
    """ This is the parser for filetypes that do not contain metadata. """
    mimetypes = {'application/xml', 'text/plain'}

    def __init__(self, filename: str):
        super().__init__(filename)
        self.filename = filename
        self.output_filename = filename

    def get_meta(self):
        return dict()

    def remove_all(self):
        return True
libmat2/images.py (new file, 101 additions)
@@ -0,0 +1,101 @@
import subprocess
import json
import os

import cairo

import gi
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import GdkPixbuf

from . import abstract


class PNGParser(abstract.AbstractParser):
    mimetypes = {'image/png', }
    meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
                      'Directory', 'FileSize', 'FileModifyDate',
                      'FileAccessDate', 'FileInodeChangeDate',
                      'FilePermissions', 'FileType', 'FileTypeExtension',
                      'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType',
                      'Compression', 'Filter', 'Interlace', 'BackgroundColor',
                      'ImageSize', 'Megapixels', 'ImageHeight'}

    def __init__(self, filename):
        super().__init__(filename)
        try:  # better fail here than later
            cairo.ImageSurface.create_from_png(self.filename)
        except MemoryError:
            raise ValueError

    def get_meta(self):
        out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename])
        meta = json.loads(out.decode('utf-8'))[0]
        for key in self.meta_whitelist:
            meta.pop(key, None)
        return meta

    def remove_all(self):
        surface = cairo.ImageSurface.create_from_png(self.filename)
        surface.write_to_png(self.output_filename)
        return True


class GdkPixbufAbstractParser(abstract.AbstractParser):
    """ GdkPixbuf can handle a lot of surfaces, so we're rendering images on it;
    this has the side-effect of removing metadata completely.
    """
    def get_meta(self):
        out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename])
        meta = json.loads(out.decode('utf-8'))[0]
        for key in self.meta_whitelist:
            meta.pop(key, None)
        return meta

    def remove_all(self):
        _, extension = os.path.splitext(self.filename)
        pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename)
        if extension == '.jpg':
            extension = '.jpeg'
        pixbuf.savev(self.output_filename, extension[1:], [], [])
        return True


class JPGParser(GdkPixbufAbstractParser):
    mimetypes = {'image/jpeg'}
    meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
                      'Directory', 'FileSize', 'FileModifyDate',
                      'FileAccessDate', 'FileInodeChangeDate',
                      'FilePermissions', 'FileType', 'FileTypeExtension',
                      'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
                      'ColorComponents', 'EncodingProcess', 'JFIFVersion',
                      'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
                      'YResolution', 'Megapixels', 'ImageHeight'}


class TiffParser(GdkPixbufAbstractParser):
    mimetypes = {'image/tiff'}
    meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
                      'FillOrder', 'PhotometricInterpretation',
                      'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel',
                      'StripByteCounts', 'StripOffsets', 'BitsPerSample',
                      'Directory', 'ExifToolVersion', 'FileAccessDate',
                      'FileInodeChangeDate', 'FileModifyDate', 'FileName',
                      'FilePermissions', 'FileSize', 'FileType',
                      'FileTypeExtension', 'ImageHeight', 'ImageSize',
                      'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'}


class BMPParser(GdkPixbufAbstractParser):
    mimetypes = {'image/x-ms-bmp'}
    meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
                      'FileSize', 'FileModifyDate', 'FileAccessDate',
                      'FileInodeChangeDate', 'FilePermissions', 'FileType',
                      'FileTypeExtension', 'MIMEType', 'BMPVersion',
                      'ImageWidth', 'ImageHeight', 'Planes', 'BitDepth',
                      'Compression', 'ImageLength', 'PixelsPerMeterX',
                      'PixelsPerMeterY', 'NumColors', 'NumImportantColors',
                      'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
                      'ColorSpace', 'RedEndpoint', 'GreenEndpoint',
                      'BlueEndpoint', 'GammaRed', 'GammaGreen', 'GammaBlue',
                      'ImageSize', 'Megapixels'}
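A usage sketch, assuming exiftool is present at /usr/bin/exiftool and GdkPixbuf is available through PyGObject; photo.jpg is a hypothetical input file:

    from libmat2 import images

    parser = images.JPGParser('photo.jpg')  # hypothetical input file
    print(parser.get_meta())  # whatever exiftool reports beyond the harmless whitelist
    parser.remove_all()       # re-renders the pixels through GdkPixbuf: photo.cleaned.jpg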
libmat2/office.py (new file, 150 additions)
@@ -0,0 +1,150 @@
import os
import re
import shutil
import tempfile
import datetime
import zipfile

from . import abstract, parser_factory


class ArchiveBasedAbstractParser(abstract.AbstractParser):
    def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
        zipinfo.compress_type = zipfile.ZIP_DEFLATED
        zipinfo.create_system = 3  # Linux
        zipinfo.comment = b''
        zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
        return zipinfo

    def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> dict:
        metadata = {}
        if zipinfo.create_system == 3:
            # metadata['create_system'] = 'Linux'
            pass
        elif zipinfo.create_system == 2:
            metadata['create_system'] = 'Windows'
        else:
            metadata['create_system'] = 'Weird'

        if zipinfo.comment:
            metadata['comment'] = zipinfo.comment

        if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
            metadata['date_time'] = datetime.datetime(*zipinfo.date_time)

        return metadata

    def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str,
                             zin: zipfile.ZipFile, zout: zipfile.ZipFile):
        zin.extract(member=item, path=temp_folder)
        tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
        if not tmp_parser:
            print("%s's format (%s) isn't supported" % (item.filename, mtype))
            return
        tmp_parser.remove_all()
        zinfo = zipfile.ZipInfo(item.filename)
        clean_zinfo = self._clean_zipinfo(zinfo)
        with open(tmp_parser.output_filename, 'rb') as f:
            zout.writestr(clean_zinfo, f.read())


class MSOfficeParser(ArchiveBasedAbstractParser):
    mimetypes = {
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation'
    }
    files_to_keep = {'_rels/.rels', 'word/_rels/document.xml.rels'}

    def get_meta(self):
        """
        Yes, I know that parsing xml with regexp ain't pretty,
        be my guest and fix it if you want.
        """
        metadata = {}
        zipin = zipfile.ZipFile(self.filename)
        for item in zipin.infolist():
            if item.filename.startswith('docProps/') and item.filename.endswith('.xml'):
                content = zipin.read(item).decode('utf-8')
                for (key, value) in re.findall(r"<(.+)>(.+)</\1>", content, re.I):
                    metadata[key] = value
                if not metadata:  # better safe than sorry
                    metadata[item] = 'harmful content'

            metadata = {**metadata, **self._get_zipinfo_meta(item)}
        zipin.close()
        return metadata

    def remove_all(self):
        zin = zipfile.ZipFile(self.filename, 'r')
        zout = zipfile.ZipFile(self.output_filename, 'w')
        temp_folder = tempfile.mkdtemp()

        for item in zin.infolist():
            if item.filename[-1] == '/':
                continue  # `is_dir` is added in Python3.6
            elif item.filename.startswith('docProps/'):
                if not item.filename.endswith('.rels'):
                    continue  # don't keep metadata files
            if item.filename in self.files_to_keep:
                item = self._clean_zipinfo(item)
                zout.writestr(item, zin.read(item))
                continue

            self._clean_internal_file(item, temp_folder, zin, zout)

        shutil.rmtree(temp_folder)
        zout.close()
        zin.close()
        return True


class LibreOfficeParser(ArchiveBasedAbstractParser):
    mimetypes = {
        'application/vnd.oasis.opendocument.text',
        'application/vnd.oasis.opendocument.spreadsheet',
        'application/vnd.oasis.opendocument.presentation',
        'application/vnd.oasis.opendocument.graphics',
        'application/vnd.oasis.opendocument.chart',
        'application/vnd.oasis.opendocument.formula',
        'application/vnd.oasis.opendocument.image',
    }

    def get_meta(self):
        """
        Yes, I know that parsing xml with regexp ain't pretty,
        be my guest and fix it if you want.
        """
        metadata = {}
        zipin = zipfile.ZipFile(self.filename)
        for item in zipin.infolist():
            if item.filename == 'meta.xml':
                content = zipin.read(item).decode('utf-8')
                for (key, value) in re.findall(r"<((?:meta|dc|cp).+?)>(.+)</\1>", content, re.I):
                    metadata[key] = value
                if not metadata:  # better safe than sorry
                    metadata[item] = 'harmful content'
            metadata = {**metadata, **self._get_zipinfo_meta(item)}
        zipin.close()
        return metadata

    def remove_all(self):
        zin = zipfile.ZipFile(self.filename, 'r')
        zout = zipfile.ZipFile(self.output_filename, 'w')
        temp_folder = tempfile.mkdtemp()

        for item in zin.infolist():
            if item.filename[-1] == '/':
                continue  # `is_dir` is added in Python3.6
            elif item.filename == 'meta.xml':
                continue  # don't keep metadata files

            self._clean_internal_file(item, temp_folder, zin, zout)

        shutil.rmtree(temp_folder)
        zout.close()
        zin.close()
        return True
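A usage sketch for the office parsers, with report.docx as a hypothetical input file:

    from libmat2 import office

    parser = office.MSOfficeParser('report.docx')  # hypothetical input file
    print(parser.get_meta())  # fields from docProps/*.xml, plus suspicious zip entries
    parser.remove_all()       # rebuilds the archive without docProps/: report.cleaned.docx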
libmat2/parser_factory.py (new file, 42 additions)
@@ -0,0 +1,42 @@
import os
import mimetypes
import importlib
import pkgutil
from typing import TypeVar

from . import abstract, unsupported_extensions


T = TypeVar('T', bound='abstract.AbstractParser')

# This loads every parser in a dynamic way
for module_loader, name, ispkg in pkgutil.walk_packages('.libmat2'):
    if not name.startswith('libmat2.'):
        continue
    elif name == 'libmat2.abstract':
        continue
    importlib.import_module(name)


def _get_parsers() -> list:
    """ Get all our parsers!"""
    def __get_parsers(cls):
        return cls.__subclasses__() + \
            [g for s in cls.__subclasses__() for g in __get_parsers(s)]
    return __get_parsers(abstract.AbstractParser)


def get_parser(filename: str) -> (T, str):
    mtype, _ = mimetypes.guess_type(filename)

    _, extension = os.path.splitext(filename)
    if extension in unsupported_extensions:
        return None, mtype

    for c in _get_parsers():
        if mtype in c.mimetypes:
            try:
                return c(filename), mtype
            except ValueError:
                return None, mtype
    return None, mtype
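The factory is the natural entry point for callers; a sketch with notes.odt as a hypothetical input file:

    from libmat2 import parser_factory

    parser, mimetype = parser_factory.get_parser('notes.odt')  # hypothetical input file
    if parser is None:
        print('format (%s) not supported' % mimetype)
    else:
        print(parser.get_meta())
        parser.remove_all()  # writes notes.cleaned.odt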
libmat2/pdf.py (new file, 135 additions)
@@ -0,0 +1,135 @@
""" Handle PDF

"""

import os
import re
import logging
import tempfile
import io

import cairo
import gi
gi.require_version('Poppler', '0.18')
from gi.repository import Poppler, GLib

from . import abstract

logging.basicConfig(level=logging.DEBUG)


class PDFParser(abstract.AbstractParser):
    mimetypes = {'application/pdf', }
    meta_list = {'author', 'creation-date', 'creator', 'format', 'keywords',
                 'metadata', 'mod-date', 'producer', 'subject', 'title',
                 'viewer-preferences'}

    def __init__(self, filename):
        super().__init__(filename)
        self.uri = 'file://' + os.path.abspath(self.filename)
        self.__scale = 2  # how much precision do we want for the render
        try:  # Check now that the file is valid, to avoid surprises later
            Poppler.Document.new_from_file(self.uri, None)
        except GLib.GError:  # Invalid PDF
            raise ValueError

    def remove_all_lightweight(self):
        """
        Load the document into Poppler, render pages on a new PDFSurface.
        """
        document = Poppler.Document.new_from_file(self.uri, None)
        pages_count = document.get_n_pages()

        tmp_path = tempfile.mkstemp()[1]
        pdf_surface = cairo.PDFSurface(tmp_path, 10, 10)
        pdf_context = cairo.Context(pdf_surface)  # context draws on the surface

        for pagenum in range(pages_count):
            logging.info("Rendering page %d/%d", pagenum + 1, pages_count)
            page = document.get_page(pagenum)
            page_width, page_height = page.get_size()
            pdf_surface.set_size(page_width, page_height)
            pdf_context.save()
            page.render_for_printing(pdf_context)
            pdf_context.restore()
            pdf_context.show_page()  # draw pdf_context on pdf_surface
        pdf_surface.finish()

        self.__remove_superficial_meta(tmp_path, self.output_filename)
        os.remove(tmp_path)

        return True

    def remove_all(self):
        """
        Load the document into Poppler, render pages on PNG,
        and shove those PNG into a new PDF.
        """
        document = Poppler.Document.new_from_file(self.uri, None)
        pages_count = document.get_n_pages()

        _, tmp_path = tempfile.mkstemp()
        pdf_surface = cairo.PDFSurface(tmp_path, 32, 32)  # resized later anyway
        pdf_context = cairo.Context(pdf_surface)

        for pagenum in range(pages_count):
            page = document.get_page(pagenum)
            page_width, page_height = page.get_size()
            logging.info("Rendering page %d/%d", pagenum + 1, pages_count)

            img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, int(page_width) * self.__scale, int(page_height) * self.__scale)
            img_context = cairo.Context(img_surface)

            img_context.scale(self.__scale, self.__scale)
            page.render_for_printing(img_context)
            img_context.show_page()

            buf = io.BytesIO()
            img_surface.write_to_png(buf)
            img_surface.finish()
            buf.seek(0)

            img = cairo.ImageSurface.create_from_png(buf)
            pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale)
            pdf_context.set_source_surface(img, 0, 0)
            pdf_context.paint()
            pdf_context.show_page()

        pdf_surface.finish()

        # Removes metadata added by Poppler
        self.__remove_superficial_meta(tmp_path, self.output_filename)
        os.remove(tmp_path)

        return True

    @staticmethod
    def __remove_superficial_meta(in_file: str, out_file: str) -> bool:
        document = Poppler.Document.new_from_file('file://' + in_file)
        document.set_producer('')
        document.set_creator('')
        document.set_creation_date(-1)
        document.save('file://' + os.path.abspath(out_file))
        return True

    @staticmethod
    def __parse_metadata_field(data: str) -> dict:
        metadata = {}
        for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
            metadata[key] = value
        return metadata

    def get_meta(self):
        """ Return a dict with all the meta of the file
        """
        metadata = {}
        document = Poppler.Document.new_from_file(self.uri, None)

        for key in self.meta_list:
            if document.get_property(key):
                metadata[key] = document.get_property(key)
        if 'metadata' in metadata:
            parsed_meta = self.__parse_metadata_field(metadata['metadata'])
            return {**metadata, **parsed_meta}
        return metadata
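The two cleaning paths trade fidelity for thoroughness: remove_all rasterizes every page to PNG before rebuilding the PDF, while remove_all_lightweight re-renders pages onto a PDF surface and keeps them as vectors. A usage sketch, assuming Poppler is available through PyGObject and paper.pdf is a hypothetical input file:

    from libmat2 import pdf

    parser = pdf.PDFParser('paper.pdf')  # hypothetical input; raises ValueError on an invalid PDF
    print(parser.get_meta())             # Poppler document properties, plus parsed XMP fields
    parser.remove_all()                  # rasterizing rebuild: paper.cleaned.pdf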
libmat2/torrent.py (new file, 126 additions)
@@ -0,0 +1,126 @@
from . import abstract


class TorrentParser(abstract.AbstractParser):
    mimetypes = {'application/x-bittorrent', }
    whitelist = {b'announce', b'announce-list', b'info'}

    def get_meta(self) -> dict:
        metadata = {}
        with open(self.filename, 'rb') as f:
            d = _BencodeHandler().bdecode(f.read())
        if d is None:
            return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
        for k, v in d.items():
            if k not in self.whitelist:
                metadata[k.decode('utf-8')] = v
        return metadata

    def remove_all(self) -> bool:
        cleaned = dict()
        with open(self.filename, 'rb') as f:
            d = _BencodeHandler().bdecode(f.read())
        if d is None:
            return False
        for k, v in d.items():
            if k in self.whitelist:
                cleaned[k] = v
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        return True


class _BencodeHandler(object):
    """
    Since bencode isn't that hard to parse,
    MAT2 comes with its own parser, based on the spec
    https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
    """
    def __init__(self):
        self.__decode_func = {
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
            ord('l'): self.__decode_list,
        }
        for i in range(0, 10):
            self.__decode_func[ord(str(i))] = self.__decode_string

        self.__encode_func = {
            bytes: self.__encode_string,
            dict: self.__encode_dict,
            int: self.__encode_int,
            list: self.__encode_list,
        }

    @staticmethod
    def __decode_int(s: str) -> (int, str):
        s = s[1:]
        next_idx = s.index(b'e')
        if s.startswith(b'-0'):
            raise ValueError  # negative zero doesn't exist
        elif s.startswith(b'0') and next_idx != 1:
            raise ValueError  # no leading zero except for zero itself
        return int(s[:next_idx]), s[next_idx+1:]

    @staticmethod
    def __decode_string(s: str) -> (str, str):
        sep = s.index(b':')
        str_len = int(s[:sep])
        if str_len < 0:
            raise ValueError
        elif s[0] == b'0' and sep != 1:
            raise ValueError
        s = s[1:]
        return s[sep:sep+str_len], s[sep+str_len:]

    def __decode_list(self, s: str) -> (list, str):
        r = list()
        s = s[1:]  # skip leading `l`
        while s[0] != ord('e'):
            v, s = self.__decode_func[s[0]](s)
            r.append(v)
        return r, s[1:]

    def __decode_dict(self, s: str) -> (dict, str):
        r = dict()
        s = s[1:]  # skip leading `d`
        while s[0] != ord(b'e'):
            k, s = self.__decode_string(s)
            r[k], s = self.__decode_func[s[0]](s)
        return r, s[1:]

    @staticmethod
    def __encode_int(x: str) -> bytes:
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: str) -> bytes:
        return bytes((str(len(x))), 'utf-8') + b':' + x

    def __encode_list(self, x: str) -> bytes:
        ret = b''
        for i in x:
            ret += self.__encode_func[type(i)](i)
        return b'l' + ret + b'e'

    def __encode_dict(self, x: str) -> bytes:
        ret = b''
        for k, v in sorted(x.items()):
            ret += self.__encode_func[type(k)](k)
            ret += self.__encode_func[type(v)](v)
        return b'd' + ret + b'e'

    def bencode(self, s: str) -> bytes:
        return self.__encode_func[type(s)](s)

    def bdecode(self, s: str):
        try:
            r, l = self.__decode_func[s[0]](s)
        except (IndexError, KeyError, ValueError) as e:
            print("not a valid bencoded string: %s" % e)
            return None
        if l != b'':
            print("invalid bencoded value (data after valid prefix)")
            return None
        return r
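A usage sketch, with linux.torrent as a hypothetical input file:

    from libmat2 import torrent

    parser = torrent.TorrentParser('linux.torrent')  # hypothetical input file
    print(parser.get_meta())  # top-level keys outside the whitelist, e.g. 'created by'
    parser.remove_all()       # keeps only announce, announce-list and info: linux.cleaned.torrent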