diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix new file mode 100644 index 000000000000..94d73f67e365 --- /dev/null +++ b/changelog.d/11240.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae37901be91d..c6bf316d5bf3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -28,6 +28,7 @@ import attr from constantly import NamedConstant, Names +from prometheus_client import Gauge from typing_extensions import Literal from twisted.internet import defer @@ -81,6 +82,12 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +event_fetch_ongoing_gauge = Gauge( + "synapse_event_fetch_ongoing", + "The number of event fetchers that are running", +) + + @attr.s(slots=True, auto_attribs=True) class _EventCacheEntry: event: EventBase @@ -222,6 +229,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) # We define this sequence here so that it can be referenced from both # the DataStore and PersistEventStore. @@ -732,28 +740,31 @@ def _do_fetch(self, conn: Connection) -> None: """Takes a database connection and waits for requests for events from the _event_fetch_list queue. """ - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) + try: + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + break + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + finally: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -977,6 +988,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) should_start = True else: should_start = False