From 9e485c0dad6cf67d73992efe871f68fe91b6a507 Mon Sep 17 00:00:00 2001 From: jvoisin Date: Sun, 22 Apr 2018 23:48:01 +0200 Subject: [PATCH] Clean up the torrent implementation --- src/torrent.py | 141 +++++++++++++++++++++++++------------------------ 1 file changed, 71 insertions(+), 70 deletions(-) diff --git a/src/torrent.py b/src/torrent.py index df64161..bdf83ce 100644 --- a/src/torrent.py +++ b/src/torrent.py @@ -1,75 +1,78 @@ -import os -import re -import shutil -import tempfile -import datetime -import zipfile - -from . import abstract, parser_factory - +from . import abstract class TorrentParser(abstract.AbstractParser): mimetypes = {'application/x-bittorrent', } whitelist = {b'announce', b'announce-list', b'info'} - def __init__(self, filename): - super().__init__(filename) - self.__decode_func = { - ord('l'): self.__decode_list, - ord('d'): self.__decode_dict, - ord('i'): self.__decode_int - } - for i in range(0, 10): - self.__decode_func[ord(str(i))] = self.__decode_string - - self.__encode_func = { - int: self.__encode_int, - bytes: self.__encode_string, - list: self.__encode_list, - dict: self.__encode_dict, - } - - - def get_meta(self): + def get_meta(self) -> dict: metadata = {} with open(self.filename, 'rb') as f: - d = self.__bdecode(f.read()) + d = _BencodeHandler().bdecode(f.read()) + if d is None: + return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename} for k,v in d.items(): if k not in self.whitelist: metadata[k.decode('utf-8')] = v return metadata - def remove_all(self): + def remove_all(self) -> bool: cleaned = dict() with open(self.filename, 'rb') as f: - d = self.__bdecode(f.read()) + d = _BencodeHandler().bdecode(f.read()) + if d is None: + return False for k,v in d.items(): if k in self.whitelist: cleaned[k] = v with open(self.output_filename, 'wb') as f: - f.write(self.__bencode(cleaned)) + f.write(_BencodeHandler().bencode(cleaned)) return True - def __decode_int(self, s): + +class _BencodeHandler(object): + """ + Since bencode isn't that hard to parse, + MAT2 comes with its own parser, based on the spec + https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding + """ + def __init__(self): + self.__decode_func = { + ord('d'): self.__decode_dict, + ord('i'): self.__decode_int, + ord('l'): self.__decode_list, + } + for i in range(0, 10): + self.__decode_func[ord(str(i))] = self.__decode_string + + self.__encode_func = { + bytes: self.__encode_string, + dict: self.__encode_dict, + int: self.__encode_int, + list: self.__encode_list, + } + + def __decode_int(self, s:str) -> (int, str): s = s[1:] next_idx = s.index(b'e') if s.startswith(b'-0'): raise ValueError # negative zero doesn't exist - if s.startswith(b'0') and next_idx != 1: + elif s.startswith(b'0') and next_idx != 1: raise ValueError # no leading zero except for zero itself return int(s[:next_idx]), s[next_idx+1:] - def __decode_string(self, s): - end = s.index(b':') - str_len = int(s[:end]) - if s[0] == b'0' and end != 1: + def __decode_string(self, s:str) -> (str, str): + sep = s.index(b':') + str_len = int(s[:sep]) + if str_len < 0: raise ValueError - s = s[1:] # skip terminal `:` - return s[end:end+str_len], s[end+str_len:] + elif s[0] == b'0' and sep != 1: + raise ValueError + s = s[1:] + return s[sep:sep+str_len], s[sep+str_len:] - def __decode_list(self, s): + def __decode_list(self, s:str) -> (list, str): r = list() s = s[1:] # skip leading `l` while s[0] != ord('e'): @@ -77,15 +80,39 @@ class TorrentParser(abstract.AbstractParser): r.append(v) return r, s[1:] - def __decode_dict(self, s): + def __decode_dict(self, s:str) -> (dict, str): r = dict() - s = s[1:] + s = s[1:] # skip leading `d` while s[0] != ord(b'e'): k, s = self.__decode_string(s) r[k], s = self.__decode_func[s[0]](s) return r, s[1:] - def __bdecode(self, s): + @staticmethod + def __encode_int(x:str) -> bytes: + return b'i' + bytes(str(x), 'utf-8') + b'e' + + @staticmethod + def __encode_string(x:str) -> bytes: + return bytes((str(len(x))), 'utf-8') + b':' + x + + def __encode_list(self, x:str) -> bytes: + ret = b'' + for i in x: + ret += self.__encode_func[type(i)](i) + return b'l' + ret + b'e' + + def __encode_dict(self, x:str) -> bytes: + ret = b'' + for k, v in sorted(x.items()): + ret += self.__encode_func[type(k)](k) + ret += self.__encode_func[type(v)](v) + return b'd' + ret + b'e' + + def bencode(self, s:str) -> bytes: + return self.__encode_func[type(s)](s) + + def bdecode(self, s:str): try: r, l = self.__decode_func[s[0]](s) except (IndexError, KeyError, ValueError) as e: @@ -95,29 +122,3 @@ class TorrentParser(abstract.AbstractParser): print("invalid bencoded value (data after valid prefix)") return None return r - - @staticmethod - def __encode_int(x): - return b'i' + bytes(str(x), 'utf-8') + b'e' - - @staticmethod - def __encode_string(x:str): - return bytes((str(len(x))), 'utf-8') + b':' + x - - def __encode_list(self, x): - ret = b'' - for i in x: - ret += self.__encode_func[type(i)](i) - return b'l' + ret + b'e' - - def __encode_dict(self, x): - ret = b'' - for k, v in sorted(x.items()): - ret += self.__encode_func[type(k)](k) - ret += self.__encode_func[type(v)](v) - return b'd' + ret + b'e' - - def __bencode(self, x): - return self.__encode_func[type(x)](x) - -