From 3ebea5a19cd7ad00be8250869b6a5b29958c13b2 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 3 Nov 2021 15:29:53 +0000 Subject: [PATCH] Track ongoing event fetches correctly in the presence of failure When an event fetcher aborts due to an exception, `_event_fetch_ongoing` must be decremented, otherwise the event fetcher would never be replaced. If enough event fetchers were to fail, no more events would be fetched and requests would get stuck waiting for events. --- changelog.d/11240.bugfix | 1 + .../storage/databases/main/events_worker.py | 56 +++++++++++-------- 2 files changed, 35 insertions(+), 22 deletions(-) create mode 100644 changelog.d/11240.bugfix diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix new file mode 100644 index 000000000000..94d73f67e365 --- /dev/null +++ b/changelog.d/11240.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae37901be91d..c6bf316d5bf3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -28,6 +28,7 @@ import attr from constantly import NamedConstant, Names +from prometheus_client import Gauge from typing_extensions import Literal from twisted.internet import defer @@ -81,6 +82,12 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +event_fetch_ongoing_gauge = Gauge( + "synapse_event_fetch_ongoing", + "The number of event fetchers that are running", +) + + @attr.s(slots=True, auto_attribs=True) class _EventCacheEntry: event: EventBase @@ -222,6 +229,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) # We define this sequence here so that it can be referenced from both # the DataStore and PersistEventStore. @@ -732,28 +740,31 @@ def _do_fetch(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from the _event_fetch_list queue. """ - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) + try: + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + break + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + finally: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -977,6 +988,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) should_start = True else: should_start = False