From 2bd0e634ac274bedb00c78292f931cbbd853d78c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 18 Sep 2024 17:26:52 -0500 Subject: [PATCH 1/5] Bust `_membership_stream_cache` cache when current state changes Fix https://github.com/element-hq/synapse/issues/17368 --- synapse/storage/_base.py | 4 +++- synapse/storage/databases/main/cache.py | 5 +++++ synapse/util/caches/stream_change_cache.py | 11 +++++++++++ tests/util/test_stream_change_cache.py | 16 ++++++++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e14d711c764..7251e72e3ad 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design) """ def _invalidate_state_caches( - self, room_id: str, members_changed: Collection[str] + self, + room_id: str, + members_changed: Collection[str], ) -> None: """Invalidates caches that are based on the current state, but does not stream invalidations down replication. 
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 37c865a8e75..930bdb5a4fd 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -219,6 +219,8 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + for user_id in members_changed: + self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: if row.keys is None: raise Exception( @@ -236,6 +238,7 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -275,6 +278,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._attempt_to_invalidate_cache( "get_sliding_sync_rooms_for_user", None ) + self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) @@ -291,6 +295,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. 
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 16fcb00206f..8965086a970 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: self._entity_to_key[entity] = stream_pos self._evict() + def all_entities_changed(self, stream_pos: int) -> None: + """ + Mark all entities as changed. This is useful when the cache is invalidated and + there may be some potential change for all of the entities. + """ + # All entities are at the same stream position now. + self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())}) + self._entity_to_key = { + entity: stream_pos for entity in self._entity_to_key.keys() + } + def _evict(self) -> None: """ Ensure the cache has not exceeded the maximum size. diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index af1199ef8a5..da36ecaed31 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -251,3 +251,19 @@ def test_max_pos(self) -> None: # Unknown entities will return None self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) + + def test_all_entities_changed(self) -> None: + """ + `StreamChangeCache.all_entities_changed(...)` will mark all entities as changed. 
+ """ + cache = StreamChangeCache("#test", 1, max_size=10) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + cache.all_entities_changed(5) + + self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 5) From d327c62ae745f0c2af702031833ad0b0f639ec29 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 18 Sep 2024 17:33:18 -0500 Subject: [PATCH 2/5] Add changelog --- changelog.d/17732.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17732.bugfix diff --git a/changelog.d/17732.bugfix b/changelog.d/17732.bugfix new file mode 100644 index 00000000000..572c13fc573 --- /dev/null +++ b/changelog.d/17732.bugfix @@ -0,0 +1 @@ +Fix membership caches not updating in state reset scenarios. From dd87620fe6b6ed08a2fe0e0c28ed7ce39011ca23 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 9 Oct 2024 18:46:04 -0500 Subject: [PATCH 3/5] Actually bust membership cache on monolith when current state changes --- synapse/storage/databases/main/cache.py | 17 +++++++++++++++++ synapse/storage/databases/main/events.py | 15 ++++++++++++++- .../client/sliding_sync/test_sliding_sync.py | 18 ------------------ tests/storage/test_stream.py | 6 ------ 4 files changed, 31 insertions(+), 25 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 4abd8f31967..91ce13c14f2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -193,6 +193,13 @@ def get_all_updated_caches_txn( def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: + logger.info( + "asdf process_replication_rows %s %s %s %s", + stream_name, + instance_name, + token, + rows, + ) if 
stream_name == EventsStream.NAME: for row in rows: self._process_event_stream_row(token, row) @@ -219,6 +226,9 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + self._curr_state_delta_stream_cache.entity_has_changed( + room_id, token + ) # type: ignore[attr-defined] for user_id in members_changed: self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: @@ -238,6 +248,9 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._curr_state_delta_stream_cache.entity_has_changed( + room_id, token + ) # type: ignore[attr-defined] self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -255,6 +268,8 @@ def process_replication_position( def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data + logger.info("asdf _process_event_stream_row %s", row) + if row.type == EventsStreamEventRow.TypeId: assert isinstance(data, EventsStreamEventRow) self._invalidate_caches_for_event( @@ -271,6 +286,8 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: assert isinstance(data, EventsStreamCurrentStateRow) self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + logger.info("asdf EventsStreamCurrentStateRow.TypeId %s", data) + if data.type == EventTypes.Member: self._attempt_to_invalidate_cache( "get_rooms_for_user", (data.state_key,) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c0b7d8107d7..1733ef98de2 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1605,7 +1605,13 @@ def 
_update_current_state_txn( room_id delta_state: Deltas that are going to be used to update the `current_state_events` table. Changes to the current state of the room. - stream_id: TODO + stream_id: This is expected to be the minimum `stream_ordering` for the + batch of events that we are persisting; which means we do not end up in a + situation where workers see events before the `current_state_delta` updates. + FIXME: However, this function also gets called with next upcoming + `stream_ordering` when we re-sync the state of a partial stated room (see + `update_current_state(...)`) which may be "correct" but it would be good to + nail down what exactly is the expected value here. sliding_sync_table_changes: Changes to the `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables derived from the given `delta_state` (see @@ -1904,6 +1910,13 @@ def _update_current_state_txn( stream_id, ) + for user_id in members_to_cache_bust: + txn.call_after( + self.store._membership_stream_cache.entity_has_changed, + user_id, + stream_id, + ) + # Invalidate the various caches self.store._invalidate_state_caches_and_stream( txn, room_id, members_to_cache_bust diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index ea3ca57957d..53f88dbabc1 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -1058,12 +1058,6 @@ def test_state_reset_room_comes_down_incremental_sync(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(room_id1)) 
self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1211,12 +1205,6 @@ def test_state_reset_previously_room_comes_down_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1395,12 +1383,6 @@ def test_state_reset_never_room_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index ed5f2862439..c69dbd575ed 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -1209,12 +1209,6 @@ def test_state_reset2(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - after_reset_token = self.event_sources.get_current_token() membership_changes = self.get_success( From 7c53716bc119aaa1cc6428a14ce86d9590a9f908 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: 
Wed, 9 Oct 2024 18:50:22 -0500 Subject: [PATCH 4/5] Remove debug logs --- synapse/storage/databases/main/cache.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 91ce13c14f2..7143e36be96 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -193,13 +193,6 @@ def get_all_updated_caches_txn( def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: - logger.info( - "asdf process_replication_rows %s %s %s %s", - stream_name, - instance_name, - token, - rows, - ) if stream_name == EventsStream.NAME: for row in rows: self._process_event_stream_row(token, row) @@ -268,8 +261,6 @@ def process_replication_position( def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data - logger.info("asdf _process_event_stream_row %s", row) - if row.type == EventsStreamEventRow.TypeId: assert isinstance(data, EventsStreamEventRow) self._invalidate_caches_for_event( @@ -286,8 +277,6 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: assert isinstance(data, EventsStreamCurrentStateRow) self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] - logger.info("asdf EventsStreamCurrentStateRow.TypeId %s", data) - if data.type == EventTypes.Member: self._attempt_to_invalidate_cache( "get_rooms_for_user", (data.state_key,) From bceb4e08f1f3b5cdeb978a4c179f315641140ebc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 9 Oct 2024 18:51:28 -0500 Subject: [PATCH 5/5] Fix lints --- synapse/storage/databases/main/cache.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7143e36be96..0ec33f78182 100644 --- a/synapse/storage/databases/main/cache.py +++ 
b/synapse/storage/databases/main/cache.py @@ -219,9 +219,9 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) - self._curr_state_delta_stream_cache.entity_has_changed( + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] room_id, token - ) # type: ignore[attr-defined] + ) for user_id in members_changed: self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: @@ -241,9 +241,9 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) - self._curr_state_delta_stream_cache.entity_has_changed( + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] room_id, token - ) # type: ignore[attr-defined] + ) self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys)