Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Draft: /messages investigation scratch pad1 #13440

Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
522c29b
Instrument /messages for understandable traces
MadLittleMods Jul 23, 2022
b6a18d2
Trace in Complement
MadLittleMods Aug 3, 2022
2504bc6
Merge branch 'madlittlemods/instrument-messages-tracing' into madlitt…
MadLittleMods Aug 3, 2022
9cd6320
Fix imports after OTEL changes
MadLittleMods Aug 3, 2022
c3f3e59
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Aug 3, 2022
9f69182
Move Twisted git install where it was before
MadLittleMods Aug 3, 2022
2f75287
Fix @tag_args being one-off (ahead)
MadLittleMods Aug 3, 2022
fdce1c2
Allow @trace and @tag_args to be used together
MadLittleMods Aug 4, 2022
a7eabb7
Trace more
MadLittleMods Aug 4, 2022
13855c5
More tracing for federated side
MadLittleMods Aug 6, 2022
552b7f1
More tracing for federation
MadLittleMods Aug 6, 2022
c51883e
Add length to the list of events
MadLittleMods Aug 6, 2022
ee465f9
Fix some lints (mistakes) and better trace when fetching events
MadLittleMods Aug 6, 2022
aa5e925
Only set attribute if going forward
MadLittleMods Aug 9, 2022
2a467fd
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Aug 9, 2022
597c3f2
Trace some results
MadLittleMods Aug 9, 2022
f4ec9d1
Instrument FederationStateIdsServlet
MadLittleMods Aug 10, 2022
898ba0e
More tracing
MadLittleMods Aug 11, 2022
53b8453
Refactor from feedback
MadLittleMods Aug 15, 2022
0f2bfa4
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Aug 18, 2022
db04b16
Some cleanup
MadLittleMods Aug 18, 2022
4168ba5
Remove debug logs
MadLittleMods Aug 19, 2022
05e5113
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Sep 20, 2022
d8899e4
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Sep 26, 2022
04de9ea
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madl…
MadLittleMods Nov 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docker/conf/homeserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,21 @@ trusted_key_servers:

password_config:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
enabled: true


# foo
tracing:
enabled: true
sample_rate: 1
jaeger_exporter_config:
agent_host_name: host.docker.internal
agent_port: 6831
# Split UDP packets (UDP_PACKET_MAX_LENGTH is set to 65k in OpenTelemetry)
udp_split_oversized_batches: true
# If you define a collector, it will communicate directly to the collector,
# bypassing the agent
#
# It does not seem like the agent can keep up with the massive UDP load
# (1065 spans in one trace) so lets just use the HTTP collector endpoint
# instead which seems to work.
Comment on lines +203 to +205
Copy link
Contributor Author

@MadLittleMods MadLittleMods Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this is the case? I was seeing this same behavior with the Jaeger opentracing stuff. Is the UDP connection being over saturated? Can the Jaeger agent in Docker not keep up? We see some spans come over but never the main servlet overarching one that is probably the last to be exported.

But using the HTTP Jaeger collector endpoint seems to work fine for getting the whole trace.

collector_endpoint: "http://host.docker.internal:14268/api/traces?format=jaeger.thrift"
28 changes: 18 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ signedjson = "^1.1.0"
service-identity = ">=18.1.0"
# Twisted 18.9 introduces some logger improvements that the structured
# logger utilises
twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk", extras = ["tls"]}
treq = ">=15.1"
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
pyOpenSSL = ">=16.0.0"
Expand Down Expand Up @@ -182,7 +183,6 @@ idna = { version = ">=2.5", optional = true }
opentelemetry-api = {version = "^1.11.1", optional = true}
opentelemetry-sdk = {version = "^1.11.1", optional = true}
opentelemetry-exporter-jaeger = {version = "^1.11.1", optional = true}
twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"}

