Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop fewer caches when we delete old rooms #17128

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
Expand Down
48 changes: 28 additions & 20 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ def process_replication_rows(
)

room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
server_joined = row.keys.get(1, True)
self._invalidate_caches_for_room(room_id, server_joined)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -388,7 +388,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined]

self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
Expand All @@ -398,11 +397,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_applicable_edit", None)
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)

self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
Expand All @@ -417,17 +412,28 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

def _invalidate_caches_for_room_and_stream(
self, txn: LoggingTransaction, room_id: str
self,
txn: LoggingTransaction,
room_id: str,
server_in_room: bool,
) -> None:
"""Invalidate caches associated with rooms, and stream to replication.

Used when we delete rooms.

Args:
txn
room_id
server_in_room: Whether the server was joined or invited to the
room when we deleted it.
Comment on lines +428 to +429
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care if the server has knocked on the room?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, now I can't remember why I included invite. It's basically whether we need to invalidate caches because the `current_state_events` table has changed, and that table should be empty unless the server is joined to the room.

"""

self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
txn.call_after(self._invalidate_caches_for_room, room_id)
self._send_invalidation_to_replication(
txn, DELETE_ROOM_CACHE_NAME, [room_id, server_in_room]
)
txn.call_after(self._invalidate_caches_for_room, room_id, server_in_room)

def _invalidate_caches_for_room(self, room_id: str) -> None:
def _invalidate_caches_for_room(self, room_id: str, server_in_room: bool) -> None:
"""Invalidate caches associated with rooms.

Used when we delete rooms.
Expand All @@ -439,8 +445,16 @@ def _invalidate_caches_for_room(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)

if server_in_room:
self._attempt_to_invalidate_cache(
"get_latest_event_ids_in_room", (room_id,)
)
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)

# And delete state caches.
self._invalidate_state_caches_all(room_id)

self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
Expand All @@ -453,19 +467,13 @@ def _invalidate_caches_for_room(self, room_id: str) -> None:
"_get_partial_state_servers_at_join", (room_id,)
)
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))

# And delete state caches.

self._invalidate_state_caches_all(room_id)

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
Expand Down
17 changes: 16 additions & 1 deletion synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
from typing import Any, List, Set, Tuple, cast

from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
Expand Down Expand Up @@ -376,6 +377,20 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
(room_id,),
)

server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
txn,
room_id,
host=self.hs.hostname,
membership=Membership.JOIN,
)
if not server_in_room:
server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
txn,
room_id,
host=self.hs.hostname,
membership=Membership.INVITE,
)

# First, fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
Expand Down Expand Up @@ -503,6 +518,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)

self._invalidate_caches_for_room_and_stream(txn, room_id)
self._invalidate_caches_for_room_and_stream(txn, room_id, server_in_room)

return state_groups
21 changes: 16 additions & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,17 @@ async def is_host_invited(self, room_id: str, host: str) -> bool:

# Async wrapper: runs `_check_host_room_membership_txn` inside a database
# interaction via `self.db_pool.runInteraction`, forwarding `room_id`, `host`
# and `membership` unchanged, and returns that helper's boolean result.
#
# NOTE: the interaction is always labelled "is_host_joined", whatever value
# of `membership` is being checked.
async def _check_host_room_membership(
self, room_id: str, host: str, membership: str
) -> bool:
return await self.db_pool.runInteraction(
"is_host_joined",
self._check_host_room_membership_txn,
room_id,
host,
membership,
)

def _check_host_room_membership_txn(
self, txn: LoggingTransaction, room_id: str, host: str, membership: str
) -> bool:
if "%" in host or "_" in host:
raise Exception("Invalid host name")
Expand All @@ -980,14 +991,14 @@ async def _check_host_room_membership(
# the returned user actually has the correct domain.
like_clause = "%:" + host

rows = await self.db_pool.execute(
"is_host_joined", sql, membership, room_id, like_clause
)
txn.execute(sql, (membership, room_id, like_clause))

row = txn.fetchone()

if not rows:
if not row:
return False

user_id = rows[0][0]
user_id = row[0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
Expand Down