Refactor office document handling
This commit is contained in:
parent
2d7c703c52
commit
eac51dbc99
@ -6,8 +6,8 @@ class AbstractParser(object):
|
||||
self.filename = filename
|
||||
self.output_filename = filename + '.cleaned'
|
||||
|
||||
def get_meta(self):
|
||||
def get_meta(self) -> dict:
|
||||
raise NotImplementedError
|
||||
|
||||
def remove_all(self):
|
||||
def remove_all(self) -> bool:
|
||||
raise NotImplementedError
|
||||
|
@ -1,68 +0,0 @@
|
||||
import re
|
||||
import subprocess
|
||||
import json
|
||||
import zipfile
|
||||
import tempfile
|
||||
import shutil
|
||||
import os
|
||||
|
||||
from . import abstract, parser_factory
|
||||
|
||||
class LibreOfficeParser(abstract.AbstractParser):
|
||||
mimetypes = {
|
||||
'application/vnd.oasis.opendocument.text',
|
||||
'application/vnd.oasis.opendocument.spreadsheet',
|
||||
'application/vnd.oasis.opendocument.presentation',
|
||||
'application/vnd.oasis.opendocument.graphics',
|
||||
'application/vnd.oasis.opendocument.chart'
|
||||
}
|
||||
|
||||
def get_meta(self):
|
||||
"""
|
||||
Yes, I know that parsing xml with regexp ain't pretty,
|
||||
be my guest and fix it if you want.
|
||||
"""
|
||||
metadata = {}
|
||||
zipin = zipfile.ZipFile(self.filename)
|
||||
for item in zipin.namelist():
|
||||
if item == 'meta.xml':
|
||||
content = zipin.read(item).decode('utf-8')
|
||||
for (key, value) in re.findall(r"<((?:meta|dc).+?)>(.+)</\1>", content, re.I):
|
||||
metadata[key] = value
|
||||
if not metadata: # better safe than sorry
|
||||
metadata[item] = 'harmful content'
|
||||
zipin.close()
|
||||
return metadata
|
||||
|
||||
def __clean_zipinfo(self, zipinfo:zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||
zipinfo.compress_type = zipfile.ZIP_DEFLATED
|
||||
zipinfo.create_system = 3 # Linux
|
||||
zipinfo.comment = b''
|
||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
|
||||
return zipinfo
|
||||
|
||||
def remove_all(self):
|
||||
zin = zipfile.ZipFile(self.filename, 'r')
|
||||
zout = zipfile.ZipFile(self.output_filename, 'w')
|
||||
temp_folder = tempfile.mkdtemp()
|
||||
|
||||
for item in zin.infolist():
|
||||
if item.filename[-1] == '/':
|
||||
continue # `is_dir` is added in Python3.6
|
||||
elif item.filename == 'meta.xml':
|
||||
continue # don't keep metadata files
|
||||
|
||||
zin.extract(member=item, path=temp_folder)
|
||||
tmp_parser = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
|
||||
if tmp_parser is None:
|
||||
print("%s isn't supported" % item.filename)
|
||||
continue
|
||||
tmp_parser.remove_all()
|
||||
zinfo = zipfile.ZipInfo(item.filename)
|
||||
item = self.__clean_zipinfo(item)
|
||||
with open(tmp_parser.output_filename, 'rb') as f:
|
||||
zout.writestr(zinfo, f.read())
|
||||
shutil.rmtree(temp_folder)
|
||||
zout.close()
|
||||
zin.close()
|
||||
return True
|
@ -1,14 +1,34 @@
|
||||
import re
|
||||
import subprocess
|
||||
import json
|
||||
import zipfile
|
||||
import tempfile
|
||||
import shutil
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import zipfile
|
||||
|
||||
from . import abstract, parser_factory
|
||||
|
||||
class OfficeParser(abstract.AbstractParser):
|
||||
class ArchiveBasedAbstractParser(abstract.AbstractParser):
|
||||
def _clean_zipinfo(self, zipinfo:zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||
zipinfo.compress_type = zipfile.ZIP_DEFLATED
|
||||
zipinfo.create_system = 3 # Linux
|
||||
zipinfo.comment = b''
|
||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
|
||||
return zipinfo
|
||||
|
||||
def _clean_internal_file(self, item:zipfile.ZipInfo, temp_folder:str, zin:zipfile.ZipFile, zout:zipfile.ZipFile):
|
||||
zin.extract(member=item, path=temp_folder)
|
||||
tmp_parser = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
|
||||
if tmp_parser is None:
|
||||
print("%s isn't supported" % item.filename)
|
||||
return
|
||||
tmp_parser.remove_all()
|
||||
zinfo = zipfile.ZipInfo(item.filename)
|
||||
item = self._clean_zipinfo(item)
|
||||
with open(tmp_parser.output_filename, 'rb') as f:
|
||||
zout.writestr(zinfo, f.read())
|
||||
|
||||
class MSOfficeParser(ArchiveBasedAbstractParser):
|
||||
mimetypes = {
|
||||
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
||||
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||||
@ -33,12 +53,6 @@ class OfficeParser(abstract.AbstractParser):
|
||||
zipin.close()
|
||||
return metadata
|
||||
|
||||
def __clean_zipinfo(self, zipinfo:zipfile.ZipInfo) -> zipfile.ZipInfo:
|
||||
zipinfo.compress_type = zipfile.ZIP_DEFLATED
|
||||
zipinfo.create_system = 3 # Linux
|
||||
zipinfo.comment = b''
|
||||
zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
|
||||
return zipinfo
|
||||
|
||||
def remove_all(self):
|
||||
zin = zipfile.ZipFile(self.filename, 'r')
|
||||
@ -52,20 +66,57 @@ class OfficeParser(abstract.AbstractParser):
|
||||
if not item.filename.endswith('.rels'):
|
||||
continue # don't keep metadata files
|
||||
if item.filename in self.files_to_keep:
|
||||
item = self.__clean_zipinfo(item)
|
||||
item = self._clean_zipinfo(item)
|
||||
zout.writestr(item, zin.read(item))
|
||||
continue
|
||||
|
||||
zin.extract(member=item, path=temp_folder)
|
||||
tmp_parser = parser_factory.get_parser(os.path.join(temp_folder, item.filename))
|
||||
if tmp_parser is None:
|
||||
print("%s isn't supported" % item.filename)
|
||||
continue
|
||||
tmp_parser.remove_all()
|
||||
zinfo = zipfile.ZipInfo(item.filename)
|
||||
item = self.__clean_zipinfo(item)
|
||||
with open(tmp_parser.output_filename, 'rb') as f:
|
||||
zout.writestr(zinfo, f.read())
|
||||
self._clean_internal_file(item, temp_folder, zin, zout)
|
||||
|
||||
shutil.rmtree(temp_folder)
|
||||
zout.close()
|
||||
zin.close()
|
||||
return True
|
||||
|
||||
|
||||
|
||||
class LibreOfficeParser(ArchiveBasedAbstractParser):
|
||||
mimetypes = {
|
||||
'application/vnd.oasis.opendocument.text',
|
||||
'application/vnd.oasis.opendocument.spreadsheet',
|
||||
'application/vnd.oasis.opendocument.presentation',
|
||||
'application/vnd.oasis.opendocument.graphics',
|
||||
'application/vnd.oasis.opendocument.chart'
|
||||
}
|
||||
|
||||
def get_meta(self):
|
||||
"""
|
||||
Yes, I know that parsing xml with regexp ain't pretty,
|
||||
be my guest and fix it if you want.
|
||||
"""
|
||||
metadata = {}
|
||||
zipin = zipfile.ZipFile(self.filename)
|
||||
for item in zipin.namelist():
|
||||
if item == 'meta.xml':
|
||||
content = zipin.read(item).decode('utf-8')
|
||||
for (key, value) in re.findall(r"<((?:meta|dc|cp).+?)>(.+)</\1>", content, re.I):
|
||||
metadata[key] = value
|
||||
if not metadata: # better safe than sorry
|
||||
metadata[item] = 'harmful content'
|
||||
zipin.close()
|
||||
return metadata
|
||||
|
||||
def remove_all(self):
|
||||
zin = zipfile.ZipFile(self.filename, 'r')
|
||||
zout = zipfile.ZipFile(self.output_filename, 'w')
|
||||
temp_folder = tempfile.mkdtemp()
|
||||
|
||||
for item in zin.infolist():
|
||||
if item.filename[-1] == '/':
|
||||
continue # `is_dir` is added in Python3.6
|
||||
elif item.filename == 'meta.xml':
|
||||
continue # don't keep metadata files
|
||||
|
||||
self._clean_internal_file(item, temp_folder, zin, zout)
|
||||
|
||||
shutil.rmtree(temp_folder)
|
||||
zout.close()
|
||||
|
@ -6,7 +6,7 @@ import os
|
||||
import zipfile
|
||||
import tempfile
|
||||
|
||||
from src import pdf, png, images_pixbuf, audio, office, libreoffice, parser_factory
|
||||
from src import pdf, png, images_pixbuf, audio, office, parser_factory
|
||||
|
||||
class TestGetMeta(unittest.TestCase):
|
||||
def test_pdf(self):
|
||||
@ -49,14 +49,14 @@ class TestGetMeta(unittest.TestCase):
|
||||
self.assertEqual(meta['TITLE'], ['I am so'])
|
||||
|
||||
def test_docx(self):
|
||||
p = office.OfficeParser('./tests/data/dirty.docx')
|
||||
p = office.MSOfficeParser('./tests/data/dirty.docx')
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['cp:lastModifiedBy'], 'Julien Voisin')
|
||||
self.assertEqual(meta['dc:creator'], 'julien voisin')
|
||||
self.assertEqual(meta['Application'], 'LibreOffice/5.4.5.1$Linux_X86_64 LibreOffice_project/40m0$Build-1')
|
||||
|
||||
def test_libreoffice(self):
|
||||
p = libreoffice.LibreOfficeParser('./tests/data/dirty.odt')
|
||||
p = office.LibreOfficeParser('./tests/data/dirty.odt')
|
||||
meta = p.get_meta()
|
||||
self.assertEqual(meta['meta:initial-creator'], 'jvoisin ')
|
||||
self.assertEqual(meta['meta:creation-date'], '2011-07-26T03:27:48')
|
||||
@ -90,7 +90,7 @@ class TestDeepCleaning(unittest.TestCase):
|
||||
|
||||
def test_office(self):
|
||||
shutil.copy('./tests/data/dirty.docx', './tests/data/clean.docx')
|
||||
p = office.OfficeParser('./tests/data/clean.docx')
|
||||
p = office.MSOfficeParser('./tests/data/clean.docx')
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertIsNotNone(meta)
|
||||
@ -98,7 +98,7 @@ class TestDeepCleaning(unittest.TestCase):
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = office.OfficeParser('./tests/data/clean.docx.cleaned')
|
||||
p = office.MSOfficeParser('./tests/data/clean.docx.cleaned')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
|
||||
self.__check_zip_meta(p)
|
||||
@ -109,7 +109,7 @@ class TestDeepCleaning(unittest.TestCase):
|
||||
|
||||
def test_libreoffice(self):
|
||||
shutil.copy('./tests/data/dirty.odt', './tests/data/clean.odt')
|
||||
p = libreoffice.LibreOfficeParser('./tests/data/clean.odt')
|
||||
p = office.LibreOfficeParser('./tests/data/clean.odt')
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertIsNotNone(meta)
|
||||
@ -117,7 +117,7 @@ class TestDeepCleaning(unittest.TestCase):
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = libreoffice.LibreOfficeParser('./tests/data/clean.odt.cleaned')
|
||||
p = office.LibreOfficeParser('./tests/data/clean.odt.cleaned')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
|
||||
self.__check_zip_meta(p)
|
||||
@ -219,7 +219,7 @@ class TestCleaning(unittest.TestCase):
|
||||
|
||||
def test_office(self):
|
||||
shutil.copy('./tests/data/dirty.docx', './tests/data/clean.docx')
|
||||
p = office.OfficeParser('./tests/data/clean.docx')
|
||||
p = office.MSOfficeParser('./tests/data/clean.docx')
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertIsNotNone(meta)
|
||||
@ -227,7 +227,7 @@ class TestCleaning(unittest.TestCase):
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = office.OfficeParser('./tests/data/clean.docx.cleaned')
|
||||
p = office.MSOfficeParser('./tests/data/clean.docx.cleaned')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
|
||||
os.remove('./tests/data/clean.docx')
|
||||
@ -235,7 +235,7 @@ class TestCleaning(unittest.TestCase):
|
||||
|
||||
def test_libreoffice(self):
|
||||
shutil.copy('./tests/data/dirty.odt', './tests/data/clean.odt')
|
||||
p = libreoffice.LibreOfficeParser('./tests/data/clean.odt')
|
||||
p = office.LibreOfficeParser('./tests/data/clean.odt')
|
||||
|
||||
meta = p.get_meta()
|
||||
self.assertIsNotNone(meta)
|
||||
@ -243,7 +243,7 @@ class TestCleaning(unittest.TestCase):
|
||||
ret = p.remove_all()
|
||||
self.assertTrue(ret)
|
||||
|
||||
p = libreoffice.LibreOfficeParser('./tests/data/clean.odt.cleaned')
|
||||
p = office.LibreOfficeParser('./tests/data/clean.odt.cleaned')
|
||||
self.assertEqual(p.get_meta(), {})
|
||||
|
||||
os.remove('./tests/data/clean.odt')
|
||||
|
Loading…
Reference in New Issue
Block a user