Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fetch edits for multiple events in a single query #11660

Merged
merged 16 commits into from
Feb 8, 2022
Merged
86 changes: 66 additions & 20 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
RelationPaginationToken,
)
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -63,6 +64,11 @@ def __init__(
self._msc1849_enabled = hs.config.experimental.msc1849_enabled
self._msc3440_enabled = hs.config.experimental.msc3440_enabled

self.get_applicable_edit: LruCache[str, Optional[EventBase]] = LruCache(
clokep marked this conversation as resolved.
Show resolved Hide resolved
cache_name="get_applicable_edit",
max_size=hs.config.caches.event_cache_size, # TODO
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

@cached(tree=True)
async def get_relations_for_event(
self,
Expand Down Expand Up @@ -325,57 +331,96 @@ def _get_aggregation_groups_for_event_txn(
"get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
)

@cached()
async def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
async def _get_applicable_edits(
self, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Get the most recent edit (if any) that has happened for the given
event.
events.

Correctly handles checking whether edits were allowed to happen.

Args:
event_id: The original event ID
event_ids: The original event IDs

Returns:
The most recent edit, if any.
A map of the most recent edit for each event. A missing event implies
there are no edits.
"""

# A map of the original event IDs to the edit events.
edits_by_original = {}

# Check if an edit for this event is currently cached.
event_ids_to_check = []
for event_id in event_ids:
if event_id not in self.get_applicable_edit:
event_ids_to_check.append(event_id)
else:
edit_event = self.get_applicable_edit[event_id]
if edit_event:
edits_by_original[event_id] = edit_event

# If all events were cached, all done.
if not event_ids_to_check:
return edits_by_original

# We only allow edits for `m.room.message` events that have the same sender
# and event type. We can't assert these things during regular event auth so
# we have to do the checks post hoc.

# Fetches latest edit that has the same type and sender as the
# original, and is an `m.room.message`.
#
# TODO Should this ensure it does not return results for state events / redacted events?
clokep marked this conversation as resolved.
Show resolved Hide resolved
sql = """
SELECT edit.event_id FROM events AS edit
SELECT original.event_id, edit.event_id FROM events AS edit
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would only pull out the latest edit event per original event, but I haven't found a reasonable way to do that (maybe a lateral join, but that's not supported on sqlite). Any thoughts on how to improve this would be appreciated!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does latest edit mean "edit with largest depth"?

Might we be able to use a trick like https://stackoverflow.com/a/27802817/5252017 ? (Sorry, not an expert here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the largest origin_server_ts (which is why we're ordering by that descending.)

Copy link
Contributor

@reivilibre reivilibre Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a lateral join, but that's not supported on sqlite

N.B. SQLite does support lateral joins as long as you don't write the word LATERAL — if that's the only thing blocking you, you should be able to work around that by only inserting that word for Postgres.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reivilibre Do you have any reference for that? I've been unable to get it to work (and my searching online has yielded "you can't do this on SQLite").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Postgres you can do SELECT DISTINCT ON (original.event_id) ... which will choose the first row (as defined by the ORDER BY clause). I don't think SQLite has that, and so you would need to use a different query, at which point you may as well fall back to the old behaviour.

Potentially you could use window functions and first_value , which I think both postgres and sqlite support, but those sorts of queries really are voodoo magic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ @erikjohnston Thank you! That gave me some breadcrumbs to realize we do something very similar elsewhere:

if isinstance(self.database_engine, PostgresEngine):
# The `DISTINCT ON` clause will pick the *first* row it
# encounters, so ordering by stream ID desc will ensure we get
# the latest key.
sql = """
SELECT DISTINCT ON (user_id, keytype) user_id, keytype, keydata, stream_id
FROM e2e_cross_signing_keys
WHERE %(clause)s
ORDER BY user_id, keytype, stream_id DESC
""" % {
"clause": clause
}
else:
# SQLite has special handling for bare columns when using
# MIN/MAX with a `GROUP BY` clause where it picks the value from
# a row that matches the MIN/MAX.
sql = """
SELECT user_id, keytype, keydata, MAX(stream_id)
FROM e2e_cross_signing_keys
WHERE %(clause)s
GROUP BY user_id, keytype
""" % {
"clause": clause
}

I think we can abstract out the changes in 8400c20, but that is proving a bit tedious / invasive so I'd like to do it separately.

INNER JOIN event_relations USING (event_id)
INNER JOIN events AS original ON
original.event_id = relates_to_id
AND edit.type = original.type
AND edit.sender = original.sender
AND edit.room_id = original.room_id
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
WHERE
relates_to_id = ?
%s
AND relation_type = ?
AND edit.type = 'm.room.message'
ORDER by edit.origin_server_ts DESC, edit.event_id DESC
LIMIT 1
"""

def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]:
txn.execute(sql, (event_id, RelationTypes.REPLACE))
row = txn.fetchone()
if row:
return row[0]
return None

edit_id = await self.db_pool.runInteraction(
def _get_applicable_edit_txn(txn: LoggingTransaction) -> Dict[str, str]:
clokep marked this conversation as resolved.
Show resolved Hide resolved
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids_to_check
)
args.append(RelationTypes.REPLACE)

txn.execute(sql % (clause,), args)
rows = txn.fetchall()
result = {}
for original_event_id, edit_event_id in rows:
# Only consider the latest edit (by origin server ts).
if original_event_id not in result:
result[original_event_id] = edit_event_id
return result

edit_ids = await self.db_pool.runInteraction(
"get_applicable_edit", _get_applicable_edit_txn
)

if not edit_id:
return None
edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this class depend on EventsWorkerStore instead of ignoring the error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can attempt to do that! Note that we have similar ignores all over, see #11165.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives other errors (similar to #11165) about inconsistent MROs. I'm going to leave this to be solved in #11165.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair!


# Add the newly checked events to the cache. If an edit exists, add it to
# the results.
for original_event_id in event_ids_to_check:
# There might not be an edit or the event might not be known. In
# either case, cache the None.
edit_event_id = edit_ids.get(original_event_id)
edit_event = edits.get(edit_event_id)

self.get_applicable_edit.set(original_event_id, edit_event)
if edit_event:
edits_by_original[original_event_id] = edit_event

return await self.get_event(edit_id, allow_none=True) # type: ignore[attr-defined]
return edits_by_original

@cached()
async def get_thread_summary(
Expand Down Expand Up @@ -588,7 +633,8 @@ async def _get_bundled_aggregation_for_event(

edit = None
if event.type == EventTypes.Message:
edit = await self.get_applicable_edit(event_id)
edits = await self._get_applicable_edits([event_id])
edit = edits.get(event_id)

if edit:
aggregations[RelationTypes.REPLACE] = edit
Expand Down