Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor MSC3030 /timestamp_to_event to move away from our snowflak…
Browse files Browse the repository at this point in the history
…e pull from `destination` pattern (#14096)

 1. `federation_client.timestamp_to_event(...)` now handles all `destination` looping and uses our generic `_try_destination_list(...)` helper.
 2. Consistently handling `NotRetryingDestination` and `FederationDeniedError` across `get_pdu` , backfill, and the generic `_try_destination_list` which is used for many places we use this pattern.
 3. `get_pdu(...)` now returns `PulledPduInfo` so we know which `destination` we ended up pulling the PDU from
  • Loading branch information
MadLittleMods authored Oct 26, 2022
1 parent 0d59ae7 commit 40fa829
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 126 deletions.
1 change: 1 addition & 0 deletions changelog.d/14096.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor [MSC3030](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint to loop over federation destinations with standard pattern and error handling.
130 changes: 109 additions & 21 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@
T = TypeVar("T")


@attr.s(frozen=True, slots=True, auto_attribs=True)
class PulledPduInfo:
"""
A result object that stores the PDU and info about it like which homeserver we
pulled it from (`pull_origin`)
"""

pdu: EventBase
# Which homeserver we pulled the PDU from
pull_origin: str


class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
we couldn't parse
Expand Down Expand Up @@ -114,7 +126,9 @@ def __init__(self, hs: "HomeServer"):
self.hostname = hs.hostname
self.signing_key = hs.signing_key

self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
# Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
# (which server we pulled the event from)
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
Expand Down Expand Up @@ -352,11 +366,11 @@ async def _record_failure_callback(
@tag_args
async def get_pdu(
self,
destinations: Iterable[str],
destinations: Collection[str],
event_id: str,
room_version: RoomVersion,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
) -> Optional[PulledPduInfo]:
"""Requests the PDU with given origin and ID from the remote home
servers.
Expand All @@ -371,11 +385,11 @@ async def get_pdu(
moving to the next destination. None indicates no timeout.
Returns:
The requested PDU, or None if we were unable to find it.
The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it.
"""

logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations
"get_pdu(event_id=%s): from destinations=%s", event_id, destinations
)

# TODO: Rate limit the number of times we try and get the same event.
Expand All @@ -384,19 +398,25 @@ async def get_pdu(
# it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches
# events once they have been persisted.
event = self._get_pdu_cache.get(event_id)
get_pdu_cache_entry = self._get_pdu_cache.get(event_id)

event = None
pull_origin = None
if get_pdu_cache_entry:
event, pull_origin = get_pdu_cache_entry
# If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations
if not event:
else:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

# TODO: We can probably refactor this to use `_try_destination_list`
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
"get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
event_id,
destination,
last_attempt,
PDU_RETRY_TIME_MS,
Expand All @@ -411,43 +431,48 @@ async def get_pdu(
room_version=room_version,
timeout=timeout,
)
pull_origin = destination

pdu_attempts[destination] = now

if event:
# Prime the cache
self._get_pdu_cache[event.event_id] = event
self._get_pdu_cache[event.event_id] = (event, pull_origin)

# Now that we have an event, we can break out of this
# loop and stop asking other destinations.
break

except NotRetryingDestination as e:
logger.info("get_pdu(event_id=%s): %s", event_id, e)
continue
except FederationDeniedError:
logger.info(
"get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist",
event_id,
destination,
)
continue
except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
"get_pdu(event_id=%s): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now

logger.info(
"Failed to get PDU %s from %s because %s",
"get_pdu(event_id=): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue

if not event:
if not event or not pull_origin:
return None

# `event` now refers to an object stored in `get_pdu_cache`. Our
Expand All @@ -459,7 +484,7 @@ async def get_pdu(
event.room_version,
)

return event_copy
return PulledPduInfo(event_copy, pull_origin)

@trace
@tag_args
Expand Down Expand Up @@ -699,12 +724,14 @@ async def _check_sigs_and_hash_and_fetch_one(
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
res = await self.get_pdu(
pulled_pdu_info = await self.get_pdu(
destinations=[pdu_origin],
event_id=pdu.event_id,
room_version=room_version,
timeout=10000,
)
if pulled_pdu_info is not None:
res = pulled_pdu_info.pdu
except SynapseError:
pass

Expand Down Expand Up @@ -806,6 +833,7 @@ async def _try_destination_list(
)

for destination in destinations:
# We don't want to ask our own server for information we don't have
if destination == self.server_name:
continue

Expand All @@ -814,9 +842,21 @@ async def _try_destination_list(
except (
RequestSendFailed,
InvalidResponseError,
NotRetryingDestination,
) as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
# Skip to the next homeserver in the list to try.
continue
except NotRetryingDestination as e:
logger.info("%s: %s", description, e)
continue
except FederationDeniedError:
logger.info(
"%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist",
description,
description,
destination,
)
continue
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
Expand Down Expand Up @@ -1609,6 +1649,54 @@ async def send_request(
return result

async def timestamp_to_event(
self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
) -> Optional["TimestampToEventResponse"]:
"""
Calls each remote federating server from `destinations` asking for their closest
event to the given timestamp in the given direction until we get a response.
Also validates the response to always return the expected keys or raises an
error.
Args:
destinations: The domains of homeservers to try fetching from
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.
Returns:
A parsed TimestampToEventResponse including the closest event_id
and origin_server_ts or None if no destination has a response.
"""

async def _timestamp_to_event_from_destination(
destination: str,
) -> TimestampToEventResponse:
return await self._timestamp_to_event_from_destination(
destination, room_id, timestamp, direction
)

try:
# Loop through each homeserver candidate until we get a succesful response
timestamp_to_event_response = await self._try_destination_list(
"timestamp_to_event",
destinations,
# TODO: The requested timestamp may lie in a part of the
# event graph that the remote server *also* didn't have,
# in which case they will have returned another event
# which may be nowhere near the requested timestamp. In
# the future, we may need to reconcile that gap and ask
# other homeservers, and/or extend `/timestamp_to_event`
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
_timestamp_to_event_from_destination,
)
return timestamp_to_event_response
except SynapseError:
return None

async def _timestamp_to_event_from_destination(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> "TimestampToEventResponse":
"""
Expand Down
15 changes: 9 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,15 @@ async def try_backfill(domains: Collection[str]) -> bool:
# appropriate stuff.
# TODO: We can probably do something more intelligent here.
return True
except NotRetryingDestination as e:
logger.info("_maybe_backfill_inner: %s", e)
continue
except FederationDeniedError:
logger.info(
"_maybe_backfill_inner: Not attempting to backfill from %s because the homeserver is not on our federation whitelist",
dom,
)
continue
except (SynapseError, InvalidResponseError) as e:
logger.info("Failed to backfill from %s because %s", dom, e)
continue
Expand Down Expand Up @@ -477,15 +486,9 @@ async def try_backfill(domains: Collection[str]) -> bool:

logger.info("Failed to backfill from %s because %s", dom, e)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except RequestSendFailed as e:
logger.info("Failed to get backfill from %s because %s", dom, e)
continue
except FederationDeniedError as e:
logger.info(e)
continue
except Exception as e:
logger.exception("Failed to backfill from %s because %s", dom, e)
continue
Expand Down
31 changes: 14 additions & 17 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import (
SynapseTags,
Expand Down Expand Up @@ -1517,8 +1517,8 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
)

async def backfill_event_id(
self, destination: str, room_id: str, event_id: str
) -> EventBase:
self, destinations: List[str], room_id: str, event_id: str
) -> PulledPduInfo:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.
Expand All @@ -1530,38 +1530,35 @@ async def backfill_event_id(
Raises:
FederationError if we are unable to find the event from the destination
"""
logger.info(
"backfill_event_id: event_id=%s from destination=%s", event_id, destination
)
logger.info("backfill_event_id: event_id=%s", event_id)

room_version = await self._store.get_room_version(room_id)

event_from_response = await self._federation_client.get_pdu(
[destination],
pulled_pdu_info = await self._federation_client.get_pdu(
destinations,
event_id,
room_version,
)

if not event_from_response:
if not pulled_pdu_info:
raise FederationError(
"ERROR",
404,
"Unable to find event_id=%s from destination=%s to backfill."
% (event_id, destination),
f"Unable to find event_id={event_id} from remote servers to backfill.",
affected=event_id,
)

# Persist the event we just fetched, including pulling all of the state
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
await self._process_pulled_events(
destination,
[event_from_response],
pulled_pdu_info.pull_origin,
[pulled_pdu_info.pdu],
# Prevent notifications going to clients
backfilled=True,
)

return event_from_response
return pulled_pdu_info

@trace
@tag_args
Expand All @@ -1584,19 +1581,19 @@ async def _get_events_and_persist(
async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
try:
event = await self._federation_client.get_pdu(
pulled_pdu_info = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
if event is None:
if pulled_pdu_info is None:
logger.warning(
"Server %s didn't return event %s",
destination,
event_id,
)
return
events.append(event)
events.append(pulled_pdu_info.pdu)

except Exception as e:
logger.warning(
Expand Down
Loading

0 comments on commit 40fa829

Please sign in to comment.