Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Keep track when we try and fail to process a pulled event (#13589)
Browse files Browse the repository at this point in the history
We can follow-up this PR with:

 1. Only try to backfill from an event if we haven't tried recently -> #13622
 1. When we decide to backfill that event again, process it in the background so it doesn't block and make `/messages` slow when we know it will probably fail again -> #13623
 1. Generally track failures everywhere we try and fail to pull an event over federation -> #13700

Fix #13621

Part of #13356

Mentioned in [internal doc](https://docs.google.com/document/d/1lvUoVfYUiy6UaHB6Rb4HicjaJAU40-APue9Q4vzuW3c/edit#bookmark=id.qv7cj51sv9i5)
  • Loading branch information
MadLittleMods authored Sep 14, 2022
1 parent 666ae87 commit 957e3d7
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.d/13589.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
7 changes: 7 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,9 @@ async def _process_pulled_event(
self._sanity_check_event(event)
except SynapseError as err:
logger.warning("Event %s failed sanity check: %s", event_id, err)
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(err)
)
return

try:
Expand Down Expand Up @@ -897,6 +900,10 @@ async def _process_pulled_event(
backfilled=backfilled,
)
except FederationError as e:
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(e)
)

if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
Expand Down
45 changes: 45 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,51 @@ def _get_backfill_events(

return event_id_results

@trace
async def record_event_failed_pull_attempt(
self, room_id: str, event_id: str, cause: str
) -> None:
"""
Record when we fail to pull an event over federation.
This information allows us to be more intelligent when we decide to
retry (we don't need to fail over and over) and we can process that
event in the background so we don't block on it each time.
Args:
room_id: The room where the event failed to pull from
event_id: The event that failed to be fetched or processed
cause: The error message or reason that we failed to pull the event
"""
await self.db_pool.runInteraction(
"record_event_failed_pull_attempt",
self._record_event_failed_pull_attempt_upsert_txn,
room_id,
event_id,
cause,
db_autocommit=True, # Safe as it's a single upsert
)

def _record_event_failed_pull_attempt_upsert_txn(
self,
txn: LoggingTransaction,
room_id: str,
event_id: str,
cause: str,
) -> None:
sql = """
INSERT INTO event_failed_pull_attempts (
room_id, event_id, num_attempts, last_attempt_ts, last_cause
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (room_id, event_id) DO UPDATE SET
num_attempts=event_failed_pull_attempts.num_attempts + 1,
last_attempt_ts=EXCLUDED.last_attempt_ts,
last_cause=EXCLUDED.last_cause;
"""

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

async def get_missing_events(
self,
room_id: str,
Expand Down
32 changes: 23 additions & 9 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2435,17 +2435,31 @@ def _update_backward_extremeties(
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
]
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
],
backward_extremity_tuples_to_remove,
)

# Clear out the failed backfill attempts after we successfully pulled
# the event. Since we no longer need these events as backward
# extremities, it also means that they won't be backfilled from again so
# we no longer need to store the backfill attempts around it.
query = """
DELETE FROM event_failed_pull_attempts
WHERE event_id = ? and room_id = ?
"""
txn.execute_batch(
query,
backward_extremity_tuples_to_remove,
)


Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
Changes in SCHEMA_VERSION = 73;
- thread_id column is added to event_push_actions, event_push_actions_staging
event_push_summary, receipts_linearized, and receipts_graph.
- Add table `event_failed_pull_attempts` to keep track when we fail to pull
events over federation.
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- Add a table that keeps track of when we failed to pull an event over
-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
-- us to be more intelligent when we decide to retry (we don't need to fail over
-- and over) and we can process that event in the background so we don't block
-- on it each time.
CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
room_id TEXT NOT NULL REFERENCES rooms (room_id),
event_id TEXT NOT NULL,
num_attempts INT NOT NULL,
last_attempt_ts BIGINT NOT NULL,
last_cause TEXT NOT NULL,
PRIMARY KEY (room_id, event_id)
);
222 changes: 222 additions & 0 deletions tests/handlers/test_federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,225 @@ async def get_event(destination: str, event_id: str, timeout=None):

if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()

def test_process_pulled_event_records_failed_backfill_attempts(
self,
) -> None:
"""
Test to make sure that failed backfill attempts for an event are
recorded in the `event_failed_pull_attempts` table.
In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event has a fake `prev_event` which our server has
obviously never seen before so it attempts to request the state at that
`prev_event` which expectedly fails because it's a fake event. Because
the server can't fetch the state at the missing `prev_event`, the
"pulled" event fails the history check and is fails to process.
We check that we correctly record the number of failed pull attempts
of the pulled event and as a sanity check, that the "pulled" event isn't
persisted.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# We expect an outbound request to /state_ids, so stub that out
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
{
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
"pdu_ids": [],
"auth_chain_ids": [],
}
)
# We also expect an outbound request to /state
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
StateRequestResponse(
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
auth_events=[],
state=[],
)
)

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
# The fake prev event will make the pulled event fail
# the history check (`Unable to get missing prev_event
# $fake_prev_event`)
"$fake_prev_event"
],
"auth_events": [],
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our failed pull attempt was recorded
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event again
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 2)

# And as a sanity check, make sure the event was not persisted through all of this.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNone(
persisted,
"pulled event that fails the history check should not be persisted at all",
)

def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
self,
) -> None:
"""
Test to make sure that failed pull attempts
(`event_failed_pull_attempts` table) for an event are cleared after the
event is successfully persisted.
In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event succesfully processes and the backward
extremeties are updated along with clearing out any failed pull attempts
for those old extremities.
We check that we correctly cleared failed pull attempts of the
pulled event.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)

# add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)

initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)

auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [member_event.event_id],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# Fake the "pulled" event failing to backfill once so we can test
# if it's cleared out later on.
self.get_success(
main_store.record_event_failed_pull_attempt(
pulled_event.room_id, pulled_event.event_id, "fake cause"
)
)
# Make sure we have a failed pull attempt recorded for the pulled event
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure the failed pull attempts for the pulled event are cleared
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
allow_none=True,
)
)
self.assertIsNone(backfill_num_attempts)

# And as a sanity check, make sure the "pulled" event was persisted.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNotNone(persisted, "pulled event was not persisted at all")

0 comments on commit 957e3d7

Please sign in to comment.