This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Stop getting missing prev_events after we already know their signature is invalid

Related to
 - #13622
    - #13635
 - #13676

Follow-up to #13815
which tracks event signature failures.

This PR aims to stop us from trying to
`_get_state_ids_after_missing_prev_event` after
we already know that the prev_event will fail
from a previous attempt.

To explain an exact scenario around `/messages` -> backfill: we call `/backfill` and first check the signatures of the 100 events returned. We see bad signatures for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` and `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` (both member events). Then we process the 98 remaining events that have valid signatures, but one of them references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event`. So we have to do the whole `_get_state_ids_after_missing_prev_event` rigmarole, which pulls in those same events again, and they fail again because the signatures are still invalid.

- `backfill`
  - `outgoing-federation-request` `/backfill`
  - `_check_sigs_and_hash_and_fetch`
    - `_check_sigs_and_hash_and_fetch_one` for each event received over backfill
      - ❗ `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)`
      - ❗ `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)`
  - `_process_pulled_events`
    - `_process_pulled_event` for each validated event
      - ❗ Event `$Q0iMdqtz3IJYfZQU2Xk2WjB5NDF8Gg8cFSYYyKQgKJ0` references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event`, which is missing, so we try to get it
        - `_get_state_ids_after_missing_prev_event`
          - `outgoing-federation-request` `/state_ids`
          - ❗ `get_pdu` for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M`, which fails the signature check again
          - ❗ `get_pdu` for `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw`, which fails the signature check

With this PR, we no longer call the burdensome `_get_state_ids_after_missing_prev_event`
because the signature failure will count as an attempt before we try to run this.
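As a rough sketch of the backoff rule this relies on (the function name is illustrative; the constants mirror `PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS` and `PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS` from the diff):

```python
import datetime

# Mirror the constants introduced in event_federation.py (step: 1 hour,
# upper bound: 7 days), converted to milliseconds as the storage layer does.
STEP_MS = 1000 * int(datetime.timedelta(hours=1).total_seconds())
UPPER_BOUND_MS = 1000 * int(datetime.timedelta(days=7).total_seconds())


def backoff_expired(num_attempts: int, last_attempt_ts_ms: int, now_ms: int) -> bool:
    """Whether enough time has passed since the last failed pull to retry.

    The delay doubles per failed attempt (`1 << n`) and is capped at the
    upper bound, matching the SQL in the PR: 2hr, 4hr, 8hr, ... up to 7 days.
    """
    delay_ms = min((1 << num_attempts) * STEP_MS, UPPER_BOUND_MS)
    return now_ms >= last_attempt_ts_ms + delay_ms


HOUR_MS = 3600 * 1000
# After one failed attempt, a retry is allowed only once 2 hours have passed.
print(backoff_expired(1, 0, 2 * HOUR_MS))      # True
print(backoff_expired(1, 0, 2 * HOUR_MS - 1))  # False
```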
MadLittleMods committed Sep 15, 2022
1 parent cfb4e88 commit 9b390a3
Showing 2 changed files with 87 additions and 1 deletion.
6 changes: 6 additions & 0 deletions synapse/handlers/federation_event.py
@@ -955,6 +955,12 @@ async def _compute_event_context_with_maybe_missing_prevs(
        seen = await self._store.have_events_in_timeline(prevs)
        missing_prevs = prevs - seen

        # Filter out events we've tried to pull recently
        prevs_to_ignore = await self._store.filter_events_with_pull_attempt_backoff(
            room_id, missing_prevs
        )
        missing_prevs = missing_prevs - prevs_to_ignore

        if not missing_prevs:
            return await self._state_handler.compute_event_context(event)

82 changes: 81 additions & 1 deletion synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import itertools
import logging
from queue import Empty, PriorityQueue
@@ -21,6 +22,7 @@
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
cast,
@@ -43,7 +45,7 @@
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -72,6 +74,12 @@

logger = logging.getLogger(__name__)

PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
datetime.timedelta(days=7).total_seconds()
)
PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
datetime.timedelta(hours=1).total_seconds()
)

# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -1339,6 +1347,78 @@ def _record_event_failed_pull_attempt_upsert_txn(

        txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

    @trace
    async def filter_events_with_pull_attempt_backoff(
        self,
        room_id: str,
        event_ids: Sequence[str],
    ) -> List[str]:
        """
        Filter out events that we've recently failed to pull. Uses
        exponential backoff.

        Args:
            room_id: The room that the events belong to
            event_ids: A list of events to filter down

        Returns:
            List of event_ids that can be attempted to be pulled
        """
        return await self.db_pool.runInteraction(
            "filter_events_with_pull_attempt_backoff",
            self._filter_events_with_pull_attempt_backoff_txn,
            room_id,
            event_ids,
        )

    def _filter_events_with_pull_attempt_backoff_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        event_ids: Sequence[str],
    ) -> List[str]:
        where_event_ids_match_clause, values = make_in_list_sql_clause(
            txn.database_engine, "event_id", event_ids
        )

        sql = """
            SELECT event_id FROM event_failed_pull_attempts
            WHERE
                room_id = ?
                AND %s /* where_event_ids_match_clause */
                /**
                 * Exponential back-off (up to the upper bound) so we don't try to
                 * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
                 *
                 * We use `1 << n` as a power of 2 equivalent for compatibility
                 * with older SQLites. The left shift equivalent only works with
                 * powers of 2 because left shift is a binary operation (base-2).
                 * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
                 */
                AND (
                    event_id IS NULL
                    OR ? /* current_time */ >= last_attempt_ts + /*least*/%s((1 << num_attempts) * ? /* step */, ? /* upper bound */)
                )
        """

        if isinstance(self.database_engine, PostgresEngine):
            least_function = "least"
        elif isinstance(self.database_engine, Sqlite3Engine):
            least_function = "min"
        else:
            raise RuntimeError("Unknown database engine")

        txn.execute(
            sql % (where_event_ids_match_clause, least_function),
            (
                room_id,
                *values,
                self._clock.time_msec(),
                1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
                1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
            ),
        )

        return [event_id for (event_id,) in txn]

    async def get_missing_events(
        self,
        room_id: str,
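The backoff condition in the new query can be exercised against an in-memory SQLite database. This standalone sketch recreates only the columns the query reads, inlines the event-ID match, and uses SQLite's multi-argument `min()` in place of Postgres's `least()`:

```python
import datetime
import sqlite3

# Same step (1 hour) and upper bound (7 days) as the PR, in milliseconds.
STEP_MS = 1000 * int(datetime.timedelta(hours=1).total_seconds())
UPPER_BOUND_MS = 1000 * int(datetime.timedelta(days=7).total_seconds())
HOUR_MS = 3600 * 1000

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE event_failed_pull_attempts (
           room_id TEXT, event_id TEXT, num_attempts INTEGER,
           last_attempt_ts BIGINT)"""
)

now = 10 * HOUR_MS
conn.executemany(
    "INSERT INTO event_failed_pull_attempts VALUES (?, ?, ?, ?)",
    [
        # 2 attempts -> 4 hour delay; only 1 hour has passed, still backing off
        ("!room", "$recent_failure", 2, now - 1 * HOUR_MS),
        # 2 attempts -> 4 hour delay; 5 hours have passed, eligible again
        ("!room", "$old_failure", 2, now - 5 * HOUR_MS),
    ],
)

pullable = conn.execute(
    """SELECT event_id FROM event_failed_pull_attempts
       WHERE room_id = ?
         AND ? >= last_attempt_ts + min((1 << num_attempts) * ?, ?)""",
    ("!room", now, STEP_MS, UPPER_BOUND_MS),
).fetchall()
print(pullable)  # [('$old_failure',)]
```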
