This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Instrument the federation/backfill part of /messages #13489

Merged

Changes from all commits
1 change: 1 addition & 0 deletions changelog.d/13489.misc
@@ -0,0 +1 @@
+Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger.
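For context, the pattern this PR applies throughout is: decorate a coroutine so that a span is opened and its arguments are recorded, then tag notable result values (each with a `.length` companion) on the active span. A minimal sketch of that pattern, assuming the synapse.logging.opentracing helpers exactly as imported in the diffs below; fetch_events is a hypothetical stand-in, not a real Synapse function:

from typing import List

from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace


async def fetch_events(dest: str, room_id: str, limit: int) -> List:
    return []  # hypothetical stand-in for the real network fetch


@trace      # opens a span named after the function
@tag_args   # records the function's arguments as tags on that span
async def backfill_sketch(dest: str, room_id: str, limit: int) -> List:
    events = await fetch_events(dest, room_id, limit)
    # Tag both the value and a ".length" companion so large lists can be
    # judged by size in Jaeger without expanding the full string.
    set_tag(SynapseTags.RESULT_PREFIX + "event_ids", str([e.event_id for e in events]))
    set_tag(SynapseTags.RESULT_PREFIX + "event_ids.length", str(len(events)))
    return events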
27 changes: 26 additions & 1 deletion synapse/federation/federation_client.py
@@ -61,7 +61,7 @@
 )
 from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.types import QueryParams
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -235,6 +235,7 @@ async def claim_client_keys(
         )

     @trace
+    @tag_args
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> Optional[List[EventBase]]:
@@ -337,6 +338,8 @@ async def get_pdu_from_destination_raw(

         return None

+    @trace
+    @tag_args
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -448,6 +451,8 @@ async def get_pdu(

         return event_copy

+    @trace
+    @tag_args
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
     ) -> Tuple[List[str], List[str]]:
@@ -467,13 +472,32 @@ async def get_room_state_ids(
         state_event_ids = result["pdu_ids"]
         auth_event_ids = result.get("auth_chain_ids", [])

+        set_tag(
+            SynapseTags.RESULT_PREFIX + "state_event_ids",
+            str(state_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+            str(len(state_event_ids)),
+        )
MadLittleMods (contributor, author) commented on lines +475 to +482, Aug 11, 2022:

    We could make set_tag smart enough to accept a Collection and do this extra length tag for us.
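A sketch of that suggestion, assuming set_tag keeps its current (key, value) signature; set_collection_tag is a hypothetical name, not part of Synapse:

from typing import Collection

from synapse.logging.opentracing import set_tag


def set_collection_tag(key: str, values: Collection) -> None:
    """Hypothetical helper: tag a collection and its length in one call."""
    # Stringify the collection itself ...
    set_tag(key, str(values))
    # ... and emit the ".length" companion tag automatically.
    set_tag(key + ".length", str(len(values)))

Each pair of set_tag calls in this diff would then collapse to a single call, e.g. set_collection_tag(SynapseTags.RESULT_PREFIX + "state_event_ids", state_event_ids).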

+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids",
+            str(auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+            str(len(auth_event_ids)),
+        )
+
         if not isinstance(state_event_ids, list) or not isinstance(
             auth_event_ids, list
         ):
             raise InvalidResponseError("invalid response from /state_ids")

         return state_event_ids, auth_event_ids

+    @trace
+    @tag_args
     async def get_room_state(
         self,
         destination: str,
@@ -533,6 +557,7 @@ async def get_room_state(

         return valid_state_events, valid_auth_events

+    @trace
     async def _check_sigs_and_hash_and_fetch(
         self,
         origin: str,
10 changes: 9 additions & 1 deletion synapse/handlers/federation.py
@@ -59,7 +59,7 @@
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import tag_args, trace
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -370,6 +370,14 @@ async def _maybe_backfill_inner(
         logger.debug(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request",
+            str(extremities_to_request),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+            str(len(extremities_to_request)),
+        )

         # Now we need to decide which hosts to hit first.

112 changes: 98 additions & 14 deletions synapse/handlers/federation_event.py
@@ -59,7 +59,13 @@
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -410,6 +416,7 @@ async def check_join_restrictions(
             prev_member_event,
         )

+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(

     @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: Collection[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server

@@ -730,6 +737,15 @@ async def _process_pulled_events(
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str([event.event_id for event in events]),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(events)),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
             backfilled,
@@ -753,6 +769,7 @@ async def _process_pulled_events(
             await self._process_pulled_event(origin, ev, backfilled=backfilled)

     @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -854,6 +871,7 @@ async def _process_pulled_event(
             else:
                 raise

+    @trace
     async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
     ) -> EventContext:
@@ -970,6 +988,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
             event, state_ids_before_event=state_map, partial_state=partial_state
         )

+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -1009,10 +1029,10 @@ async def _get_state_ids_after_missing_prev_event(
         logger.debug("Fetching %i events from cache/store", len(desired_events))
         have_events = await self._store.have_seen_events(room_id, desired_events)

-        missing_desired_events = desired_events - have_events
+        missing_desired_event_ids = desired_events - have_events
MadLittleMods (contributor, author) commented, Aug 10, 2022:

    Some event to event_id renaming while we're using the same name in the tag.

    (doesn't fix all of the cases in this function)

         logger.debug(
             "We are missing %i events (got %i)",
-            len(missing_desired_events),
+            len(missing_desired_event_ids),
             len(have_events),
         )

@@ -1024,13 +1044,30 @@ async def _get_state_ids_after_missing_prev_event(
         # already have a bunch of the state events. It would be nice if the
         # federation api gave us a way of finding out which we actually need.

-        missing_auth_events = set(auth_event_ids) - have_events
-        missing_auth_events.difference_update(
-            await self._store.have_seen_events(room_id, missing_auth_events)
+        missing_auth_event_ids = set(auth_event_ids) - have_events
+        missing_auth_event_ids.difference_update(
+            await self._store.have_seen_events(room_id, missing_auth_event_ids)
         )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+        logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))

-        missing_events = missing_desired_events | missing_auth_events
+        missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+            str(missing_auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+            str(len(missing_auth_event_ids)),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+            str(missing_desired_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+            str(len(missing_desired_event_ids)),
+        )

         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1078,13 @@ async def _get_state_ids_after_missing_prev_event(
         #
         # TODO: might it be better to have an API which lets us do an aggregate event
         # request
-        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+        if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
             logger.debug("Requesting complete state from remote")
             await self._get_state_and_persist(destination, room_id, event_id)
         else:
-            logger.debug("Fetching %i events from remote", len(missing_events))
+            logger.debug("Fetching %i events from remote", len(missing_event_ids))
             await self._get_events_and_persist(
-                destination=destination, room_id=room_id, event_ids=missing_events
+                destination=destination, room_id=room_id, event_ids=missing_event_ids
             )

         # We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1141,14 @@ async def _get_state_ids_after_missing_prev_event(
             event_id,
             failed_to_fetch,
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+            str(failed_to_fetch),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+            str(len(failed_to_fetch)),
+        )

         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1112,6 +1157,8 @@ async def _get_state_ids_after_missing_prev_event(

         return state_map

+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1133,6 +1180,7 @@ async def _get_state_and_persist(
             destination=destination, room_id=room_id, event_ids=(event_id,)
         )

+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
@@ -1283,6 +1331,7 @@ async def _resync_device(self, sender: str) -> None:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)

+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1414,6 +1463,8 @@ async def backfill_event_id(

         return event_from_response

+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1459,6 +1510,7 @@ async def get_event(event_id: str) -> None:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)

+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1477,6 +1529,16 @@ async def _auth_and_persist_outliers(
         """
         event_map = {event.event_id: event for event in events}

+        event_ids = event_map.keys()
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1655,7 @@ async def prep(event: EventBase) -> None:
             backfilled=True,
         )

+    @trace
     async def _check_event_auth(
         self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
@@ -1631,6 +1694,14 @@ async def _check_event_auth(
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+            str([ev.event_id for ev in claimed_auth_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+            str(len(claimed_auth_events)),
+        )

         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1799,7 @@ async def _check_event_auth(
             )
             context.rejected = RejectedReason.AUTH_ERROR

+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1935,6 +2007,8 @@ async def _load_or_fetch_auth_events_for_event(
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")

+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1963,6 +2037,7 @@ async def _get_remote_auth_chain_for_event(

         await self._auth_and_persist_outliers(room_id, remote_auth_events)

+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -2071,8 +2146,17 @@ async def persist_events_and_notify(
             self._message_handler.maybe_schedule_expiry(event)

         if not backfilled:  # Never notify for backfilled events
-            for event in events:
-                await self._notify_persisted_event(event, max_stream_token)
+            with start_active_span("notify_persisted_events"):
+                set_tag(
+                    SynapseTags.RESULT_PREFIX + "event_ids",
+                    str([ev.event_id for ev in events]),
+                )
+                set_tag(
+                    SynapseTags.RESULT_PREFIX + "event_ids.length",
+                    str(len(events)),
+                )
+                for event in events:
+                    await self._notify_persisted_event(event, max_stream_token)

         return max_stream_token.stream
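The last hunk above also uses start_active_span, which groups a stretch of work under an explicitly named span instead of tracing a whole function. A minimal sketch of that shape, assuming the same opentracing helpers; notify_one is a hypothetical stand-in for Synapse's _notify_persisted_event:

from synapse.logging.opentracing import SynapseTags, set_tag, start_active_span


async def notify_one(event, max_stream_token):
    ...  # hypothetical stand-in for the real per-event notification


async def notify_persisted(events, max_stream_token):
    # One named span wraps the whole loop, so the notification work shows
    # up as a single block in Jaeger rather than as unattributed time.
    with start_active_span("notify_persisted_events"):
        set_tag(SynapseTags.RESULT_PREFIX + "event_ids.length", str(len(events)))
        for event in events:
            await notify_one(event, max_stream_token)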