diff --git a/libmat2/epub.py b/libmat2/epub.py
new file mode 100644
index 0000000..09b7937
--- /dev/null
+++ b/libmat2/epub.py
@@ -0,0 +1,47 @@
+import logging
+import re
+import xml.etree.ElementTree as ET  # type: ignore
+
+from . import archive, office
+
+class EPUBParser(archive.ArchiveBasedAbstractParser):
+    mimetypes = {'application/epub+zip', }
+
+    def __init__(self, filename):
+        super().__init__(filename)
+        self.files_to_keep = set(map(re.compile, {  # type: ignore
+            'META-INF/container.xml',
+            'mimetype',
+            'OEBPS/content.opf',
+            }))
+
+    def _specific_get_meta(self, full_path, file_path):
+        if file_path != 'OEBPS/content.opf':
+            return {}
+
+        with open(full_path, encoding='utf-8') as f:
+            try:
+                results = re.findall(r"<((?:meta|dc|cp).+?)[^>]*>(.+)</\1>",
+                                     f.read(), re.I|re.M)
+                return {k:v for (k, v) in results}
+            except (TypeError, UnicodeDecodeError):
+                # We didn't manage to parse the xml file
+                return {file_path: 'harmful content', }
+
+    def _specific_cleanup(self, full_path: str):
+        if not full_path.endswith('OEBPS/content.opf'):
+            return True
+
+        try:
+            tree, namespace = office._parse_xml(full_path)
+        except ET.ParseError:
+            logging.error("Unable to parse %s in %s.", full_path, self.filename)
+            return False
+        parent_map = {c:p for p in tree.iter() for c in p}
+
+        for item in tree.iterfind('.//', namespace):
+            if item.tag.strip().lower().endswith('metadata'):
+                parent_map[item].remove(item)
+                break  # there is only a single <metadata> block
+        tree.write(full_path, xml_declaration=True)
+        return True
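
A note on the `parent_map` built in `_specific_cleanup`: `xml.etree` elements keep no reference to their parent, so the element to detach has to be found through a child-to-parent map built upfront. A minimal standalone sketch of the same trick over a made-up `content.opf` snippet (demo code, not part of the patch):

    import xml.etree.ElementTree as ET

    # Hypothetical, heavily simplified content.opf payload.
    opf = """<package xmlns="http://www.idpf.org/2007/opf">
      <metadata><creator>Jane Doe</creator></metadata>
      <manifest/>
    </package>"""

    tree = ET.ElementTree(ET.fromstring(opf))
    # No parent pointers in ElementTree, so build child -> parent first.
    parent_map = {child: parent for parent in tree.iter() for child in parent}
    for item in list(tree.iter()):
        if item.tag.lower().endswith('metadata'):
            parent_map[item].remove(item)  # detach the whole block
            break  # a single metadata element per package
    print(ET.tostring(tree.getroot()).decode())  # no metadata element left
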
diff --git a/libmat2/html.py b/libmat2/html.py
deleted file mode 100644
index d0e9a2b..0000000
--- a/libmat2/html.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from html import parser
-from typing import Dict, Any, List, Tuple
-
-from . import abstract
-
-
-class HTMLParser(abstract.AbstractParser):
-    mimetypes = {'text/html', }
-    def __init__(self, filename):
-        super().__init__(filename)
-        self.__parser = _HTMLParser()
-        with open(filename) as f:
-            self.__parser.feed(f.read())
-        self.__parser.close()
-
-    def get_meta(self) -> Dict[str, Any]:
-        return self.__parser.get_meta()
-
-    def remove_all(self) -> bool:
-        return self.__parser.remove_all(self.output_filename)
-
-
-class _HTMLParser(parser.HTMLParser):
-    """Python doesn't have a validating html parser in its stdlib, so
-    we're using an internal queue to track all the opening/closing tags,
-    and hoping for the best.
-    """
-    def __init__(self):
-        super().__init__()
-        self.__textrepr = ''
-        self.__meta = {}
-        self.__validation_queue = []
-
-    def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
-        self.__textrepr += self.get_starttag_text()
-        self.__validation_queue.append(tag)
-
-    def handle_endtag(self, tag: str):
-        if not self.__validation_queue:
-            raise ValueError
-        elif tag != self.__validation_queue.pop():
-            raise ValueError
-        # There is no `get_endtag_text()` method :/
-        self.__textrepr += '\n'
-
-    def handle_data(self, data: str):
-        if data.strip():
-            self.__textrepr += data
-
-    def handle_startendtag(self, tag: str, attrs: List[Tuple[str, str]]):
-        if tag == 'meta':
-            meta = {k:v for k, v in attrs}
-            name = meta.get('name', 'harmful metadata')
-            content = meta.get('content', 'harmful data')
-            self.__meta[name] = content
-        else:
-            self.__textrepr += self.get_starttag_text()
-
-    def remove_all(self, output_filename: str) -> bool:
-        if self.__validation_queue:
-            raise ValueError
-        with open(output_filename, 'w') as f:
-            f.write(self.__textrepr)
-        return True
-
-    def get_meta(self) -> Dict[str, Any]:
-        if self.__validation_queue:
-            raise ValueError
-        return self.__meta
diff --git a/libmat2/parser_factory.py b/libmat2/parser_factory.py
index 30c3b52..e93ee4f 100644
--- a/libmat2/parser_factory.py
+++ b/libmat2/parser_factory.py
@@ -1,3 +1,4 @@
+import logging
 import glob
 import os
 import mimetypes
@@ -10,6 +11,10 @@
 assert Tuple  # make pyflakes happy
 T = TypeVar('T', bound='abstract.AbstractParser')
 
+mimetypes.add_type('application/epub+zip', '.epub')
+# EPUB Navigation Control XML File
+mimetypes.add_type('application/x-dtbncx+xml', '.ncx')
+
 def __load_all_parsers():
     """ Loads every parser in a dynamic way """
@@ -49,6 +54,8 @@
         if mtype in parser_class.mimetypes:
             try:
                 return parser_class(filename), mtype
-            except ValueError:
+            except ValueError as e:
+                logging.info("Got an exception when trying to instantiate "
+                             "%s for %s: %s", parser_class, filename, e)
                 return None, mtype
     return None, mtype
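
Context for the two `add_type()` calls: `get_parser()` routes on `mimetypes.guess_type()`, and a stock Python install frequently has no mapping for `.epub` or `.ncx`. A quick standalone check of what the registrations change (the before-values depend on the system mime database):

    import mimetypes

    print(mimetypes.guess_type('book.epub'))  # often (None, None) out of the box
    mimetypes.add_type('application/epub+zip', '.epub')
    mimetypes.add_type('application/x-dtbncx+xml', '.ncx')
    print(mimetypes.guess_type('book.epub'))  # ('application/epub+zip', None)
    print(mimetypes.guess_type('toc.ncx'))    # ('application/x-dtbncx+xml', None)
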
diff --git a/libmat2/web.py b/libmat2/web.py
new file mode 100644
index 0000000..13d5fc8
--- /dev/null
+++ b/libmat2/web.py
@@ -0,0 +1,122 @@
+from html import parser
+from typing import Dict, Any, List, Tuple
+import re
+import string
+
+from . import abstract
+
+
+class CSSParser(abstract.AbstractParser):
+    """There is no such thing as metadata in CSS files,
+    only comments of the form `/* … */`, so we're removing the latter."""
+    mimetypes = {'text/css', }
+    flags = re.MULTILINE | re.DOTALL
+
+    def remove_all(self) -> bool:
+        with open(self.filename, encoding='utf-8') as f:
+            cleaned = re.sub(r'/\*.+?\*/', '', f.read(), 0, self.flags)
+        with open(self.output_filename, 'w', encoding='utf-8') as f:
+            f.write(cleaned)
+        return True
+
+    def get_meta(self) -> Dict[str, Any]:
+        metadata = {}
+        with open(self.filename, encoding='utf-8') as f:
+            cssdoc = re.findall(r'/\*(.+?)\*/', f.read(), self.flags)
+        for match in cssdoc:
+            for line in match.splitlines():
+                try:
+                    k, v = line.split(':')
+                    metadata[k.strip(string.whitespace + '*')] = v.strip()
+                except ValueError:
+                    metadata['harmful data'] = line.strip()
+        return metadata
+
+
+class HTMLParser(abstract.AbstractParser):
+    mimetypes = {'text/html', 'application/x-dtbncx+xml', }
+    def __init__(self, filename):
+        super().__init__(filename)
+        self.__parser = _HTMLParser(self.filename)
+        with open(filename, encoding='utf-8') as f:
+            self.__parser.feed(f.read())
+        self.__parser.close()
+
+    def get_meta(self) -> Dict[str, Any]:
+        return self.__parser.get_meta()
+
+    def remove_all(self) -> bool:
+        return self.__parser.remove_all(self.output_filename)
+
+
+class _HTMLParser(parser.HTMLParser):
+    """Python doesn't have a validating html parser in its stdlib, so
+    we're using an internal queue to track all the opening/closing tags,
+    and hoping for the best.
+    """
+    tag_blacklist = {'doctitle', 'meta'}  # everything is lowercase
+    def __init__(self, filename):
+        super().__init__()
+        self.filename = filename
+        self.__textrepr = ''
+        self.__meta = {}
+        self.__validation_queue = []
+        # We're using a counter instead of a boolean to handle nested tags
+        self.__in_dangerous_tag = 0
+
+    def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
+        self.__validation_queue.append(tag)
+        if tag in self.tag_blacklist:
+            self.__in_dangerous_tag += 1
+            return
+
+        if self.__in_dangerous_tag == 0:
+            self.__textrepr += self.get_starttag_text()
+
+    def handle_endtag(self, tag: str):
+        if not self.__validation_queue:
+            raise ValueError("The closing tag %s doesn't have a corresponding "
+                             "opening one in %s."
+                             % (tag, self.filename))
+
+        previous_tag = self.__validation_queue.pop()
+        if tag != previous_tag:
+            raise ValueError("The closing tag %s doesn't match the previous "
+                             "tag %s in %s" %
+                             (tag, previous_tag, self.filename))
+        elif tag in self.tag_blacklist:
+            self.__in_dangerous_tag -= 1
+            return
+
+        if self.__in_dangerous_tag == 0:
+            # There is no `get_endtag_text()` method :/
+            self.__textrepr += '\n'
+
+    def handle_data(self, data: str):
+        if self.__in_dangerous_tag == 0 and data.strip():
+            self.__textrepr += data
+
+    def handle_startendtag(self, tag: str, attrs: List[Tuple[str, str]]):
+        if tag in self.tag_blacklist:
+            meta = {k:v for k, v in attrs}
+            name = meta.get('name', 'harmful metadata')
+            content = meta.get('content', 'harmful data')
+            self.__meta[name] = content
+        else:
+            if self.__in_dangerous_tag == 0:
+                self.__textrepr += self.get_starttag_text()
+
+    def remove_all(self, output_filename: str) -> bool:
+        if self.__validation_queue:
+            raise ValueError("Some tags (%s) were left unclosed in %s" % (
+                ', '.join(self.__validation_queue),
+                self.filename))
+        with open(output_filename, 'w', encoding='utf-8') as f:
+            f.write(self.__textrepr)
+        return True
+
+    def get_meta(self) -> Dict[str, Any]:
+        if self.__validation_queue:
+            raise ValueError("Some tags (%s) were left unclosed in %s" % (
+                ', '.join(self.__validation_queue),
+                self.filename))
+        return self.__meta
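
On the counter in `_HTMLParser`: with nested blacklisted tags, a boolean would flip back to "safe" at the first closing tag while the parser is still inside the outer one; the integer tracks nesting depth instead. A standalone toy parser illustrating the bookkeeping (demo code, not from the patch):

    from html import parser

    class DepthDemo(parser.HTMLParser):
        """Tracks how deeply we are nested inside blacklisted tags."""
        blacklist = {'doctitle', 'meta'}

        def __init__(self):
            super().__init__()
            self.depth = 0

        def handle_starttag(self, tag, attrs):
            if tag in self.blacklist:
                self.depth += 1  # nested dangerous tags stack up

        def handle_data(self, data):
            # Only keep text when we're outside every blacklisted tag.
            print('drop' if self.depth else 'keep', repr(data))

        def handle_endtag(self, tag):
            if tag in self.blacklist:
                self.depth -= 1

    # With a boolean, "still hidden" would wrongly be kept after the
    # inner closing tag; the counter keeps it dropped until depth is 0.
    DepthDemo().feed('<doctitle>a<doctitle>b</doctitle>still hidden</doctitle>ok')
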
diff --git a/tests/data/dirty.css b/tests/data/dirty.css
new file mode 100644
index 0000000..f52caf9
--- /dev/null
+++ b/tests/data/dirty.css
@@ -0,0 +1,14 @@
+/**
+ * This is my super css framework
+ * version: 1.0
+ * author : jvoisin
+ */
+
+body {
+    color: red;
+    background-color: blue;
+}
+
+.underline {
+    text-decoration: underline; /* underline is cool */
+}
diff --git a/tests/data/dirty.epub b/tests/data/dirty.epub
new file mode 100644
index 0000000..6389963
Binary files /dev/null and b/tests/data/dirty.epub differ
diff --git a/tests/dirty.epub b/tests/dirty.epub
new file mode 100644
index 0000000..6389963
Binary files /dev/null and b/tests/dirty.epub differ
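
Hand-tracing the new `CSSParser.get_meta()` over this fixture explains the values the tests assert later: every comment line without a `key: value` shape funnels into the single 'harmful data' key, so the last such line wins. The same logic, standalone:

    import re
    import string

    css = open('tests/data/dirty.css', encoding='utf-8').read()
    metadata = {}
    for comment in re.findall(r'/\*(.+?)\*/', css, re.MULTILINE | re.DOTALL):
        for line in comment.splitlines():
            try:
                key, value = line.split(':')
                metadata[key.strip(string.whitespace + '*')] = value.strip()
            except ValueError:  # no "key: value" shape
                metadata['harmful data'] = line.strip()
    print(metadata)
    # {'harmful data': 'underline is cool', 'version': '1.0', 'author': 'jvoisin'}
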
diff --git a/tests/test_corrupted_files.py b/tests/test_corrupted_files.py
index 8728cb2..53c856a 100644
--- a/tests/test_corrupted_files.py
+++ b/tests/test_corrupted_files.py
@@ -7,7 +7,7 @@
 import logging
 import zipfile
 
 from libmat2 import pdf, images, audio, office, parser_factory, torrent
-from libmat2 import harmless, video, html
+from libmat2 import harmless, video, web
 
 # No need to log messages, should something go wrong,
 # the testsuite _will_ fail.
@@ -220,34 +220,34 @@ class TestCorruptedFiles(unittest.TestCase):
         os.remove('./tests/data/--output.avi')
 
     def test_zip(self):
-        with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout:
+        with zipfile.ZipFile('./tests/data/clean.zip', 'w') as zout:
             zout.write('./tests/data/dirty.flac')
             zout.write('./tests/data/dirty.docx')
             zout.write('./tests/data/dirty.jpg')
             zout.write('./tests/data/embedded_corrupted.docx')
-        p, mimetype = parser_factory.get_parser('./tests/data/dirty.zip')
+        p, mimetype = parser_factory.get_parser('./tests/data/clean.zip')
         self.assertEqual(mimetype, 'application/zip')
         meta = p.get_meta()
         self.assertEqual(meta['tests/data/dirty.flac']['comments'], 'Thank you for using MAT !')
         self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
         self.assertFalse(p.remove_all())
-        os.remove('./tests/data/dirty.zip')
+        os.remove('./tests/data/clean.zip')
 
     def test_html(self):
         shutil.copy('./tests/data/dirty.html', './tests/data/clean.html')
         with open('./tests/data/clean.html', 'a') as f:
             f.write('<open>but not</closed>')
         with self.assertRaises(ValueError):
-            html.HTMLParser('./tests/data/clean.html')
+            web.HTMLParser('./tests/data/clean.html')
         os.remove('./tests/data/clean.html')
 
         # Yes, we're able to deal with malformed html :/
         shutil.copy('./tests/data/dirty.html', './tests/data/clean.html')
         with open('./tests/data/clean.html', 'a') as f:
             f.write('<meta name=\'this" is="weird\'/>')
-        p = html.HTMLParser('./tests/data/clean.html')
+        p = web.HTMLParser('./tests/data/clean.html')
         self.assertTrue(p.remove_all())
-        p = html.HTMLParser('./tests/data/clean.cleaned.html')
+        p = web.HTMLParser('./tests/data/clean.cleaned.html')
         self.assertEqual(p.get_meta(), {})
         os.remove('./tests/data/clean.html')
         os.remove('./tests/data/clean.cleaned.html')
@@ -255,17 +255,38 @@ class TestCorruptedFiles(unittest.TestCase):
         with open('./tests/data/clean.html', 'w') as f:
             f.write('</close>')
         with self.assertRaises(ValueError):
-            html.HTMLParser('./tests/data/clean.html')
+            web.HTMLParser('./tests/data/clean.html')
         os.remove('./tests/data/clean.html')
 
         with open('./tests/data/clean.html', 'w') as f:
             f.write('<notclosed>')
-        p = html.HTMLParser('./tests/data/clean.html')
+        p = web.HTMLParser('./tests/data/clean.html')
         with self.assertRaises(ValueError):
             p.get_meta()
-        p = html.HTMLParser('./tests/data/clean.html')
+        p = web.HTMLParser('./tests/data/clean.html')
         with self.assertRaises(ValueError):
             p.remove_all()
         os.remove('./tests/data/clean.html')
+
+        with open('./tests/data/clean.html', 'w') as f:
+            f.write('<doctitle><br/></doctitle><br/><notclosed>')
+        p = web.HTMLParser('./tests/data/clean.html')
+        with self.assertRaises(ValueError):
+            p.get_meta()
+        p = web.HTMLParser('./tests/data/clean.html')
+        with self.assertRaises(ValueError):
+            p.remove_all()
+        os.remove('./tests/data/clean.html')
+
+    def test_epub(self):
+        with zipfile.ZipFile('./tests/data/clean.epub', 'w') as zout:
+            zout.write('./tests/data/dirty.jpg', 'OEBPS/content.opf')
+        p, mimetype = parser_factory.get_parser('./tests/data/clean.epub')
+        self.assertEqual(mimetype, 'application/epub+zip')
+        meta = p.get_meta()
+        self.assertEqual(meta['OEBPS/content.opf']['OEBPS/content.opf'],
+                         'harmful content')
+
+        self.assertFalse(p.remove_all())
+        os.remove('./tests/data/clean.epub')
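
The corrupted-EPUB test above leans on the fallback in `_specific_get_meta`: a JPEG stored under `OEBPS/content.opf` is not valid UTF-8, so `f.read()` raises `UnicodeDecodeError` inside the `try` block and the whole file is reported as opaque 'harmful content'. The failure mode in isolation:

    jpeg_magic = b'\xff\xd8\xff\xe0'  # the first bytes of a JPEG file
    try:
        jpeg_magic.decode('utf-8')
    except UnicodeDecodeError:
        # the branch EPUBParser takes for the corrupted fixture,
        # returning {file_path: 'harmful content'}
        print('unreadable as text, flagged as harmful content')
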
diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py
index 8753e09..249c56d 100644
--- a/tests/test_libmat2.py
+++ b/tests/test_libmat2.py
@@ -6,7 +6,7 @@
 import os
 import zipfile
 
 from libmat2 import pdf, images, audio, office, parser_factory, torrent, harmless
-from libmat2 import check_dependencies, video, archive, html
+from libmat2 import check_dependencies, video, archive, web, epub
 
 
 class TestCheckDependencies(unittest.TestCase):
@@ -177,6 +177,23 @@ class TestGetMeta(unittest.TestCase):
         meta = p.get_meta()
         self.assertEqual(meta['Comment'], 'this is a test comment')
 
+    def test_epub(self):
+        p, mimetype = parser_factory.get_parser('./tests/data/dirty.epub')
+        self.assertEqual(mimetype, 'application/epub+zip')
+        meta = p.get_meta()
+        self.assertEqual(meta['OEBPS/content.opf']['dc:creator'], 'Dorothy L. Sayers')
+        self.assertEqual(meta['OEBPS/toc.ncx']['dtb:generator'], 'Ebookmaker 0.4.0a5 by Marcello Perathoner <webmaster@gutenberg.org>')
+        self.assertEqual(meta['OEBPS/@public@vhost@g@gutenberg@html@files@58820@58820-h@images@shield25.jpg']['CreatorTool'], 'Adobe Photoshop CS5 Macintosh')
+        self.assertEqual(meta['OEBPS/@public@vhost@g@gutenberg@html@files@58820@58820-h@58820-h-2.htm.html']['generator'], 'Ebookmaker 0.4.0a5 by Marcello Perathoner <webmaster@gutenberg.org>')
+
+    def test_css(self):
+        p, mimetype = parser_factory.get_parser('./tests/data/dirty.css')
+        self.assertEqual(mimetype, 'text/css')
+        meta = p.get_meta()
+        self.assertEqual(meta['author'], 'jvoisin')
+        self.assertEqual(meta['version'], '1.0')
+        self.assertEqual(meta['harmful data'], 'underline is cool')
+
 class TestRemovingThumbnails(unittest.TestCase):
     def test_odt(self):
         shutil.copy('./tests/data/revision.odt', './tests/data/clean.odt')
@@ -599,7 +616,7 @@
     def test_html(self):
         shutil.copy('./tests/data/dirty.html', './tests/data/clean.html')
-        p = html.HTMLParser('./tests/data/clean.html')
+        p = web.HTMLParser('./tests/data/clean.html')
         meta = p.get_meta()
         self.assertEqual(meta['author'], 'jvoisin')
 
@@ -607,10 +624,50 @@
         ret = p.remove_all()
         self.assertTrue(ret)
 
-        p = html.HTMLParser('./tests/data/clean.cleaned.html')
+        p = web.HTMLParser('./tests/data/clean.cleaned.html')
         self.assertEqual(p.get_meta(), {})
         self.assertTrue(p.remove_all())
 
         os.remove('./tests/data/clean.html')
         os.remove('./tests/data/clean.cleaned.html')
         os.remove('./tests/data/clean.cleaned.cleaned.html')
+
+
+    def test_epub(self):
+        shutil.copy('./tests/data/dirty.epub', './tests/data/clean.epub')
+        p = epub.EPUBParser('./tests/data/clean.epub')
+
+        meta = p.get_meta()
+        self.assertEqual(meta['OEBPS/content.opf']['dc:source'], 'http://www.gutenberg.org/files/58820/58820-h/58820-h.htm')
+
+        ret = p.remove_all()
+        self.assertTrue(ret)
+
+        p = epub.EPUBParser('./tests/data/clean.cleaned.epub')
+        self.assertEqual(p.get_meta(), {})
+        self.assertTrue(p.remove_all())
+
+        os.remove('./tests/data/clean.epub')
+        os.remove('./tests/data/clean.cleaned.epub')
+        os.remove('./tests/data/clean.cleaned.cleaned.epub')
+
+
+    def test_css(self):
+        shutil.copy('./tests/data/dirty.css', './tests/data/clean.css')
+        p = web.CSSParser('./tests/data/clean.css')
+
+        self.assertEqual(p.get_meta(), {
+            'harmful data': 'underline is cool',
+            'version': '1.0',
+            'author': 'jvoisin'})
+
+        ret = p.remove_all()
+        self.assertTrue(ret)
+
+        p = web.CSSParser('./tests/data/clean.cleaned.css')
+        self.assertEqual(p.get_meta(), {})
+        self.assertTrue(p.remove_all())
+
+        os.remove('./tests/data/clean.css')
+        os.remove('./tests/data/clean.cleaned.css')
+        os.remove('./tests/data/clean.cleaned.cleaned.css')
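
To exercise the new code paths by hand from a checkout of this branch, something along these lines goes through the public factory; the `.cleaned.` output name matches what the tests above rely on, but treat this as a sketch rather than a supported interface:

    from libmat2 import parser_factory

    p, mimetype = parser_factory.get_parser('./tests/data/dirty.epub')
    assert mimetype == 'application/epub+zip'
    print(p.get_meta())  # {'OEBPS/content.opf': {'dc:creator': ...}, ...}

    if p.remove_all():
        # the sanitised copy is written next to the original
        cleaned, _ = parser_factory.get_parser('./tests/data/dirty.cleaned.epub')
        print(cleaned.get_meta())  # should be {}
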