Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only store data in caches, not "smart" objects #9845

Merged
merged 9 commits into from
Apr 23, 2021
25 changes: 9 additions & 16 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.auth = hs.get_auth()

# Used by `RulesForRoom` to ensure only one thing mutates the cache at a
# time. Keyed off room_id.
self._rules_linearizer = Linearizer(name="rules_for_room")

self.room_push_rule_cache_metrics = register_cache(
Expand Down Expand Up @@ -154,8 +156,8 @@ async def _get_rules_for_event(

@lru_cache()
def _get_rules_for_room(self, room_id: str) -> "RulesForRoomData":
"""Get the current RulesForRoom object for the given room id"""
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
"""Get the current RulesForRoomData object for the given room id"""
# It's important that the RulesForRoomData object gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
return RulesForRoomData()
Expand Down Expand Up @@ -339,13 +341,16 @@ def __init__(
rules_for_room_cache: The cache object that caches these
RoomsForUser objects.
room_push_rule_cache_metrics: The metrics object
data
linearizer:
cached_data:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
self.room_push_rule_cache_metrics = room_push_rule_cache_metrics

# Used to ensure only one thing mutates the cache at a time. Keyed off
# room_id.
self.linearizer = linearizer

self.data = cached_data
Expand All @@ -371,7 +376,7 @@ async def get_rules(
self.room_push_rule_cache_metrics.inc_hits()
return self.data.rules_by_user

with (await self.linearizer.queue(event.room_id)):
with (await self.linearizer.queue(self.room_id)):
if state_group and self.data.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
self.room_push_rule_cache_metrics.inc_hits()
Expand Down Expand Up @@ -515,18 +520,6 @@ async def _update_rules_with_member_event_ids(

self.update_cache(sequence, members, ret_rules_by_user, state_group)

def invalidate_all(self) -> None:
# Note: Don't hand this function directly to an invalidation callback
# as it keeps a reference to self and will stop this instance from being
# GC'd if it gets dropped from the rules_to_user cache. Instead use
# `self.invalidate_all_cb`
logger.debug("Invalidating RulesForRoom for %r", self.room_id)
self.data.sequence += 1
self.data.state_group = object()
self.data.member_map = {}
self.data.rules_by_user = {}
push_rules_invalidation_counter.inc()

def update_cache(self, sequence, members, rules_by_user, state_group) -> None:
if sequence == self.data.sequence:
self.data.member_map.update(members)
Expand Down
25 changes: 17 additions & 8 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

# Used by `_get_joined_hosts` to ensure only one thing mutates the cache
# at a time. Keyed by room_id.
self._joined_host_linearizer = Linearizer("_JoinedHostsCache")

# Is the current_state_events.membership up to date? Or is the
Expand Down Expand Up @@ -759,26 +761,31 @@ async def _get_joined_hosts(
# on it. However, its important that its never None, since two current_state's
# with a state_group of None are likely to be different.
assert state_group is not None
assert state_entry.state_group is None or state_entry.state_group == state_group
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# We use a secondary cache of previous work to allow us to build up the
# joined hosts for the given state group based on previous state groups.
#
# We cache one object per room containing the results of the last state
# group we got joined hosts for, with the idea being that generally
# `get_joined_hosts` with the "current" state group for the room.
# group we got joined hosts for. The idea being that generally
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# `get_joined_hosts` is called with the "current" state group for the
# room, and so consecutive calls will be for consecutive state groups
# which point to the previous state group.
cache = await self._get_joined_hosts_cache(room_id)

# If the state group in the cache matches then its a no-op.
# If the state group in the cache matches, we already have the data we need.
if state_entry.state_group == cache.state_group:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return frozenset(cache.hosts_to_joined_users)

# Since we'll mutate the cache we need to lock.
with (await self._joined_host_linearizer.queue(room_id)):
if state_entry.state_group == cache.state_group:
# Same state group, so nothing to do
# Same state group, so nothing to do. We've already checked for
# this above, but the cache may have changed while waiting on
# the lock.
pass
elif state_entry.prev_group == cache.state_group:
# The cache work is for the previous state group, so we work out
# The cached work is for the previous state group, so we work out
# the delta.
for (typ, state_key), event_id in state_entry.delta_ids.items():
if typ != EventTypes.Member:
Expand Down Expand Up @@ -1129,12 +1136,14 @@ def f(txn):

@attr.s(slots=True)
class _JoinedHostsCache:
"""Cache for joined hosts in a room that is optimised to handle updates
via state deltas.
"""
"""The cached data used by the `_get_joined_hosts_cache`."""

# Dict of host to the set of their users in the room at the state group.
hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict)

# The state group `hosts_to_joined_users` is derived from. Will be an object
# if the class is newly created or if the state is not based on a state
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# group.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
state_group = attr.ib(type=Union[object, int], factory=object)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

def __len__(self):
Expand Down