Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Process previously failed backfill events in the background #15585

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fd26164
Process previously failed backfill events in the background
MadLittleMods May 12, 2023
c5dc746
Add changelog
MadLittleMods May 12, 2023
8fc47d8
Add consideration
MadLittleMods May 12, 2023
b5d95f7
Fix lints
MadLittleMods May 12, 2023
ebc93be
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
e13f5a9
Always check for failed attempts
MadLittleMods May 16, 2023
70f5911
Add comments and concern about maybe queue
MadLittleMods May 16, 2023
45934fe
Process all failed events as a sequential task in the background
MadLittleMods May 16, 2023
b1998d7
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
93de856
Better comments
MadLittleMods May 16, 2023
631d7db
Add test for `separate_event_ids_with_failed_pull_attempts`
MadLittleMods May 16, 2023
beeccc3
Avoid doing extra work if the list is empty
MadLittleMods May 17, 2023
7eabc60
Make sure to retain the same order they were given in case the depth …
MadLittleMods May 17, 2023
7583c2c
Add comments why OrderedDict
MadLittleMods May 17, 2023
e101318
Make test more robust around ordering
MadLittleMods May 17, 2023
899fc34
Add test description
MadLittleMods May 17, 2023
b5aec4f
Same order separated results
MadLittleMods May 17, 2023
6edd126
Refactor to get_event_ids_with_failed_pull_attempts(...)
MadLittleMods May 17, 2023
d4b8ff7
Update comment doc
MadLittleMods May 17, 2023
6a0ec9d
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 18, 2023
d843557
Use List
MadLittleMods May 18, 2023
75bec52
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 23, 2023
c4e1533
Trace differentiaed events
MadLittleMods May 23, 2023
ec230a3
Prefer plain language
MadLittleMods May 24, 2023
22a69be
Use a `set` for efficient lookups
MadLittleMods May 24, 2023
65febed
Add some context
MadLittleMods May 24, 2023
6474b4e
Use dedicated `partition` function to separate list
MadLittleMods May 24, 2023
15527f7
Add context for why source order for MSC2716
MadLittleMods May 24, 2023
d59615f
Add sanity check test that failed pull attempt events are still proce…
MadLittleMods May 24, 2023
95ffa7c
Use obvious type
MadLittleMods May 25, 2023
50acf6a
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15585.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.
49 changes: 42 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ async def _process_pulled_events(
[event.event_id for event in events]
)

new_events = []
new_events: Collection[EventBase] = []
for event in events:
event_id = event.event_id

Expand All @@ -890,12 +890,47 @@ async def _process_pulled_events(
# Continue on with the events that are new to us.
new_events.append(event)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
    """
    Process a batch of events we have not seen before, in depth order.

    Defined as a closure so it can be run either inline or as a background
    task; it captures `origin` and `backfilled` (and `self`) from the
    enclosing `_process_pulled_events` scope.

    Args:
        new_events: Events to process; sorted by depth before processing.
    """
    # We want to sort these by depth so we process them and
    # tell clients about them in order.
    sorted_events = sorted(new_events, key=lambda x: x.depth)
    for ev in sorted_events:
        with nested_logging_context(ev.event_id):
            await self._process_pulled_event(origin, ev, backfilled=backfilled)

# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the exponential backoff here, just whether it
# has failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
[event.event_id for event in new_events]
)
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# Process previously failed backfill events in the background to not waste
# time on something that is bound to fail again.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
events_with_failed_pull_attempts = [
event
for event in new_events
if event.event_id in event_ids_with_failed_pull_attempts
]
if len(events_with_failed_pull_attempts) > 0:
run_as_background_process(
"_process_new_pulled_events_with_failed_pull_attempts",
_process_new_pulled_events,
events_with_failed_pull_attempts,
)

# We can optimistically try to process and wait for the event to be fully
# persisted if we've never tried before.
fresh_events = [
event
for event in new_events
if event.event_id not in event_ids_with_failed_pull_attempts
]
if len(fresh_events) > 0:
await _process_new_pulled_events(fresh_events)

@trace
@tag_args
Expand Down
41 changes: 40 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
Expand Down Expand Up @@ -1583,6 +1583,45 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def get_event_ids_with_failed_pull_attempts(
    self, event_ids: StrCollection
) -> StrCollection:
    """
    Filter the given list of `event_ids` down to the ones that have at
    least one failed pull attempt recorded in the
    `event_failed_pull_attempts` table.

    Args:
        event_ids: A list of event IDs to check.

    Returns:
        The subset of the given `event_ids` that have failed pull
        attempts, in the same order as the given `event_ids`.
    """

    rows = await self.db_pool.simple_select_many_batch(
        table="event_failed_pull_attempts",
        column="event_id",
        iterable=event_ids,
        keyvalues={},
        retcols=("event_id",),
        desc="get_event_ids_with_failed_pull_attempts",
    )
    # Use a `set` for efficient membership checks below.
    event_ids_with_failed_pull_attempts_from_database = {
        str(row["event_id"]) for row in rows
    }
    # We want to maintain the order of the given `event_ids` so re-construct
    # things since there are no guarantees from the database
    # implementation/query about ordering.
    event_ids_with_failed_pull_attempts = [
        event_id
        for event_id in event_ids
        if event_id in event_ids_with_failed_pull_attempts_from_database
    ]

    return event_ids_with_failed_pull_attempts

@trace
async def get_event_ids_to_not_pull_from_backoff(
self,
Expand Down
49 changes: 49 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,55 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])

def test_get_event_ids_with_failed_pull_attempts(self) -> None:
    """
    Test to make sure we properly get event_ids based on whether they have any
    failed pull attempts.
    """
    # Set up a room to record failed pull attempts against.
    user_id = self.register_user("alice", "test")
    tok = self.login("alice", "test")
    room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

    # Record the failed pull attempts deliberately out of order
    # (`$c...3` shape reversed) so the test catches any dubious sorting or
    # unordered-collection usage: results must follow the order of the
    # `event_ids` passed in, not database order or lexicographic order.
    for failed_event_id in (
        "$a_failed_event_id3",
        "$c_failed_event_id1",
        "$b_failed_event_id2",
    ):
        self.get_success(
            self.store.record_event_failed_pull_attempt(
                room_id, failed_event_id, "fake cause"
            )
        )

    # Mix failed and fresh event IDs and ask the store to filter them.
    actual_event_ids = self.get_success(
        self.store.get_event_ids_with_failed_pull_attempts(
            event_ids=[
                "$c_failed_event_id1",
                "$c_fresh_event_id1",
                "$b_failed_event_id2",
                "$b_fresh_event_id2",
                "$a_failed_event_id3",
                "$a_fresh_event_id3",
            ]
        )
    )

    # Only the failed ones come back, in input order.
    self.assertEqual(
        actual_event_ids,
        ["$c_failed_event_id1", "$b_failed_event_id2", "$a_failed_event_id3"],
    )

def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
"""
Test to make sure only event IDs we should backoff from are returned.
Expand Down