[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
Expand Down
8 changes: 7 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
from synapse.logging.tracing import trace
from synapse.logging.tracing import tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
Expand Down Expand Up @@ -235,6 +235,7 @@ async def claim_client_keys(
)

@trace
@tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
Expand Down Expand Up @@ -337,6 +338,8 @@ async def get_pdu_from_destination_raw(

return None

@trace
@tag_args
async def get_pdu(
self,
destinations: Iterable[str],
Expand Down Expand Up @@ -474,6 +477,8 @@ async def get_room_state_ids(

return state_event_ids, auth_event_ids

@trace
@tag_args
async def get_room_state(
self,
destination: str,
Expand Down Expand Up @@ -533,6 +538,7 @@ async def get_room_state(

return valid_state_events, valid_auth_events

@trace
async def _check_sigs_and_hash_and_fetch(
self,
origin: str,
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.tracing import trace
from synapse.logging.tracing import set_attribute, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
Expand Down Expand Up @@ -319,7 +319,8 @@ async def _maybe_backfill_inner(
# attempting to paginate before backfill reached the visible history.

extremities_to_request: List[str] = []
for bp in sorted_backfill_points:
for i, bp in enumerate(sorted_backfill_points):
set_attribute("backfill_point" + str(i), str(bp))
if len(extremities_to_request) >= 5:
break

Expand Down
50 changes: 46 additions & 4 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
from synapse.logging.tracing import trace
from synapse.logging.tracing import (
SynapseTags,
set_attribute,
start_active_span,
tag_args,
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
Expand Down Expand Up @@ -410,6 +416,7 @@ async def check_join_restrictions(
prev_member_event,
)

@trace
async def process_remote_join(
self,
origin: str,
Expand Down Expand Up @@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(

@trace
async def _process_pulled_events(
self, origin: str, events: Iterable[EventBase], backfilled: bool
self, origin: str, events: List[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server

Expand All @@ -730,6 +737,11 @@ async def _process_pulled_events(
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
str([event.event_id for event in events]),
)
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
Expand All @@ -753,6 +765,7 @@ async def _process_pulled_events(
await self._process_pulled_event(origin, ev, backfilled=backfilled)

@trace
@tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
Expand Down Expand Up @@ -854,6 +867,7 @@ async def _process_pulled_event(
else:
raise

@trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
Expand Down Expand Up @@ -970,6 +984,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
event, state_ids_before_event=state_map, partial_state=partial_state
)

@trace
@tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
Expand Down Expand Up @@ -1112,6 +1128,8 @@ async def _get_state_ids_after_missing_prev_event(

return state_map

@trace
@tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
Expand All @@ -1133,6 +1151,7 @@ async def _get_state_and_persist(
destination=destination, room_id=room_id, event_ids=(event_id,)
)

@trace
async def _process_received_pdu(
self,
origin: str,
Expand Down Expand Up @@ -1283,6 +1302,7 @@ async def _resync_device(self, sender: str) -> None:
except Exception:
logger.exception("Failed to resync device for %s", sender)

@trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
Expand Down Expand Up @@ -1414,6 +1434,8 @@ async def backfill_event_id(

return event_from_response

@trace
@tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
Expand Down Expand Up @@ -1459,6 +1481,7 @@ async def get_event(event_id: str) -> None:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)

@trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
Expand All @@ -1477,6 +1500,12 @@ async def _auth_and_persist_outliers(
"""
event_map = {event.event_id: event for event in events}

event_ids = event_map.keys()
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
str(event_ids),
)

# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
Expand Down Expand Up @@ -1593,6 +1622,7 @@ async def prep(event: EventBase) -> None:
backfilled=True,
)

@trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
Expand Down Expand Up @@ -1631,6 +1661,9 @@ async def _check_event_auth(
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
set_attribute(
"claimed_auth_events", str([ev.event_id for ev in claimed_auth_events])
)

# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
Expand Down Expand Up @@ -1728,6 +1761,7 @@ async def _check_event_auth(
)
context.rejected = RejectedReason.AUTH_ERROR

@trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
Expand Down Expand Up @@ -1935,6 +1969,8 @@ async def _load_or_fetch_auth_events_for_event(
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")

@trace
@tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
Expand Down Expand Up @@ -1963,6 +1999,7 @@ async def _get_remote_auth_chain_for_event(

await self._auth_and_persist_outliers(room_id, remote_auth_events)

@trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
Expand Down Expand Up @@ -2071,8 +2108,13 @@ async def persist_events_and_notify(
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
for event in events:
await self._notify_persisted_event(event, max_stream_token)
with start_active_span("notify_persisted_events"):
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
str([ev.event_id for ev in events]),
)
for event in events:
await self._notify_persisted_event(event, max_stream_token)

return max_stream_token.stream

Expand Down
Loading