Simplify how torrent files are handled
- Rework the testsuite wrt. torrent - fail at parser's instantiation on corrupted torrent, instead of during `get_meta` or `remove_all` call
This commit is contained in:
parent
7ea362d908
commit
b5fcddd6a6
@ -8,13 +8,16 @@ class TorrentParser(abstract.AbstractParser):
|
|||||||
mimetypes = {'application/x-bittorrent', }
|
mimetypes = {'application/x-bittorrent', }
|
||||||
whitelist = {b'announce', b'announce-list', b'info'}
|
whitelist = {b'announce', b'announce-list', b'info'}
|
||||||
|
|
||||||
|
def __init__(self, filename):
|
||||||
|
super().__init__(filename)
|
||||||
|
with open(self.filename, 'rb') as f:
|
||||||
|
self.dict_repr = _BencodeHandler().bdecode(f.read())
|
||||||
|
if self.dict_repr is None:
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
def get_meta(self) -> Dict[str, str]:
|
def get_meta(self) -> Dict[str, str]:
|
||||||
metadata = {}
|
metadata = {}
|
||||||
with open(self.filename, 'rb') as f:
|
for k, v in self.dict_repr.items():
|
||||||
d = _BencodeHandler().bdecode(f.read())
|
|
||||||
if d is None:
|
|
||||||
return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
|
|
||||||
for k, v in d.items():
|
|
||||||
if k not in self.whitelist:
|
if k not in self.whitelist:
|
||||||
metadata[k.decode('utf-8')] = v
|
metadata[k.decode('utf-8')] = v
|
||||||
return metadata
|
return metadata
|
||||||
@ -22,15 +25,12 @@ class TorrentParser(abstract.AbstractParser):
|
|||||||
|
|
||||||
def remove_all(self) -> bool:
|
def remove_all(self) -> bool:
|
||||||
cleaned = dict()
|
cleaned = dict()
|
||||||
with open(self.filename, 'rb') as f:
|
for k, v in self.dict_repr.items():
|
||||||
d = _BencodeHandler().bdecode(f.read())
|
|
||||||
if d is None:
|
|
||||||
return False
|
|
||||||
for k, v in d.items():
|
|
||||||
if k in self.whitelist:
|
if k in self.whitelist:
|
||||||
cleaned[k] = v
|
cleaned[k] = v
|
||||||
with open(self.output_filename, 'wb') as f:
|
with open(self.output_filename, 'wb') as f:
|
||||||
f.write(_BencodeHandler().bencode(cleaned))
|
f.write(_BencodeHandler().bencode(cleaned))
|
||||||
|
self.dict_repr = cleaned # since we're stateful
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@ -46,15 +46,14 @@ class TestCorruptedFiles(unittest.TestCase):
|
|||||||
|
|
||||||
def test_torrent(self):
|
def test_torrent(self):
|
||||||
shutil.copy('./tests/data/dirty.png', './tests/data/clean.torrent')
|
shutil.copy('./tests/data/dirty.png', './tests/data/clean.torrent')
|
||||||
p = torrent.TorrentParser('./tests/data/clean.torrent')
|
with self.assertRaises(ValueError):
|
||||||
self.assertFalse(p.remove_all())
|
torrent.TorrentParser('./tests/data/clean.torrent')
|
||||||
expected = {'Unknown meta': 'Unable to parse torrent file "./tests/data/clean.torrent".'}
|
|
||||||
self.assertEqual(p.get_meta(), expected)
|
|
||||||
|
|
||||||
with open("./tests/data/clean.torrent", "a") as f:
|
with open("./tests/data/clean.torrent", "a") as f:
|
||||||
f.write("trailing garbage")
|
f.write("trailing garbage")
|
||||||
p = torrent.TorrentParser('./tests/data/clean.torrent')
|
with self.assertRaises(ValueError):
|
||||||
self.assertEqual(p.get_meta(), expected)
|
torrent.TorrentParser('./tests/data/clean.torrent')
|
||||||
|
|
||||||
os.remove('./tests/data/clean.torrent')
|
os.remove('./tests/data/clean.torrent')
|
||||||
|
|
||||||
def test_odg(self):
|
def test_odg(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user