Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bust _membership_stream_cache cache when current state changes #17732

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17732.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix membership caches not updating in state reset scenarios.
4 changes: 3 additions & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design)
"""

def _invalidate_state_caches(
self, room_id: str, members_changed: Collection[str]
self,
room_id: str,
members_changed: Collection[str],
) -> None:
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ def process_replication_rows(
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
for user_id in members_changed:
self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined]
Copy link
Collaborator Author

@MadLittleMods MadLittleMods Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda weird to just stick this here (same with the others in process_replication_rows). Better way to organize this?

Copy link
Collaborator Author

@MadLittleMods MadLittleMods Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I tested this out with the tests in #17725, and these cache-busting spots for current state don't pick up a state reset. Will need to look into this more.

elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
Expand All @@ -236,6 +238,7 @@ def process_replication_rows(
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -275,6 +278,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,)
Expand All @@ -291,6 +295,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
Expand Down
11 changes: 11 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
self._entity_to_key[entity] = stream_pos
self._evict()

def all_entities_changed(self, stream_pos: int) -> None:
"""
Mark all entities as changed. This is useful when the cache is invalidated and
there may be some potential change for all of the entities.
"""
Comment on lines +317 to +321
Copy link
Collaborator Author

@MadLittleMods MadLittleMods Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: all_entities_changed(stream_pos): Does this concept make sense?

I don't think it makes sense to drop all of the keys, as we're essentially not sure whether something has changed. I think this approach is the way to go, and it is just "unfortunate for the membership caches".

# All entities are at the same stream position now.
self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())})
self._entity_to_key = {
entity: stream_pos for entity in self._entity_to_key.keys()
}

def _evict(self) -> None:
"""
Ensure the cache has not exceeded the maximum size.
Expand Down
16 changes: 16 additions & 0 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,19 @@ def test_max_pos(self) -> None:

# Unknown entities will return None
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None)

def test_all_entities_changed(self) -> None:
    """
    After `StreamChangeCache.all_entities_changed(...)`, every entity that was
    previously recorded reports the new stream position as its last change.
    """
    cache = StreamChangeCache("#test", 1, max_size=10)

    # Record each entity at its own, distinct stream position (2, 3, 4).
    entities = ("user@foo.com", "bar@baz.net", "user@elsewhere.org")
    for stream_pos, entity in enumerate(entities, start=2):
        cache.entity_has_changed(entity, stream_pos)

    cache.all_entities_changed(5)

    # Each entity should now have been moved up to the shared position.
    for entity in entities:
        self.assertEqual(cache.get_max_pos_of_last_change(entity), 5)
Loading