From b012c8ba909349416d8609955a3c183b39140e16 Mon Sep 17 00:00:00 2001
From: David Teller <davidt@element.io>
Date: Thu, 4 Feb 2021 10:19:15 +0100
Subject: [PATCH 1/4] [CI] Updating CI

---
 Dockerfile                  | 4 +++-
 README.md                   | 7 ++++---
 docker-compose.yaml         | 1 -
 test/README.md              | 3 ++-
 test/config/homeserver.yaml | 2 ++
 5 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 9cbfb89..5993533 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,9 +6,11 @@ FROM matrixdotorg/synapse:latest
 # Install extension.
 WORKDIR /data
 COPY . .
+
+RUN apt-get update --quiet && apt-get install postgresql-client gcc --yes --quiet
+
 RUN pip install .
-RUN apt-get update --quiet && apt-get install postgresql-client --yes --quiet
 
 # Run
 #ENTRYPOINT ["tail", "-f", "/data/test/run_tests.sh"]
diff --git a/README.md b/README.md
index fe08d4d..11eaf80 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ The filter:
 
 ## Requirements
 
-You need Synapse >= 1.25.0.
+You need Synapse >= 1.28.0.
 
 ## Installation
 
@@ -36,7 +36,8 @@ spam_checker:
       # The name of the table containing MD5s
       # It MUST contain at least one value `md5 TEXT PRIMARY KEY NOT NULL`.
       md5_table: "image_filter.iwf_md5"
+      # How often we should check for changes in the database, in seconds.
+      pull_from_db_every_sec: 600
 ```
-Synapse will need to be restarted to apply the changes. Links or MD5s added to the database
-will be mirrored in real time.
+Synapse will need to be restarted to apply the changes. Links added to the database take effect within `pull_from_db_every_sec` seconds, while MD5s added to the database take effect immediately.
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 7f322fc..6c134e4 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -20,4 +20,3 @@ services:
       SYNAPSE_POSTGRES_USER: postgres
       SYNAPSE_POSTGRES_PASSWORD: postgres
     working_dir: /data
-
diff --git a/test/README.md b/test/README.md
index 63eeef6..41ed928 100644
--- a/test/README.md
+++ b/test/README.md
@@ -12,8 +12,9 @@ Use the following steps to run tests locally.
 ```sh
 # Prepare the latest Synapse docker image (slow, you don't need to do it often)
 $ ./test/before_test.sh
+$ docker-compose down --remove-orphans
 # Purge any previous version of the test, otherwise `docker-compose` ignores changes.
-$ docker container purge --force
+$ docker container prune
 # Launch the test
 $ docker-compose up --build --abort-on-container-exit
 ```
diff --git a/test/config/homeserver.yaml b/test/config/homeserver.yaml
index fad28d7..fce6bba 100644
--- a/test/config/homeserver.yaml
+++ b/test/config/homeserver.yaml
@@ -2276,6 +2276,8 @@ spam_checker:
       # The name of the table containing MD5s
       # It MUST contain at least one value `md5 TEXT PRIMARY KEY NOT NULL`.
       md5_table: "image_filter.iwf_md5"
+      # How often we should check for changes in the database, in seconds.
+      pull_from_db_every_sec: 600
 
 
 ## Rooms ##

From b20c452121d5ae0d2eab85f7e5152c99674449e1 Mon Sep 17 00:00:00 2001
From: David Teller <davidt@element.io>
Date: Thu, 4 Feb 2021 10:19:28 +0100
Subject: [PATCH 2/4] Moving to pyahocorasick

Now that we know that the list of URLs will easily fit in memory, we can
use a much faster trie-based approach.
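A minimal sketch of the matching this enables (pyahocorasick's documented
API; the example links below are made up):

```python
import ahocorasick

# Build the automaton once, from the full list of bad links, storing
# only match lengths -- the same mode this patch uses.
automaton = ahocorasick.Automaton(ahocorasick.STORE_LENGTH)
for link in ["badsite.example/abuse", "evil.example/images"]:
    automaton.add_word(link)
automaton.make_automaton()

# Scanning a message is then a single pass over the text, however many
# links are loaded: no per-message database query.
text = "see https://badsite.example/abuse for details"
assert any(True for _ in automaton.iter(text))
```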
---
 setup.py                                     |   4 +-
 synapse_spamcheck_badlist.egg-info/PKG-INFO  |   2 +-
 .../SOURCES.txt                              |   2 +-
 .../requires.txt                             |   3 +-
 synapse_spamcheck_badlist/bad_list_filter.py | 159 +++++++++---------
 5 files changed, 84 insertions(+), 86 deletions(-)

diff --git a/setup.py b/setup.py
index 25c4f3d..c15d3a3 100644
--- a/setup.py
+++ b/setup.py
@@ -2,12 +2,12 @@
 setup(
     name="synapse-spamcheck-badlist",
-    version="0.1.0",
+    version="0.2.0",
     packages=find_packages(),
     description="A Synapse spam filter designed to block links and upload of content already known as bad. The typical use case is to plug this with a list of links and MD5s of child sexual abuse, as published by the IWF.",
     include_package_data=True,
     zip_safe=True,
-    install_requires=['linkify-it-py', 'prometheus-client'],
+    install_requires=['pyahocorasick', 'prometheus-client'],
     author="David Teller",
     author_email="davidt@element.io",
     license="Apache 2",
diff --git a/synapse_spamcheck_badlist.egg-info/PKG-INFO b/synapse_spamcheck_badlist.egg-info/PKG-INFO
index a718a87..dfc3435 100644
--- a/synapse_spamcheck_badlist.egg-info/PKG-INFO
+++ b/synapse_spamcheck_badlist.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: synapse-spamcheck-badlist
-Version: 0.1.0
+Version: 0.2.0
 Summary: A Synapse spam filter designed to block links and upload of content already known as bad. The typical use case is to plug this with a list of links and MD5s of child sexual abuse, as published by the IWF.
 Home-page: UNKNOWN
 Author: David Teller
diff --git a/synapse_spamcheck_badlist.egg-info/SOURCES.txt b/synapse_spamcheck_badlist.egg-info/SOURCES.txt
index 482481d..1dc3381 100644
--- a/synapse_spamcheck_badlist.egg-info/SOURCES.txt
+++ b/synapse_spamcheck_badlist.egg-info/SOURCES.txt
@@ -1,7 +1,7 @@
 README.md
 setup.py
-synapse_spamcheck_badlist/BadListFilter.py
 synapse_spamcheck_badlist/__init__.py
+synapse_spamcheck_badlist/bad_list_filter.py
 synapse_spamcheck_badlist.egg-info/PKG-INFO
 synapse_spamcheck_badlist.egg-info/SOURCES.txt
 synapse_spamcheck_badlist.egg-info/dependency_links.txt
diff --git a/synapse_spamcheck_badlist.egg-info/requires.txt b/synapse_spamcheck_badlist.egg-info/requires.txt
index b84bfa4..71a29fb 100644
--- a/synapse_spamcheck_badlist.egg-info/requires.txt
+++ b/synapse_spamcheck_badlist.egg-info/requires.txt
@@ -1 +1,2 @@
-linkify-it-py
+prometheus-client
+pyahocorasick
diff --git a/synapse_spamcheck_badlist/bad_list_filter.py b/synapse_spamcheck_badlist/bad_list_filter.py
index 6fe7073..21f84c1 100644
--- a/synapse_spamcheck_badlist/bad_list_filter.py
+++ b/synapse_spamcheck_badlist/bad_list_filter.py
@@ -14,11 +14,12 @@
 
 import hashlib
 import logging
+import time
 import re
 
+import ahocorasick
+from ahocorasick import Automaton
 from prometheus_client import Histogram
-from linkify_it import LinkifyIt
-from linkify_it.tlds import TLDS
 from urllib.parse import quote as urlquote
 
 logger = logging.getLogger(__name__)
@@ -52,23 +53,18 @@ def __init__(self, config, api):
         self._base_url = config["base_url"]
         logger.info("Using base url %s" % self._base_url)
 
+        # How often we should check for updates in the database.
+        self._pull_from_db_every_sec = int(config["pull_from_db_every_sec"])
+        logger.info("Rechecking database every %s seconds", self._pull_from_db_every_sec)
+
         # Regexp for extracting info from mxc links.
         self._mxc_re = re.compile("mxc://(?P<server_name>.*)/(?P<media_id>.*)")
 
-        # Linkifier, used to extract URLs from text.
-        self._linkifier = (
-            LinkifyIt()
-            .tlds(TLDS)
-            .tlds("onion", True)  # Add the `onion` tld
-            .add("git:", "http:")  # Add the `git:` scheme with the same rules as `http:`
-            .set({
-                "fuzzy_ip": True,  # Attempt to recognize e.g. 192.168.0.1
-                "fuzzy_link": True  # Attempt to recognize links without protocol
-            })
-        )
-        self._scheme_re = re.compile("https?:/*|git:/*|ftp:/*")
-        self._linkify_test_performance = Histogram('linkify_test_performance', 'Performance of calls to linkifier.test, in seconds')
-        self._link_test_performance = Histogram('link_test_performance', 'Total performance cost of checking links in a message, in seconds')
+        # An ahocorasick.Automaton used to recognize bad links.
+        self._link_automaton = None
+
+        self._link_check_performance = Histogram('synapse_spamcheck_badlist_link_check_performance', 'Performance of link checking, in seconds. This operation is in the critical path between a message being sent and that message being delivered to other members.')
+        self._md5_check_performance = Histogram('synapse_spamcheck_badlist_md5_check_performance', 'Performance of md5 checking, in seconds. This operation is in the critical path between a message being sent and that message being delivered to other members.')
 
         # One of:
         # - `None` if we haven't checked yet whether the database is present;
         # - `True` if we have checked and the database is present;
         # - `False` if we have checked and the database is absent.
         self._can_we_check_links = None
         self._can_we_check_md5 = None
 
+        # Timestamp for the latest pull from the links table (or attempt to pull,
+        # if the links table was empty), as returned by `time.time()`.
+        self._last_checked_links = None
+        self._last_checked_md5 = None
+
     async def can_we_check_links(self) -> bool:
         """
         Check whether the links database exists, caching the result.
         """
+        now = time.time()
+        if (self._last_checked_links is None) or (self._last_checked_links + self._pull_from_db_every_sec <= now):
+            # Force a recheck of the links.
+            logger.info("can_we_check_links: Forcing a recheck of the links")
+            self._can_we_check_links = None
+            self._last_checked_links = now
         if self._can_we_check_links is not None:
             return self._can_we_check_links
         if self._links_table is None:
+            logger.info("can_we_check_links: No table")
             self._can_we_check_links = False
             return False
         try:
-            def interaction(db):
-                db.execute("SELECT url FROM %s LIMIT 1" % self._links_table)
-            await self._api.run_db_interaction("Check whether we can check links", interaction)
+            logger.info("can_we_check_links: fetching links from table %s" % self._links_table)
+            links = await self._api.run_db_interaction("Fetch links from the table", _db_fetch_links, self._links_table)
+            logger.info("can_we_check_links: we received %s links" % len(links))
             self._can_we_check_links = True
+            self._link_automaton = Automaton(ahocorasick.STORE_LENGTH)
+            for link in links:
+                self._link_automaton.add_word(link)
+            self._link_automaton.make_automaton()
             logger.info("We can check links!")
         except Exception as e:
             logger.warn("We CANNOT check links! %s" % e)
             self._can_we_check_links = False
         return self._can_we_check_links
 
@@ -101,6 +113,11 @@ async def can_we_check_md5(self) -> bool:
         """
         Check whether the MD5 database exists, caching the result.
         """
+        now = time.time()
+        if (self._last_checked_md5 is None) or (self._last_checked_md5 + self._pull_from_db_every_sec <= now):
+            # Force a recheck of the table.
+            self._can_we_check_md5 = None
+            self._last_checked_md5 = now
         if self._can_we_check_md5 is not None:
             return self._can_we_check_md5
         if self._md5_table is None:
@@ -126,56 +143,49 @@ async def check_event_for_spam(self, event) -> bool:
 
         # Look for links in text content.
# Note that all messages can have a text content, even files (as part of the description), etc. - if await self.can_we_check_links(): - # Check for links in text, both unformatted and formatted. - # - # We always lower-case the url, as the IWF database is lowercase. - with self._link_test_performance.time(): + with self._link_check_performance.time(): + if await self.can_we_check_links(): + # Check for links in text, both unformatted and formatted. + # + # We always lower-case the url, as the IWF database is lowercase. for text in [content.get("body", ""), content.get("formatted_body", "")]: - with self._linkify_test_performance.time(): - # Run a first, faster test. - if not self._linkifier.test(text): - continue - # Now run the slower test, if necessary, using results cached from the faster test. - for match in self._linkifier.match(text) or []: - link = re.sub(self._scheme_re, "", match.url.lower()) - is_bad_link = await self._api.run_db_interaction("Check link against evil db", _db_is_bad_link, self._links_table, link) - if is_bad_link: - logger.info("Rejected bad link") - return True + for _ in self._link_automaton.iter(text): + logger.info("Rejected bad link") + return True # If it's a file, download content, extract hash. - if content.get("msgtype", "") in ["m.file", "m.image", "m.audio"]: - if not await self.can_we_check_md5(): - return False - - match = self._mxc_re.match(content.get("url", "")) - if match != None: - server_name = match.group('server_name') - media_id = match.group('media_id') - response = None - try: - url = "%s/_matrix/media/r0/download/%s/%s" % ( - self._base_url, - urlquote(server_name), - urlquote(media_id) - ) - response = await self._api.http_client.request("GET", url) - except Exception as e: - # In case of timeout or error, there's nothing we can do. - # Let's not take the risk of blocking valid contents. - logger.warn("Could not download media: '%s', assuming it's not spam." % e) - return False - if response.code == 429: - logger.warn("We were rate-limited, assuming it's not spam.") + with self._md5_check_performance.time(): + if content.get("msgtype", "") in ["m.file", "m.image", "m.audio"]: + if not await self.can_we_check_md5(): return False - md5 = hashlib.md5() - await response.collect(lambda batch: md5.update(batch)) - is_bad_upload = await self._api.run_db_interaction("Check upload against evil db", _db_is_bad_upload, self._md5_table, md5.hexdigest()) - if is_bad_upload: - logger.info("Rejected bad upload") - return True + match = self._mxc_re.match(content.get("url", "")) + if match != None: + server_name = match.group('server_name') + media_id = match.group('media_id') + response = None + try: + url = "%s/_matrix/media/r0/download/%s/%s" % ( + self._base_url, + urlquote(server_name), + urlquote(media_id) + ) + response = await self._api.http_client.request("GET", url) + except Exception as e: + # In case of timeout or error, there's nothing we can do. + # Let's not take the risk of blocking valid contents. + logger.warn("Could not download media: '%s', assuming it's not spam." 
% e)
+                        return False
+                    if response.code == 429:
+                        logger.warn("We were rate-limited, assuming it's not spam.")
+                        return False
+
+                    md5 = hashlib.md5()
+                    await response.collect(lambda batch: md5.update(batch))
+                    is_bad_upload = await self._api.run_db_interaction("Check upload against evil db", _db_is_bad_upload, self._md5_table, md5.hexdigest())
+                    if is_bad_upload:
+                        logger.info("Rejected bad upload")
+                        return True
 
         # Not spam
         return False
@@ -207,25 +217,12 @@ def parse_config(config):
     return config
 
-def _db_is_bad_link(db, table, link):
+def _db_fetch_links(db, table):
     """
-    Search if any url in the database is a prefix of `link`.
-    `link` MUST be normalized by `_link_for_search`.
+    Pull the list of links from the database.
     """
-    # Note: As per IWF guidelines, we're looking for *prefixes*. This might
-    # be slow. We're quickly going to have 1M+ links, so we need to find out
-    # whether this slows things down.
-    #
-    # 1. Find the closest url.
-    db.execute(("SELECT url FROM %s WHERE url <= ? ORDER BY url DESC LIMIT 1" % table), (link, ))
-    row = db.fetchone()
-    if not row:
-        logger.info("No match in %s for link %s " % (table, link))
-        return False
-
-    # 2. Check whether it's actually a prefix.
-    logger.info("Located potential prefix %s" % row[0])
-    return link.startswith(row[0])
+    db.execute("SELECT url FROM %s" % table)
+    return [row[0] for row in db]
 
 def _db_is_bad_upload(db, table, md5):
     """

From 7143fdf20c8692d6976f88dd3656fd70b64f22a5 Mon Sep 17 00:00:00 2001
From: David Teller <davidt@element.io>
Date: Thu, 4 Feb 2021 10:58:09 +0100
Subject: [PATCH 3/4] Detecting bad files during upload/download, rather than
 while we're sending messages.

This should be much more responsive for users.
---
 synapse_spamcheck_badlist/bad_list_filter.py | 50 +++++++-------------
 test/4_test.py                               | 32 +++----------
 test/before_test.sh                          |  3 ++
 test/run_tests.sh                            |  2 +-
 4 files changed, 27 insertions(+), 60 deletions(-)

diff --git a/synapse_spamcheck_badlist/bad_list_filter.py b/synapse_spamcheck_badlist/bad_list_filter.py
index 21f84c1..2e02357 100644
--- a/synapse_spamcheck_badlist/bad_list_filter.py
+++ b/synapse_spamcheck_badlist/bad_list_filter.py
@@ -153,43 +153,25 @@ async def check_event_for_spam(self, event) -> bool:
                     logger.info("Rejected bad link")
                     return True
 
-        # If it's a file, download content, extract hash.
-        with self._md5_check_performance.time():
-            if content.get("msgtype", "") in ["m.file", "m.image", "m.audio"]:
-                if not await self.can_we_check_md5():
-                    return False
-
-                match = self._mxc_re.match(content.get("url", ""))
-                if match != None:
-                    server_name = match.group('server_name')
-                    media_id = match.group('media_id')
-                    response = None
-                    try:
-                        url = "%s/_matrix/media/r0/download/%s/%s" % (
-                            self._base_url,
-                            urlquote(server_name),
-                            urlquote(media_id)
-                        )
-                        response = await self._api.http_client.request("GET", url)
-                    except Exception as e:
-                        # In case of timeout or error, there's nothing we can do.
-                        # Let's not take the risk of blocking valid contents.
-                        logger.warn("Could not download media: '%s', assuming it's not spam."
% e) - return False - if response.code == 429: - logger.warn("We were rate-limited, assuming it's not spam.") - return False - - md5 = hashlib.md5() - await response.collect(lambda batch: md5.update(batch)) - is_bad_upload = await self._api.run_db_interaction("Check upload against evil db", _db_is_bad_upload, self._md5_table, md5.hexdigest()) - if is_bad_upload: - logger.info("Rejected bad upload") - return True - # Not spam return False + async def check_media_file_for_spam(self, file_wrapper, file_info): + if await self.can_we_check_md5(): + logger.info("Checking media file") + # Compute MD5 of file. + hasher = hashlib.md5() + await file_wrapper.write_chunks_to(hasher.update) + + hex_digest = hasher.hexdigest() + + # Check if it shows up in the db. + if await self._api.run_db_interaction("Check whether this md5 shows up in the database", _db_is_bad_upload, self._md5_table, hex_digest): + logger.info("Rejected bad media file") + return True + + return False # allow all media + def check_username_for_spam(self, user_profile): return False # allow all usernames diff --git a/test/4_test.py b/test/4_test.py index 2d2e985..1bc54eb 100644 --- a/test/4_test.py +++ b/test/4_test.py @@ -7,7 +7,8 @@ import requests logging.basicConfig(filename = "/data/test.log") -logger = logging.getLogger("Test") +logger = logging.getLogger("synapse_spamcheck_badlist.test") + class Test: def __init__(self): @@ -80,7 +81,7 @@ def _upload_content(self, prefix, content): """ Upload a file. - Argument `prefix` is prepended to the file name, to aid with lookup up + Argument `prefix` is prepended to the file name, to aid with looking up stuff in the Synapse logs. """ response = requests.post('http://localhost:8080/_matrix/media/r0/upload?filename=%s-%s' % (prefix, uuid.uuid1()), @@ -90,7 +91,7 @@ def _upload_content(self, prefix, content): }, data = content ).json() - return response['content_uri'] + return response.get('content_uri', None) def _sync_with_server(self, since): """ @@ -182,9 +183,10 @@ def test(self): good_mxid = self._upload_content('good', good_file_content) logger.info('Good image is %s' % good_mxid) - logger.info('Upload a bad image, for the time being, it should be accepted') + logger.info('Upload a bad image, it should be rejected') evil_mxid = self._upload_content('evil', evil_file_content) - logger.info('Bad image is %s' % evil_mxid) + assert evil_mxid is None + for message_type in ['m.file', 'm.image', 'm.audio']: logger.info('Send good image with good description, it should be accepted') @@ -223,26 +225,6 @@ def test(self): # Message may be redacted later bad_events[event_id] = "Good image with bad description, type %s" % message_type - logger.info('Send bad image with good description, it should be rejected') - event_id = self._send_message_to_room( - 'bad-image-with-good-description', - { - 'body': 'A text without any link', - 'msgtype': message_type, - 'url': evil_mxid, - 'info': { - 'w': 320, - 'h': 200, - 'size': len(evil_file_content), - } - } - ) - if event_id is None: - logger.info('Message was rejected immediately') - else: - # Message may be redacted later - bad_events[event_id] = "Good image with bad description, type %s" % message_type - logger.info('Sending canary event, to ensure that all previous events have been flushed') event_id = self._send_message_to_room( 'canary-event', diff --git a/test/before_test.sh b/test/before_test.sh index 3634e7c..8764532 100755 --- a/test/before_test.sh +++ b/test/before_test.sh @@ -4,4 +4,7 @@ \rm -Rf synapse git clone 
https://github.com/matrix-org/synapse.git
+cd synapse
+git checkout erikj/media_spam_checker
+cd ..
 docker build -t matrixdotorg/synapse -f synapse/docker/Dockerfile synapse
diff --git a/test/run_tests.sh b/test/run_tests.sh
index ee689ea..e999461 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -37,7 +37,7 @@ register_new_matrix_user -c /data/homeserver.yaml -u user_2 -p user_2 --no-admin
 
 # 4. Running test
 echo TESTER: Running test
-python /data/test/4_test.py
+python /data/test/4_test.py &> /data/test.log
 RESULT=$?
 
 # 5. In case of failure, display logs

From 91f94997a145dfc6ecc24805250f25241e2b0094 Mon Sep 17 00:00:00 2001
From: David Teller <davidt@element.io>
Date: Thu, 4 Feb 2021 12:42:29 +0100
Subject: [PATCH 4/4] Moving the loading of the links table out of the
 critical path.

---
 setup.py                                     |   2 +-
 synapse_spamcheck_badlist/bad_list_filter.py | 206 +++++++++----------
 test/config/homeserver.yaml                  |   2 -
 3 files changed, 100 insertions(+), 110 deletions(-)

diff --git a/setup.py b/setup.py
index c15d3a3..11a1262 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     description="A Synapse spam filter designed to block links and upload of content already known as bad. The typical use case is to plug this with a list of links and MD5s of child sexual abuse, as published by the IWF.",
     include_package_data=True,
     zip_safe=True,
-    install_requires=['pyahocorasick', 'prometheus-client'],
+    install_requires=['pyahocorasick', 'prometheus-client', 'twisted'],
     author="David Teller",
     author_email="davidt@element.io",
     license="Apache 2",
diff --git a/synapse_spamcheck_badlist/bad_list_filter.py b/synapse_spamcheck_badlist/bad_list_filter.py
index 2e02357..92d5a63 100644
--- a/synapse_spamcheck_badlist/bad_list_filter.py
+++ b/synapse_spamcheck_badlist/bad_list_filter.py
@@ -14,125 +14,109 @@
 
 import hashlib
 import logging
-import time
 import re
+import time
 
 import ahocorasick
 from ahocorasick import Automaton
 from prometheus_client import Histogram
-from urllib.parse import quote as urlquote
+from twisted.internet import defer, reactor
+from twisted.internet.task import LoopingCall
+from twisted.internet.threads import deferToThread
 
 logger = logging.getLogger(__name__)
 
+link_check_performance = Histogram(
+    "synapse_spamcheck_badlist_link_check_performance",
+    "Performance of link checking, in seconds. This operation is in the critical path between a message being sent and that message being delivered to other members.",
+)
+
+
 class BadListFilter(object):
     """
-    A simple spam checker module for Synapse, designed to block upload of identified child sexual abuse
-    imagery and links to identified child sexual abuse websites.
+    A simple spam checker module for Synapse, designed to block upload
+    of identified child sexual abuse imagery and links to identified
+    child sexual abuse websites.
 
     This filter requires:
-    - a database of links of identified child sexual abuse websites (as published by e.g. the IWF);
-    - a database of MD5s of identified child sexual abuse imagery (as published by e.g. the IWF).
+    - a database of links of identified child sexual abuse websites
+      (as published by e.g. the IWF);
+    - a database of MD5s of identified child sexual abuse imagery
+      (as published by e.g. the IWF).
+
+    This filter assumes that the list of links is small enough that
+    it can fit in memory. This is consistent with what the IWF provides
+    (the list is a few thousand links long).
 
     The filter:
     - rejects any message containing a link that matches the database;
    - rejects any upload containing a file that matches the database.
""" + def __init__(self, config, api): # The plug-in API. self._api = api - # The table containing links. Configured in homeserver.yaml, spam_checker.config.links_table. + # The table containing links. Configured in homeserver.yaml + # as `spam_checker.config.links_table`. self._links_table = config["links_table"] logger.info("Using links table %s" % self._links_table) - # The table containing md5 hashes. Configured in homeserver.yaml, spam_checker.config.links_table. + # The table containing md5 hashes. Configured in homeserver.yaml + # as `spam_checker.config.links_table`. self._md5_table = config["md5_table"] logger.info("Using md5 table %s" % self._md5_table) - # The base url for this server. Configured in homeserver.yaml, spam_checker.config.base_url. - self._base_url = config["base_url"] - logger.info("Using base url %s" % self._base_url) - # How often we should check for updates in the database. - self._pull_from_db_every_sec = int(config["pull_from_db_every_sec"]) - logger.info("Rechecking database every %s seconds", self._pull_from_db_every_sec) - - # Regexp for extracting info from mxc links. - self._mxc_re = re.compile("mxc://(?P.*)/(?P.*)") + # Configured in homeserver.yaml + # as `spam_checker.config.pull_from_db_every_sec`. + pull_from_db_every_sec = int(config["pull_from_db_every_sec"]) + logger.info("Rechecking database every %s seconds", pull_from_db_every_sec) # A ahocorasick.Automaton used to recognize bad links. self._link_automaton = None - self._link_check_performance = Histogram('synapse_spamcheck_badlist_link_check_performance', 'Performance of link checking, in seconds. This operation is in the critical path between a message being sent and that message being delivered to other members.') - self._md5_check_performance = Histogram('synapse_spamcheck_badlist_md5_check_performance', 'Performance of md5 checking, in seconds. This operation is in the critical path between a message being sent and that message being delivered to other members.') - - # One of: - # - `None` if we haven't checked yet whether the database is present; - # - `True` if we have checked and the database is present; - # - `False` if we have checked and the database is absent. - self._can_we_check_links = None - self._can_we_check_md5 = None - - # Timestamp for the latest pull from the links table (or attempt to pull, - # if the links table was empty), as returned by `time.time()`. - self._last_checked_links = None - self._last_checked_md5 = None - - async def can_we_check_links(self) -> bool: + # Start the loop to update links. + self._update_links_loop = LoopingCall( + lambda: defer.ensureDeferred(self._update_links_automaton()) + ) + self._update_links_loop.start(pull_from_db_every_sec) + # As soon as we can, run the first fetch. + # Note that we have no guarantee that this is finished + # by the time we receive the first message, so we need + # a fallback in `_get_links_automaton()`. + reactor.callWhenRunning( + lambda: defer.ensureDeferred(self._update_links_automaton()) + ) + + async def _update_links_automaton(self): """ - Check whether the links database exists, caching the result. + Fetch the latest version of the links from the table, build an automaton. """ - now = time.time() - if (self._last_checked_links is None) or (self._last_checked_links + self._pull_from_db_every_sec >= now): - # Force a recheck of the links. 
- logger.info("can_we_check_links: Forcing a recheck of the links") - self._can_we_check_links = None - self._last_checked_links = now - if self._can_we_check_links is not None: - return self._can_we_check_links - if self._links_table is None: - logger.info("can_we_check_links: No table") - self._can_we_check_links = False - return False - try: - logger.info("can_we_check_links: fetching links from table %s" % self._links_table) - links = await self._api.run_db_interaction("Fetch links from the table", _db_fetch_links, self._links_table) - logger.info("can_we_check_links: we received %s links" % len(links)) - self._can_we_check_links = True - self._link_automaton = Automaton(ahocorasick.STORE_LENGTH) - for link in links: - self._link_automaton.add_word(link) - self._link_automaton.make_automaton() - logger.info("We can check links!") - except Exception as e: - logger.warn("We CANNOT check links! %s" % e) - self._can_we_check_links = False - return self._can_we_check_links - - async def can_we_check_md5(self) -> bool: + logger.info( + "_update_links_automaton: fetching links from table %s" % self._links_table + ) + links = await self._api.run_db_interaction( + "Fetch links from the table", _db_fetch_links, self._links_table + ) + logger.info("_update_links_automaton: we received %s links" % len(links)) + self._link_automaton = Automaton(ahocorasick.STORE_LENGTH) + for link in links: + self._link_automaton.add_word(link) + await deferToThread(self._link_automaton.make_automaton) + + async def _get_link_automaton(self) -> Automaton: """ - Check whether the MD5 database exists, caching the result. + Get the automaton used to recognize bad links. + The automaton is updated every `self._pull_from_db_every_sec` seconds. """ - now = time.time() - if (self._last_checked_md5 is None) or (self._last_checked_md5 + self._pull_from_db_every_sec >= now): - # Force a recheck of the table. - self._can_we_check_md5 = None - self._last_checked_md5 = now - if self._can_we_check_md5 is not None: - return self._can_we_check_md5 - if self._md5_table is None: - self._can_we_check_md5 = False - return False - try: - def interaction(db): - db.execute("SELECT md5 FROM %s LIMIT 1" % self._md5_table) - await self._api.run_db_interaction("Check whether we can check md5", interaction) - self._can_we_check_md5 = True - logger.info("We can check md5!") - except: - logger.warn("We CANNOT check md5!") - self._can_we_check_md5 = False - return self._can_we_check_md5 + if self._link_automaton is None: + # In the very unlikely case that the first run of _update_links_automaton() + # hasn't completed yet, we need to replicate it here and block the message + # until it is complete. + # In the worst case scenario, this will happen exactly once per process. + await self._update_links_automaton() + return self._link_automaton async def check_event_for_spam(self, event) -> bool: if event["type"] != "m.room.message": @@ -143,34 +127,40 @@ async def check_event_for_spam(self, event) -> bool: # Look for links in text content. # Note that all messages can have a text content, even files (as part of the description), etc. - with self._link_check_performance.time(): - if await self.can_we_check_links(): - # Check for links in text, both unformatted and formatted. - # - # We always lower-case the url, as the IWF database is lowercase. 
-                for text in [content.get("body", ""), content.get("formatted_body", "")]:
-                    for _ in self._link_automaton.iter(text):
-                        logger.info("Rejected bad link")
-                        return True
+        with link_check_performance.time():
+            automaton = await self._get_link_automaton()
+
+            # Check for links in text, both unformatted and formatted.
+            #
+            # We always lower-case the text, as the IWF database is lowercase.
+            for text in [
+                content.get("body", ""),
+                content.get("formatted_body", ""),
+            ]:
+                for _ in automaton.iter(text.lower()):
+                    logger.info("Rejected bad link")
+                    return True
 
         # Not spam
         return False
 
     async def check_media_file_for_spam(self, file_wrapper, file_info):
-        if await self.can_we_check_md5():
-            logger.info("Checking media file")
-            # Compute MD5 of file.
-            hasher = hashlib.md5()
-            await file_wrapper.write_chunks_to(hasher.update)
-
-            hex_digest = hasher.hexdigest()
-
-            # Check if it shows up in the db.
-            if await self._api.run_db_interaction("Check whether this md5 shows up in the database", _db_is_bad_upload, self._md5_table, hex_digest):
-                logger.info("Rejected bad media file")
-                return True
-
-        return False # allow all media
+        # Compute MD5 of file.
+        hasher = hashlib.md5()
+        await file_wrapper.write_chunks_to(hasher.update)
+
+        hex_digest = hasher.hexdigest()
+
+        # Check if it shows up in the db.
+        if await self._api.run_db_interaction(
+            "Check whether this md5 shows up in the database",
+            _db_is_bad_upload,
+            self._md5_table,
+            hex_digest,
+        ):
+            logger.info("Rejected bad media file")
+            return True
+        return False
 
     def check_username_for_spam(self, user_profile):
         return False  # allow all usernames
@@ -206,11 +196,12 @@ def _db_fetch_links(db, table):
     """
     Pull the list of links from the database.
     """
     db.execute("SELECT url FROM %s" % table)
     return [row[0] for row in db]
 
+
 def _db_is_bad_upload(db, table, md5):
     """
     Search if the md5 appears in the database.
     """
-    db.execute(("SELECT md5 FROM %s WHERE md5 = ?" % table), (md5, ))
+    db.execute(("SELECT md5 FROM %s WHERE md5 = ?" % table), (md5,))
     row = db.fetchone()
     if not row:
         return False
@@ -220,4 +211,5 @@
 # Run doctests
 if __name__ == "__main__":
     import doctest
+
     doctest.testmod()
diff --git a/test/config/homeserver.yaml b/test/config/homeserver.yaml
index fce6bba..827621b 100644
--- a/test/config/homeserver.yaml
+++ b/test/config/homeserver.yaml
@@ -2268,8 +2268,6 @@ push:
 spam_checker:
   - module: "synapse_spamcheck_badlist.BadListFilter"
     config:
-      # The URL of the server using this filter.
-      base_url: "http://localhost:8080"
       # The name of the table containing links.
       # It MUST contain at least one value `url TEXT PRIMARY KEY NOT NULL`.
       links_table: "image_filter.iwf_links"
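As a quick sanity check of the database helpers in their final form, they can
be exercised against an in-memory SQLite database shaped like the tables the
filter expects (`url TEXT PRIMARY KEY NOT NULL`, `md5 TEXT PRIMARY KEY NOT
NULL`). This is a hypothetical smoke test under those assumptions, not part
of the patch series:

```python
import hashlib
import sqlite3

from synapse_spamcheck_badlist.bad_list_filter import (
    _db_fetch_links,
    _db_is_bad_upload,
)

# In-memory stand-ins for the `image_filter.iwf_links` and
# `image_filter.iwf_md5` tables named in the sample configuration.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE iwf_links (url TEXT PRIMARY KEY NOT NULL)")
db.execute("CREATE TABLE iwf_md5 (md5 TEXT PRIMARY KEY NOT NULL)")
db.execute("INSERT INTO iwf_links VALUES ('badsite.example/abuse')")
bad_md5 = hashlib.md5(b"known bad bytes").hexdigest()
db.execute("INSERT INTO iwf_md5 VALUES (?)", (bad_md5,))

# Both helpers take a database cursor and a table name; the MD5 helper
# also takes the hex digest to look up.
cursor = db.cursor()
assert _db_fetch_links(cursor, "iwf_links") == ["badsite.example/abuse"]
assert _db_is_bad_upload(cursor, "iwf_md5", bad_md5)
assert not _db_is_bad_upload(cursor, "iwf_md5", hashlib.md5(b"ok").hexdigest())
```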