Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Cleanup opentracing logging for syncs #10828

Merged
merged 3 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10828.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added opentrace logging to help debug #9424.
267 changes: 141 additions & 126 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,21 +1533,18 @@ async def _generate_sync_entry_for_rooms(
newly_left_rooms = room_changes.newly_left_rooms

async def handle_room_entries(room_entry: "RoomSyncResultBuilder"):
with start_active_span("generate_room_entry"):
set_tag("room_id", room_entry.room_id)
log_kv({"events": len(room_entry.events or [])})
logger.debug("Generating room entry for %s", room_entry.room_id)
res = await self._generate_room_entry(
sync_result_builder,
ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)
logger.debug("Generated room entry for %s", room_entry.room_id)
return res
logger.debug("Generating room entry for %s", room_entry.room_id)
res = await self._generate_room_entry(
sync_result_builder,
ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)
logger.debug("Generated room entry for %s", room_entry.room_id)
return res

await concurrently_execute(handle_room_entries, room_entries, 10)

Expand Down Expand Up @@ -1960,139 +1957,157 @@ async def _generate_room_entry(
room_id = room_builder.room_id
since_token = room_builder.since_token
upto_token = room_builder.upto_token
log_kv(
{
"since_token": since_token,
"upto_token": upto_token,
}
)

batch = await self._load_filtered_recents(
room_id,
sync_config,
now_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
)
log_kv(
{
"batch_events": len(batch.events),
"prev_batch": batch.prev_batch,
"batch_limited": batch.limited,
}
)
with start_active_span("generate_room_entry"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to change the span name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with it, but open to other suggestions 🤷

set_tag("room_id", room_id)
log_kv({"events": len(events or ())})

# Note: `batch` can be both empty and limited here in the case where
# `_load_filtered_recents` can't find any events the user should see
# (e.g. due to having ignored the sender of the last 50 events).
log_kv(
{
"since_token": since_token,
"upto_token": upto_token,
}
)

if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
batch = await self._load_filtered_recents(
room_id,
batch,
sync_config,
now_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
)
log_kv(
{
"batch_events": len(batch.events),
"prev_batch": batch.prev_batch,
"batch_limited": batch.limited,
}
)

# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
# tag was added by synapse e.g. for server notice rooms.
if full_state:
user_id = sync_result_builder.sync_config.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)
# Note: `batch` can be both empty and limited here in the case where
# `_load_filtered_recents` can't find any events the user should see
# (e.g. due to having ignored the sender of the last 50 events).

# If there aren't any tags, don't send the empty tags list down
# sync
if not tags:
tags = None
if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
room_id,
batch,
)

account_data_events = []
if tags is not None:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
# tag was added by synapse e.g. for server notice rooms.
if full_state:
user_id = sync_result_builder.sync_config.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)

for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})
# If there aren't any tags, don't send the empty tags list down
# sync
if not tags:
tags = None

account_data_events = sync_config.filter_collection.filter_room_account_data(
account_data_events
)
account_data_events = []
if tags is not None:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})

ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
for account_data_type, content in account_data.items():
account_data_events.append(
{"type": account_data_type, "content": content}
)

if not (
always_include or batch or account_data_events or ephemeral or full_state
):
return
account_data_events = (
sync_config.filter_collection.filter_room_account_data(
account_data_events
)
)

state = await self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)

summary: Optional[JsonDict] = {}

# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if sync_config.filter_collection.lazy_load_members() and (
# we recalculate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events)
or (
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited
and any(t == EventTypes.Member for (t, k) in state)
)
or since_token is None
):
summary = await self.compute_summary(
room_id, sync_config, batch, state, now_token
)
if not (
always_include
or batch
or account_data_events
or ephemeral
or full_state
):
return

if room_builder.rtype == "joined":
unread_notifications: Dict[str, int] = {}
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
state = await self.compute_state_delta(
room_id,
batch,
sync_config,
since_token,
now_token,
full_state=full_state,
)

if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
summary: Optional[JsonDict] = {}

# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if sync_config.filter_collection.lazy_load_members() and (
# we recalculate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events)
or (
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited
and any(t == EventTypes.Member for (t, k) in state)
)
or since_token is None
):
summary = await self.compute_summary(
room_id, sync_config, batch, state, now_token
)

if room_builder.rtype == "joined":
unread_notifications: Dict[str, int] = {}
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
)

unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)

room_sync.unread_count = notifs["unread_count"]
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

sync_result_builder.joined.append(room_sync)
room_sync.unread_count = notifs["unread_count"]

if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
sync_result_builder.joined.append(room_sync)

if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
)
elif room_builder.rtype == "archived":
archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
elif room_builder.rtype == "archived":
archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
if archived_room_sync or always_include:
sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
if archived_room_sync or always_include:
sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
self, user_id: str, room_key: RoomStreamToken
Expand Down