Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Process previously failed backfill events in the background (#15585)
Browse files Browse the repository at this point in the history
Process previously failed backfill events in the background because they are bound to fail again and we don't need to waste time holding up the request for something that is bound to fail again.

Fix #13623

Follow-up to #13621 and #13622

Part of making `/messages` faster: #13356
  • Loading branch information
MadLittleMods committed May 25, 2023
1 parent 8839b6c commit 77156a4
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.d/15585.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.
70 changes: 62 additions & 8 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

Expand Down Expand Up @@ -865,7 +865,7 @@ async def _process_pulled_events(
[event.event_id for event in events]
)

new_events = []
new_events: List[EventBase] = []
for event in events:
event_id = event.event_id

Expand Down Expand Up @@ -895,12 +895,66 @@ async def _process_pulled_events(
str(len(new_events)),
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
# We want to sort these by depth so we process them and tell clients about
# them in order. It's also more efficient to backfill this way (`depth`
# ascending) because one backfill event is likely to be the `prev_event` of
# the next event we're going to process.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)

# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the expontntial backoff here, just whether it
# has failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
[event.event_id for event in new_events]
)
)

# We construct the event lists in source order from `/backfill` response because
# it's a) easiest, but also b) the order in which we process things matters for
# MSC2716 historical batches because many historical events are all at the same
# `depth` and we rely on the tenuous sort that the other server gave us and hope
# they're doing their best. The brittle nature of this ordering for historical
# messages over federation is one of the reasons why we don't want to continue
# on MSC2716 until we have online topological ordering.
events_with_failed_pull_attempts, fresh_events = partition(
new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
str(event_ids_with_failed_pull_attempts),
)
set_tag(
SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
str(len(events_with_failed_pull_attempts)),
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
str([event.event_id for event in fresh_events]),
)
set_tag(
SynapseTags.RESULT_PREFIX + "fresh_events.length",
str(len(fresh_events)),
)

# Process previously failed backfill events in the background to not waste
# time on something that is likely to fail again.
if len(events_with_failed_pull_attempts) > 0:
run_as_background_process(
"_process_new_pulled_events_with_failed_pull_attempts",
_process_new_pulled_events,
events_with_failed_pull_attempts,
)

# We can optimistically try to process and wait for the event to be fully
# persisted if we've never tried before.
if len(fresh_events) > 0:
await _process_new_pulled_events(fresh_events)

@trace
@tag_args
Expand Down
31 changes: 30 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
Expand Down Expand Up @@ -1583,6 +1583,35 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def get_event_ids_with_failed_pull_attempts(
self, event_ids: StrCollection
) -> Set[str]:
"""
Filter the given list of `event_ids` and return events which have any failed
pull attempts.
Args:
event_ids: A list of events to filter down.
Returns:
A filtered down list of `event_ids` that have previous failed pull attempts.
"""

rows = await self.db_pool.simple_select_many_batch(
table="event_failed_pull_attempts",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("event_id",),
desc="get_event_ids_with_failed_pull_attempts",
)
event_ids_with_failed_pull_attempts: Set[str] = {
row["event_id"] for row in rows
}

return event_ids_with_failed_pull_attempts

@trace
async def get_event_ids_to_not_pull_from_backoff(
self,
Expand Down
27 changes: 27 additions & 0 deletions synapse/util/iterutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import heapq
from itertools import islice
from typing import (
Callable,
Collection,
Dict,
Generator,
Iterable,
Iterator,
List,
Mapping,
Set,
Sized,
Expand Down Expand Up @@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))


def partition(
iterable: Iterable[T], predicate: Callable[[T], bool]
) -> Tuple[List[T], List[T]]:
"""
Separate a given iterable into two lists based on the result of a predicate function.
Args:
iterable: the iterable to partition (separate)
predicate: a function that takes an item from the iterable and returns a boolean
Returns:
A tuple of two lists, the first containing all items for which the predicate
returned True, the second containing all items for which the predicate returned
False
"""
true_results = []
false_results = []
for item in iterable:
if predicate(item):
true_results.append(item)
else:
false_results.append(item)
return true_results, false_results


def sorted_topologically(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
Expand Down
95 changes: 95 additions & 0 deletions tests/handlers/test_federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,101 @@ async def get_room_state(
StoreError,
)

def test_backfill_process_previously_failed_pull_attempt_event_in_the_background(
self,
) -> None:
"""
Sanity check that events are still processed even if it is in the background
for events that already have failed pull attempts.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# Allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)

# Add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)

initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)

auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]

# Create a regular event that should process
pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
member_event.event_id,
],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled_event"},
}
),
room_version,
)

# Record a failed pull attempt for this event which will cause us to backfill it
# in the background from here on out.
self.get_success(
main_store.record_event_failed_pull_attempt(
room_id, pulled_event.event_id, "fake cause"
)
)

# We expect an outbound request to /backfill, so stub that out
self.mock_federation_transport_client.backfill.return_value = make_awaitable(
{
"origin": self.OTHER_SERVER_NAME,
"origin_server_ts": 123,
"pdus": [
pulled_event.get_pdu_json(),
],
}
)

# The function under test: try to backfill and process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler().backfill(
self.OTHER_SERVER_NAME,
room_id,
limit=1,
extremities=["$some_extremity"],
)
)

# Ensure `run_as_background_process(...)` has a chance to run (essentially
# `wait_for_background_processes()`)
self.reactor.pump((0.1,))

# Make sure we processed and persisted the pulled event
self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False))

def test_process_pulled_event_with_rejected_missing_state(self) -> None:
"""Ensure that we correctly handle pulled events with missing state containing a
rejected state event
Expand Down
37 changes: 37 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,43 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])

def test_get_event_ids_with_failed_pull_attempts(self) -> None:
"""
Test to make sure we properly get event_ids based on whether they have any
failed pull attempts.
"""
# Create the room
user_id = self.register_user("alice", "test")
tok = self.login("alice", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id1", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id2", "fake cause"
)
)

event_ids_with_failed_pull_attempts = self.get_success(
self.store.get_event_ids_with_failed_pull_attempts(
event_ids=[
"$failed_event_id1",
"$fresh_event_id1",
"$failed_event_id2",
"$fresh_event_id2",
]
)
)

self.assertEqual(
event_ids_with_failed_pull_attempts,
{"$failed_event_id1", "$failed_event_id2"},
)

def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
"""
Test to make sure only event IDs we should backoff from are returned.
Expand Down

0 comments on commit 77156a4

Please sign in to comment.