
Persist auth events before the events that rely on them #10771

Merged
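
A minimal sketch (not code from this diff) of the ordering idea the PR introduces: repeatedly pick out the "root" events whose auth events are not themselves still waiting to be persisted, persist that batch, and loop until everything is done or a cycle is detected. All names here (batches_in_auth_order, auth_ids, pending) are hypothetical stand-ins, not Synapse identifiers.

from typing import Dict, List

def batches_in_auth_order(auth_ids: Dict[str, List[str]]) -> List[List[str]]:
    """Group event IDs so that each batch depends only on earlier batches."""
    pending = dict(auth_ids)
    batches: List[List[str]] = []
    while pending:
        # "roots" are events none of whose auth events are still pending
        roots = [
            eid for eid, deps in pending.items()
            if not any(dep in pending for dep in deps)
        ]
        if not roots:
            # a loop in the auth graph; in room v2+ this needs a hash collision
            raise ValueError("loop in auth events: %s" % sorted(pending))
        batches.append(roots)
        for eid in roots:
            del pending[eid]
    return batches

# C's auth events include B, and B's include A, so A must be persisted first:
print(batches_in_auth_order({"A": [], "B": ["A"], "C": ["A", "B"]}))
# [['A'], ['B'], ['C']]
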
1 change: 1 addition & 0 deletions changelog.d/10771.misc
@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.
101 changes: 65 additions & 36 deletions synapse/handlers/federation_event.py
@@ -818,7 +818,7 @@ async def _get_state_after_missing_prev_event(
missing_events = missing_desired_events | missing_auth_events
logger.debug("Fetching %i events from remote", len(missing_events))
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
destination=destination, room_id=room_id, event_ids=missing_events
)

# we need to make sure we re-load from the database to get the rejected
@@ -1085,12 +1085,12 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase):
)

async def _get_events_and_persist(
self, destination: str, room_id: str, events: Iterable[str]
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
"""Fetch the given events from a server, and persist them as outliers.

This function *does not* recursively get missing auth events of the
newly fetched events. Callers must include in the `events` argument
newly fetched events. Callers must include in the `event_ids` argument
any missing events from the auth chain.

Logs a warning if we can't find the given event.
@@ -1127,56 +1127,85 @@ async def get_event(event_id: str):
e,
)

await concurrently_execute(get_event, events, 5)
await concurrently_execute(get_event, event_ids, 5)
logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))

# Make a map of auth events for each event. We do this after fetching
# all the events as some of the events' auth events will be in the list
# of requested events.
# we now need to auth the events in an order which ensures that each event's
# auth_events are authed before the event itself.
#
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
# build a list of events whose auth events are not in the queue.
roots = tuple(
ev
for ev in event_map.values()
if not any(aid in event_map for aid in ev.auth_event_ids())
)

auth_events = [
aid
for event in event_map.values()
for aid in event.auth_event_ids()
if aid not in event_map
]
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
# somebody has managed to create a loop (which requires finding a
# hash collision in room v2 and later).
logger.warning(
"Loop found in auth events while fetching missing state/auth "
"events: %s",
shortstr(event_map.keys()),
)
return

logger.info(
"Persisting %i of %i remaining events", len(roots), len(event_map)
)

await self._auth_and_persist_fetched_events(destination, room_id, roots)

for ev in roots:
del event_map[ev.event_id]

async def _auth_and_persist_fetched_events(
self, origin: str, room_id: str, fetched_events: Collection[EventBase]
) -> None:
"""Persist the events fetched by _get_events_and_persist.

The events should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.

We also assume that all of the auth events for all of the events have already
been persisted.

Notifies about the events where appropriate.

Params:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
fetched_events: the events to persist
"""
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
auth_events = {
aid for event in fetched_events for aid in event.auth_event_ids()
}
persisted_events = await self._store.get_events(
auth_events,
allow_rejected=True,
)

event_infos = []
for event in event_map.values():
for event in fetched_events:
auth = {}
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
ae = persisted_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae
else:
logger.info("Missing auth event %s", auth_event_id)

event_infos.append(_NewEventInfo(event, auth))

if event_infos:
await self._auth_and_persist_events(
destination,
origin,
room_id,
event_infos,
)

async def _auth_and_persist_events(
self,
origin: str,
room_id: str,
event_infos: Collection[_NewEventInfo],
) -> None:
"""Creates the appropriate contexts and persists events. The events
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.

Notifies about the events where appropriate.
"""

if not event_infos:
return

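
As an aside on the second half of the diff: because each batch is persisted only after its auth events, _auth_and_persist_fetched_events can build every event's auth map purely from events already in the store, keyed by (type, state_key). A hedged sketch of that assumption follows; the store argument and its get_events call simply mirror the diff above and are stand-ins, not a definitive Synapse API.

async def build_auth_maps(store, fetched_events):
    # collect every auth event id referenced by the batch...
    auth_ids = {aid for ev in fetched_events for aid in ev.auth_event_ids()}
    # ...all of which should already be persisted, thanks to the batch ordering
    persisted = await store.get_events(auth_ids, allow_rejected=True)

    auth_maps = {}
    for ev in fetched_events:
        auth = {}
        for aid in ev.auth_event_ids():
            ae = persisted.get(aid)
            if ae is not None:
                # auth events are state events, so (type, state_key) is a usable key
                auth[(ae.type, ae.state_key)] = ae
            # a missing auth event here would indicate a bug in the ordering
        auth_maps[ev.event_id] = auth
    return auth_maps
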