diff --git a/lbry/schema/mime_types.py b/lbry/schema/mime_types.py
index 95e6c08dc1..62505be04f 100644
--- a/lbry/schema/mime_types.py
+++ b/lbry/schema/mime_types.py
@@ -1,4 +1,6 @@
 import os
+import filetype
+import logging
 
 types_map = {
     # http://www.iana.org/assignments/media-types
@@ -166,10 +168,52 @@
     '.wmv': ('video/x-ms-wmv', 'video')
 }
 
+# Maps an extension detected from file contents to other extensions that use
+# the same container format, e.g. a .cbz file is actually a .zip archive.
+synonyms_map = {
+    '.zip': ['.cbz'],
+    '.rar': ['.cbr'],
+    '.ar': ['.a']
+}
+
+log = logging.getLogger(__name__)
+
 
 def guess_media_type(path):
+    """Guess the media type of the file at *path*.
+
+    The extension detected from the file's contents takes precedence over the
+    one in its name, unless the name's extension is a listed synonym of the
+    detected one. If the file cannot be read, only the name is used.
+    """
     _, ext = os.path.splitext(path)
     extension = ext.strip().lower()
+
+    try:
+        kind = filetype.guess(path)
+        if kind:
+            real_extension = f".{kind.extension}"
+
+            if extension != real_extension:
+                if extension:
+                    log.warning(
+                        "file extension does not match its contents: %s, identified as %s",
+                        path, real_extension
+                    )
+                else:
+                    log.debug(
+                        "file %s does not have extension, identified by its contents as %s",
+                        path, real_extension
+                    )
+
+                if extension not in synonyms_map.get(real_extension, []):
+                    extension = real_extension
+
+    except OSError as error:
+        # Best effort: the file may be missing or unreadable, so fall back to
+        # the extension taken from the path.
+        log.debug("could not inspect contents of %s: %s", path, error)
+
     if extension[1:]:
         if extension in types_map:
             return types_map[extension]
diff --git a/setup.py b/setup.py
index 56832e8eb1..da749bfd9c 100644
--- a/setup.py
+++ b/setup.py
@@ -56,7 +56,8 @@
         'attrs==18.2.0',
         'pylru==1.1.0',
         'elasticsearch==7.10.1',
-        'grpcio==1.38.0'
+        'grpcio==1.38.0',
+        'filetype==1.0.9'
     ] + PLYVEL,
     extras_require={
         'torrent': ['lbry-libtorrent'],
diff --git a/tests/unit/schema/test_mime_types.py b/tests/unit/schema/test_mime_types.py
new file mode 100644
index 0000000000..6d5beed2b8
--- /dev/null
+++ b/tests/unit/schema/test_mime_types.py
@@ -0,0 +1,51 @@
+import unittest
+import tempfile
+import os
+
+from lbry.schema.mime_types import guess_media_type
+
+
+class MediaTypeTests(unittest.TestCase):
+    """Checks media type guessing from file names and from file contents."""
+
+    def test_guess_media_type_from_path_only(self):
+        kind = guess_media_type('/tmp/test.mkv')
+        self.assertEqual(kind, ('video/x-matroska', 'video'))
+
+    def test_defaults_for_no_extension(self):
+        kind = guess_media_type('/tmp/test')
+        self.assertEqual(kind, ('application/octet-stream', 'binary'))
+
+    def test_defaults_for_unknown_extension(self):
+        kind = guess_media_type('/tmp/test.unk')
+        self.assertEqual(kind, ('application/x-ext-unk', 'binary'))
+
+    def test_spoofed_unknown(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            file = os.path.join(temp_dir, 'spoofed_unknown.txt')
+            with open(file, 'wb') as fd:
+                bytes_lz4 = bytearray([0x04, 0x22, 0x4d, 0x18])
+                fd.write(bytes_lz4)
+
+            kind = guess_media_type(file)
+            self.assertEqual(kind, ('application/x-ext-lz4', 'binary'))
+
+    def test_spoofed_known(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            file = os.path.join(temp_dir, 'spoofed_known.avi')
+            with open(file, 'wb') as fd:
+                bytes_zip = bytearray([0x50, 0x4b, 0x03, 0x06])
+                fd.write(bytes_zip)
+
+            kind = guess_media_type(file)
+            self.assertEqual(kind, ('application/zip', 'binary'))
+
+    def test_spoofed_synonym(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            file = os.path.join(temp_dir, 'spoofed_known.cbz')
+            with open(file, 'wb') as fd:
+                bytes_zip = bytearray([0x50, 0x4b, 0x03, 0x06])
+                fd.write(bytes_zip)
+
+            kind = guess_media_type(file)
+            self.assertEqual(kind, ('application/vnd.comicbook+zip', 'document'))