diff --git a/changelog.d/15585.feature b/changelog.d/15585.feature new file mode 100644 index 000000000000..1adcfb69ee5a --- /dev/null +++ b/changelog.d/15585.feature @@ -0,0 +1 @@ +Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9a08618da5c6..42141d367057 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter +from synapse.util.iterutils import batch_iter, partition from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -865,7 +865,7 @@ async def _process_pulled_events( [event.event_id for event in events] ) - new_events = [] + new_events: List[EventBase] = [] for event in events: event_id = event.event_id @@ -895,12 +895,66 @@ async def _process_pulled_events( str(len(new_events)), ) - # We want to sort these by depth so we process them and - # tell clients about them in order. - sorted_events = sorted(new_events, key=lambda x: x.depth) - for ev in sorted_events: - with nested_logging_context(ev.event_id): - await self._process_pulled_event(origin, ev, backfilled=backfilled) + @trace + async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: + # We want to sort these by depth so we process them and tell clients about + # them in order. It's also more efficient to backfill this way (`depth` + # ascending) because one backfill event is likely to be the `prev_event` of + # the next event we're going to process. + sorted_events = sorted(new_events, key=lambda x: x.depth) + for ev in sorted_events: + with nested_logging_context(ev.event_id): + await self._process_pulled_event(origin, ev, backfilled=backfilled) + + # Check if we've already tried to process these events at some point in the + # past. We aren't concerned with the expontntial backoff here, just whether it + # has failed to be processed before. + event_ids_with_failed_pull_attempts = ( + await self._store.get_event_ids_with_failed_pull_attempts( + [event.event_id for event in new_events] + ) + ) + + # We construct the event lists in source order from `/backfill` response because + # it's a) easiest, but also b) the order in which we process things matters for + # MSC2716 historical batches because many historical events are all at the same + # `depth` and we rely on the tenuous sort that the other server gave us and hope + # they're doing their best. The brittle nature of this ordering for historical + # messages over federation is one of the reasons why we don't want to continue + # on MSC2716 until we have online topological ordering. + events_with_failed_pull_attempts, fresh_events = partition( + new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts", + str(event_ids_with_failed_pull_attempts), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length", + str(len(events_with_failed_pull_attempts)), + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "fresh_events", + str([event.event_id for event in fresh_events]), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "fresh_events.length", + str(len(fresh_events)), + ) + + # Process previously failed backfill events in the background to not waste + # time on something that is likely to fail again. + if len(events_with_failed_pull_attempts) > 0: + run_as_background_process( + "_process_new_pulled_events_with_failed_pull_attempts", + _process_new_pulled_events, + events_with_failed_pull_attempts, + ) + + # We can optimistically try to process and wait for the event to be fully + # persisted if we've never tried before. + if len(fresh_events) > 0: + await _process_new_pulled_events(fresh_events) @trace @tag_args diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ac19de183cb6..2681917d0b6a 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -46,7 +46,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache @@ -1583,6 +1583,35 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + @trace + async def get_event_ids_with_failed_pull_attempts( + self, event_ids: StrCollection + ) -> Set[str]: + """ + Filter the given list of `event_ids` and return events which have any failed + pull attempts. + + Args: + event_ids: A list of events to filter down. + + Returns: + A filtered down list of `event_ids` that have previous failed pull attempts. + """ + + rows = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ) + event_ids_with_failed_pull_attempts: Set[str] = { + row["event_id"] for row in rows + } + + return event_ids_with_failed_pull_attempts + @trace async def get_event_ids_to_not_pull_from_backoff( self, diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 4938ddf70321..a0efb96d3b46 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -15,11 +15,13 @@ import heapq from itertools import islice from typing import ( + Callable, Collection, Dict, Generator, Iterable, Iterator, + List, Mapping, Set, Sized, @@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen)) +def partition( + iterable: Iterable[T], predicate: Callable[[T], bool] +) -> Tuple[List[T], List[T]]: + """ + Separate a given iterable into two lists based on the result of a predicate function. + + Args: + iterable: the iterable to partition (separate) + predicate: a function that takes an item from the iterable and returns a boolean + + Returns: + A tuple of two lists, the first containing all items for which the predicate + returned True, the second containing all items for which the predicate returned + False + """ + true_results = [] + false_results = [] + for item in iterable: + if predicate(item): + true_results.append(item) + else: + false_results.append(item) + return true_results, false_results + + def sorted_topologically( nodes: Iterable[T], graph: Mapping[T, Collection[T]], diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index c067e5bfe3dc..23f1b33b2fda 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -664,6 +664,101 @@ async def get_room_state( StoreError, ) + def test_backfill_process_previously_failed_pull_attempt_event_in_the_background( + self, + ) -> None: + """ + Sanity check that events are still processed even if it is in the background + for events that already have failed pull attempts. + """ + OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" + main_store = self.hs.get_datastores().main + + # Create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(main_store.get_room_version(room_id)) + + # Allow the remote user to send state events + self.helper.send_state( + room_id, + "m.room.power_levels", + {"events_default": 0, "state_default": 0}, + tok=tok, + ) + + # Add the remote user to the room + member_event = self.get_success( + event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join") + ) + + initial_state_map = self.get_success( + main_store.get_partial_current_state_ids(room_id) + ) + + auth_event_ids = [ + initial_state_map[("m.room.create", "")], + initial_state_map[("m.room.power_levels", "")], + member_event.event_id, + ] + + # Create a regular event that should process + pulled_event = make_event_from_dict( + self.add_hashes_and_signatures_from_other_server( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [ + member_event.event_id, + ], + "auth_events": auth_event_ids, + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled_event"}, + } + ), + room_version, + ) + + # Record a failed pull attempt for this event which will cause us to backfill it + # in the background from here on out. + self.get_success( + main_store.record_event_failed_pull_attempt( + room_id, pulled_event.event_id, "fake cause" + ) + ) + + # We expect an outbound request to /backfill, so stub that out + self.mock_federation_transport_client.backfill.return_value = make_awaitable( + { + "origin": self.OTHER_SERVER_NAME, + "origin_server_ts": 123, + "pdus": [ + pulled_event.get_pdu_json(), + ], + } + ) + + # The function under test: try to backfill and process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler().backfill( + self.OTHER_SERVER_NAME, + room_id, + limit=1, + extremities=["$some_extremity"], + ) + ) + + # Ensure `run_as_background_process(...)` has a chance to run (essentially + # `wait_for_background_processes()`) + self.reactor.pump((0.1,)) + + # Make sure we processed and persisted the pulled event + self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False)) + def test_process_pulled_event_with_rejected_missing_state(self) -> None: """Ensure that we correctly handle pulled events with missing state containing a rejected state event diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 81e50bdd5523..4b8d8328d742 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1134,6 +1134,43 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertEqual(backfill_event_ids, ["insertion_eventA"]) + def test_get_event_ids_with_failed_pull_attempts(self) -> None: + """ + Test to make sure we properly get event_ids based on whether they have any + failed pull attempts. + """ + # Create the room + user_id = self.register_user("alice", "test") + tok = self.login("alice", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id1", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id2", "fake cause" + ) + ) + + event_ids_with_failed_pull_attempts = self.get_success( + self.store.get_event_ids_with_failed_pull_attempts( + event_ids=[ + "$failed_event_id1", + "$fresh_event_id1", + "$failed_event_id2", + "$fresh_event_id2", + ] + ) + ) + + self.assertEqual( + event_ids_with_failed_pull_attempts, + {"$failed_event_id1", "$failed_event_id2"}, + ) + def test_get_event_ids_to_not_pull_from_backoff(self) -> None: """ Test to make sure only event IDs we should backoff from are returned.