Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Comments and typing for _update_outliers_txn #11776

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11776.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add some comments and type annotations for `_update_outliers_txn`.
33 changes: 22 additions & 11 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,28 +1254,32 @@ def _update_room_depths_txn(
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)

def _update_outliers_txn(self, txn, events_and_contexts):
def _update_outliers_txn(
self,
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> List[Tuple[EventBase, EventContext]]:
"""Update any outliers with new event info.

This turns outliers into ex-outliers (unless the new event was
rejected).
This turns outliers into ex-outliers (unless the new event was rejected), and
also removes any other events we have already seen from the list.

Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
txn: db connection
events_and_contexts: events we are persisting

Returns:
list[(EventBase, EventContext)] new list, without events which
are already in the events table.
new list, without events which are already in the events table.
"""
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
% (",".join(["?"] * len(events_and_contexts)),),
[event.event_id for event, _ in events_and_contexts],
)

have_persisted = {event_id: outlier for event_id, outlier in txn}
have_persisted: Dict[str, bool] = {
event_id: outlier for event_id, outlier in txn
}

to_remove = set()
for event, context in events_and_contexts:
Expand All @@ -1285,15 +1289,22 @@ def _update_outliers_txn(self, txn, events_and_contexts):
to_remove.add(event)

if context.rejected:
# If the event is rejected then we don't care if the event
# was an outlier or not.
# If the incoming event is rejected then we don't care if the event
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
#
# Note that we do not update the stream_ordering of the event in this
# scenario. XXX: does this cause bugs? It will mean we won't send such
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)

# insert into event_to_state_groups.
try:
Expand Down