From 9b390a35546420521ba661f7a006a9f99d3f554b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 19:06:57 -0500 Subject: [PATCH] Stop getting missing prev_events after we already know their signature is invalid MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related to - https://github.com/matrix-org/synapse/issues/13622 - https://github.com/matrix-org/synapse/pull/13635 - https://github.com/matrix-org/synapse/issues/13676 Follow-up to https://github.com/matrix-org/synapse/pull/13815 which tracks event signature failures. This PR aims to stop us from trying to `_get_state_ids_after_missing_prev_event` after we already know that the prev_event will fail from a previous attempt To explain an exact scenario around `/messages` -> backfill, we call `/backfill` and first check the signatures of the 100 events. We see bad signature for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` and `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` (both member events). Then we process the 98 events remaining that have valid signatures but one of the events references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event`. So we have to do the whole `_get_state_ids_after_missing_prev_event` rigmarole which pulls in those same events which fail again because the signatures are still invalid. - `backfill` - `outgoing-federation-request` `/backfill` - `_check_sigs_and_hash_and_fetch` - `_check_sigs_and_hash_and_fetch_one` for each event received over backfill - ❗ `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)` - ❗ `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)` - `_process_pulled_events` - `_process_pulled_event` for each validated event - ❗ Event `$Q0iMdqtz3IJYfZQU2Xk2WjB5NDF8Gg8cFSYYyKQgKJ0` references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event` which is missing so we try to get it - `_get_state_ids_after_missing_prev_event` - `outgoing-federation-request` `/state_ids` - ❗ `get_pdu` for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` which fails the signature check again - ❗ `get_pdu` for `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` which fails the signature check With this PR, we no longer call the burdensome `_get_state_ids_after_missing_prev_event` because the signature failure will count as an attempt before we try to run this. --- synapse/handlers/federation_event.py | 6 ++ .../databases/main/event_federation.py | 82 ++++++++++++++++++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9e065e1116b5..50a24e88183f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -955,6 +955,12 @@ async def _compute_event_context_with_maybe_missing_prevs( seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen + # Filter out events we've tried to pull recently + prevs_to_ignore = await self.store.filter_events_with_pull_attempt_backoff( + room_id, missing_prevs + ) + missing_prevs = missing_prevs - prevs_to_ignore + if not missing_prevs: return await self._state_handler.compute_event_context(event) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ef477978ed63..8b9de2ae4f09 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import itertools import logging from queue import Empty, PriorityQueue @@ -21,6 +22,7 @@ Iterable, List, Optional, + Sequence, Set, Tuple, cast, @@ -43,7 +45,7 @@ ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -72,6 +74,12 @@ logger = logging.getLogger(__name__) +PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int( + datetime.timedelta(days=7).total_seconds() +) +PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int( + datetime.timedelta(hours=1).total_seconds() +) # All the info we need while iterating the DAG while backfilling @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -1339,6 +1347,78 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + @trace + async def filter_events_with_pull_attempt_backoff( + self, + room_id: str, + event_ids: Sequence[str], + ) -> List[str]: + """ + Filter out events that we've failed to pull before + recently. Uses exponential backoff. + + Args: + room_id: The room that the events belong to + event_ids: A list of events to filter down + + Returns: + List of event_ids that can be attempted to be pulled + """ + return await self.db_pool.runInteraction( + "filter_events_with_pull_attempt_backoff", + self._filter_events_with_pull_attempt_backoff_txn, + room_id, + event_ids, + ) + + def _filter_events_with_pull_attempt_backoff_txn( + self, + txn: LoggingTransaction, + room_id: str, + event_ids: Sequence[str], + ) -> None: + where_event_ids_match_clause, values = make_in_list_sql_clause( + txn.database_engine, "event_id", event_ids + ) + + sql = """ + SELECT event_id FROM event_failed_pull_attempts + WHERE + room_id = ? + %s /* where_event_ids_match_clause */ + /** + * Exponential back-off (up to the upper bound) so we don't try to + * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + event_id IS NULL + OR ? /* current_time */ >= last_attempt_ts + /*least*/%s((1 << num_attempts) * ? /* step */, ? /* upper bound */) + ) + """ + + if isinstance(self.database_engine, PostgresEngine): + least_function = "least" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "min" + else: + raise RuntimeError("Unknown database engine") + + txn.execute( + sql % (where_event_ids_match_clause, least_function), + ( + room_id, + *values, + self._clock.time_msec(), + 1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, + 1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + ), + ) + async def get_missing_events( self, room_id: str,