1
0
mirror of synced 2024-11-22 09:14:23 +01:00

Improve a bit our coverage wrt. torrent files handling

This commit is contained in:
jvoisin 2018-06-10 00:56:55 +02:00
parent 3c56fa3237
commit 87bdcd1a95
2 changed files with 22 additions and 2 deletions

View File

@ -1,4 +1,6 @@
import logging
from typing import Union, Tuple, Dict from typing import Union, Tuple, Dict
from . import abstract from . import abstract
@ -58,6 +60,8 @@ class _BencodeHandler(object):
def __decode_int(s: bytes) -> Tuple[int, bytes]: def __decode_int(s: bytes) -> Tuple[int, bytes]:
s = s[1:] s = s[1:]
next_idx = s.index(b'e') next_idx = s.index(b'e')
if next_idx is None:
raise ValueError # missing suffix
if s.startswith(b'-0'): if s.startswith(b'-0'):
raise ValueError # negative zero doesn't exist raise ValueError # negative zero doesn't exist
elif s.startswith(b'0') and next_idx != 1: elif s.startswith(b'0') and next_idx != 1:
@ -67,6 +71,8 @@ class _BencodeHandler(object):
@staticmethod @staticmethod
def __decode_string(s: bytes) -> Tuple[bytes, bytes]: def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
sep = s.index(b':') sep = s.index(b':')
if set is None:
raise ValueError # missing suffix
str_len = int(s[:sep]) str_len = int(s[:sep])
if str_len < 0: if str_len < 0:
raise ValueError raise ValueError
@ -119,9 +125,9 @@ class _BencodeHandler(object):
try: try:
r, l = self.__decode_func[s[0]](s) r, l = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e: except (IndexError, KeyError, ValueError) as e:
print("not a valid bencoded string: %s" % e) logging.debug("Not a valid bencoded string: %s" % e)
return None return None
if l != b'': if l != b'':
print("invalid bencoded value (data after valid prefix)") logging.debug("Invalid bencoded value (data after valid prefix)")
return None return None
return r return r

View File

@ -57,6 +57,20 @@ class TestCorruptedFiles(unittest.TestCase):
images.PNGParser('./tests/data/clean.pdf') images.PNGParser('./tests/data/clean.pdf')
os.remove('./tests/data/clean.pdf') os.remove('./tests/data/clean.pdf')
def test_torrent(self):
shutil.copy('./tests/data/dirty.png', './tests/data/clean.torrent')
p = torrent.TorrentParser('./tests/data/clean.torrent')
self.assertFalse(p.remove_all())
expected = {'Unknown meta': 'Unable to parse torrent file "./tests/data/clean.torrent".'}
self.assertEqual(p.get_meta(), expected)
with open("./tests/data/clean.torrent", "a") as f:
f.write("trailing garbage")
p = torrent.TorrentParser('./tests/data/clean.torrent')
self.assertEqual(p.get_meta(), expected)
os.remove('./tests/data/clean.torrent')
class TestGetMeta(unittest.TestCase): class TestGetMeta(unittest.TestCase):
def test_pdf(self): def test_pdf(self):
p = pdf.PDFParser('./tests/data/dirty.pdf') p = pdf.PDFParser('./tests/data/dirty.pdf')