1
0
mirror of synced 2024-11-22 09:14:23 +01:00
mat2/libmat2/torrent.py

128 lines
4.2 KiB
Python
Raw Normal View History

import logging
2018-06-04 22:54:01 +02:00
from typing import Union, Tuple, Dict
2018-04-22 23:48:01 +02:00
from . import abstract
2018-04-22 22:02:00 +02:00
class TorrentParser(abstract.AbstractParser):
    """
    Parser for bencoded `.torrent` files.

    The whole file is decoded into `self.dict_repr` at construction time.
    Every top-level key that is not listed in `whitelist` is reported as
    metadata by `get_meta` and dropped by `remove_all`.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level keys that are kept when cleaning the file.
    whitelist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        """
        Read and decode `filename`.

        :raises ValueError: if the file is not valid bencode, or if its
            top-level value is not a dictionary.
        """
        super().__init__(filename)
        # NOTE(review): `self.filename` is presumably set by the parent
        # constructor -- confirm against `abstract.AbstractParser`.
        with open(self.filename, 'rb') as f:
            self.dict_repr = _BencodeHandler().bdecode(f.read())
        # `bdecode` hands back whatever the top-level value is, so a file
        # holding only a list, an integer or a string decodes "successfully".
        # Such a file can't be processed by the methods below, which all
        # rely on `.items()`: refuse it here instead of crashing later.
        if not isinstance(self.dict_repr, dict):
            raise ValueError

    def get_meta(self) -> Dict[str, str]:
        """
        Return every top-level entry whose key is not in `whitelist`.

        Keys are decoded to `str`; values are handed back exactly as they
        were decoded (bytes, int, list or dict), whatever the annotation
        says.
        """
        # `backslashreplace` keeps a key that isn't valid UTF-8 visible
        # (escaped) instead of aborting the listing with UnicodeDecodeError.
        return {
            key.decode('utf-8', errors='backslashreplace'): value
            for key, value in self.dict_repr.items()
            if key not in self.whitelist
        }

    def remove_all(self) -> bool:
        """
        Write a copy of the torrent holding only the whitelisted top-level
        keys to `self.output_filename`, and keep that reduced dictionary as
        the new in-memory state.

        :return: always True.
        """
        cleaned = {
            key: value
            for key, value in self.dict_repr.items()
            if key in self.whitelist
        }
        # NOTE(review): `self.output_filename` is presumably provided by the
        # parent class -- confirm against `abstract.AbstractParser`.
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        self.dict_repr = cleaned  # since we're stateful
        return True
2018-04-22 23:48:01 +02:00
class _BencodeHandler(object):
"""
Since bencode isn't that hard to parse,
MAT2 comes with its own parser, based on the spec
https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
"""
def __init__(self):
self.__decode_func = {
2018-05-16 22:36:59 +02:00
ord('d'): self.__decode_dict,
ord('i'): self.__decode_int,
ord('l'): self.__decode_list,
}
2018-04-22 23:48:01 +02:00
for i in range(0, 10):
self.__decode_func[ord(str(i))] = self.__decode_string
self.__encode_func = {
2018-05-16 22:36:59 +02:00
bytes: self.__encode_string,
dict: self.__encode_dict,
int: self.__encode_int,
list: self.__encode_list,
2018-04-22 23:48:01 +02:00
}
2018-05-16 22:36:59 +02:00
@staticmethod
2018-06-04 22:54:01 +02:00
def __decode_int(s: bytes) -> Tuple[int, bytes]:
2018-04-22 22:02:00 +02:00
s = s[1:]
next_idx = s.index(b'e')
if s.startswith(b'-0'):
raise ValueError # negative zero doesn't exist
2018-04-22 23:48:01 +02:00
elif s.startswith(b'0') and next_idx != 1:
2018-04-22 22:02:00 +02:00
raise ValueError # no leading zero except for zero itself
return int(s[:next_idx]), s[next_idx+1:]
2018-05-16 22:36:59 +02:00
@staticmethod
2018-06-04 22:54:01 +02:00
def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
2018-07-08 15:13:03 +02:00
colon = s.index(b':')
2018-07-08 22:27:37 +02:00
# FIXME Python3 is broken here, the call to `ord` shouldn't be needed,
# but apparently it is. This is utterly idiotic.
if (s[0] == ord('0') or s[0] == '0') and colon != 1:
2018-04-22 23:48:01 +02:00
raise ValueError
2018-07-08 22:27:37 +02:00
str_len = int(s[:colon])
2018-04-22 23:48:01 +02:00
s = s[1:]
2018-07-08 15:13:03 +02:00
return s[colon:colon+str_len], s[colon+str_len:]
2018-04-22 22:02:00 +02:00
2018-06-04 22:54:01 +02:00
def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
ret = list()
2018-04-22 22:02:00 +02:00
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
value, s = self.__decode_func[s[0]](s)
ret.append(value)
return ret, s[1:]
2018-04-22 22:02:00 +02:00
2018-06-04 22:54:01 +02:00
def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
ret = dict()
2018-04-22 23:48:01 +02:00
s = s[1:] # skip leading `d`
2018-04-22 22:02:00 +02:00
while s[0] != ord(b'e'):
key, s = self.__decode_string(s)
ret[key], s = self.__decode_func[s[0]](s)
return ret, s[1:]
2018-04-22 22:02:00 +02:00
@staticmethod
2018-06-04 22:54:01 +02:00
def __encode_int(x: bytes) -> bytes:
2018-04-22 22:02:00 +02:00
return b'i' + bytes(str(x), 'utf-8') + b'e'
@staticmethod
2018-06-04 22:54:01 +02:00
def __encode_string(x: bytes) -> bytes:
2018-04-22 22:02:00 +02:00
return bytes((str(len(x))), 'utf-8') + b':' + x
2018-05-16 22:36:59 +02:00
def __encode_list(self, x: str) -> bytes:
2018-04-22 22:02:00 +02:00
ret = b''
for i in x:
ret += self.__encode_func[type(i)](i)
return b'l' + ret + b'e'
2018-06-04 22:54:01 +02:00
def __encode_dict(self, x: dict) -> bytes:
2018-04-22 22:02:00 +02:00
ret = b''
for key, value in sorted(x.items()):
ret += self.__encode_func[type(key)](key)
ret += self.__encode_func[type(value)](value)
2018-04-22 22:02:00 +02:00
return b'd' + ret + b'e'
2018-06-04 22:54:01 +02:00
def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
2018-04-22 23:48:01 +02:00
return self.__encode_func[type(s)](s)
2018-04-22 22:02:00 +02:00
2018-06-04 22:54:01 +02:00
def bdecode(self, s: bytes) -> Union[dict, None]:
2018-04-22 23:48:01 +02:00
try:
ret, trail = self.__decode_func[s[0]](s)
2018-04-22 23:48:01 +02:00
except (IndexError, KeyError, ValueError) as e:
2018-09-01 14:14:32 +02:00
logging.warning("Not a valid bencoded string: %s", e)
2018-04-22 23:48:01 +02:00
return None
if trail != b'':
2018-09-01 14:14:32 +02:00
logging.warning("Invalid bencoded value (data after valid prefix)")
2018-04-22 23:48:01 +02:00
return None
return ret