-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix have_seen_event
cache not being invalidated
#13863
Changes from all commits
a847a35
f6393db
f2a5c70
1054f91
2162ab5
0cdc7bf
5b9b645
9fb750d
4fa8f05
b9be6c5
f8dc17b
af93b3c
0d0f54e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix `have_seen_event` cache not being invalidated after we persist an event which causes inefficiency effects like extra `/state` federation calls. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -431,6 +431,12 @@ def __get__( | |
cache: DeferredCache[CacheKey, Any] = cached_method.cache | ||
num_args = cached_method.num_args | ||
|
||
if num_args != self.num_args: | ||
raise Exception( | ||
"Number of args (%s) does not match underlying cache_method_name=%s (%s)." | ||
% (self.num_args, self.cached_method_name, num_args) | ||
) | ||
Comment on lines
+434
to
+438
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a safety check so others don't run into the same pitfall and we can see this error obviously. This also has a test to make sure the safety check works (see Example error: Full error
There are other ways the args could mismatch like the type but this would have caught the problem encountered here with |
||
|
||
@functools.wraps(self.orig) | ||
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]": | ||
# If we're passed a cache_context then we'll want to call its | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,66 +35,45 @@ | |
from synapse.util.async_helpers import yieldable_gather_results | ||
|
||
from tests import unittest | ||
from tests.test_utils.event_injection import create_event, inject_event | ||
|
||
|
||
class HaveSeenEventsTestCase(unittest.HomeserverTestCase): | ||
servlets = [ | ||
admin.register_servlets, | ||
room.register_servlets, | ||
login.register_servlets, | ||
] | ||
|
||
def prepare(self, reactor, clock, hs): | ||
self.hs = hs | ||
self.store: EventsWorkerStore = hs.get_datastores().main | ||
|
||
# insert some test data | ||
for rid in ("room1", "room2"): | ||
self.get_success( | ||
self.store.db_pool.simple_insert( | ||
"rooms", | ||
{"room_id": rid, "room_version": 4}, | ||
) | ||
) | ||
self.user = self.register_user("user", "pass") | ||
self.token = self.login(self.user, "pass") | ||
self.room_id = self.helper.create_room_as(self.user, tok=self.token) | ||
|
||
self.event_ids: List[str] = [] | ||
for idx, rid in enumerate( | ||
( | ||
"room1", | ||
"room1", | ||
"room1", | ||
"room2", | ||
) | ||
): | ||
event_json = {"type": f"test {idx}", "room_id": rid} | ||
event = make_event_from_dict(event_json, room_version=RoomVersions.V4) | ||
event_id = event.event_id | ||
|
||
self.get_success( | ||
self.store.db_pool.simple_insert( | ||
"events", | ||
{ | ||
"event_id": event_id, | ||
"room_id": rid, | ||
"topological_ordering": idx, | ||
"stream_ordering": idx, | ||
"type": event.type, | ||
"processed": True, | ||
"outlier": False, | ||
}, | ||
for i in range(3): | ||
event = self.get_success( | ||
inject_event( | ||
hs, | ||
room_version=RoomVersions.V7.identifier, | ||
room_id=self.room_id, | ||
sender=self.user, | ||
type="test_event_type", | ||
content={"body": f"foobarbaz{i}"}, | ||
) | ||
) | ||
self.get_success( | ||
self.store.db_pool.simple_insert( | ||
"event_json", | ||
{ | ||
"event_id": event_id, | ||
"room_id": rid, | ||
"json": json.dumps(event_json), | ||
"internal_metadata": "{}", | ||
"format_version": 3, | ||
}, | ||
) | ||
) | ||
Comment on lines
-66
to
-91
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simplified this test logic by using the real thing. And makes it easier to to create another event down the line in the new test with |
||
self.event_ids.append(event_id) | ||
|
||
self.event_ids.append(event.event_id) | ||
|
||
def test_simple(self): | ||
with LoggingContext(name="test") as ctx: | ||
res = self.get_success( | ||
self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) | ||
self.store.have_seen_events( | ||
self.room_id, [self.event_ids[0], "eventdoesnotexist"] | ||
) | ||
) | ||
self.assertEqual(res, {self.event_ids[0]}) | ||
|
||
|
@@ -104,7 +83,9 @@ def test_simple(self): | |
# a second lookup of the same events should cause no queries | ||
with LoggingContext(name="test") as ctx: | ||
res = self.get_success( | ||
self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) | ||
self.store.have_seen_events( | ||
self.room_id, [self.event_ids[0], "eventdoesnotexist"] | ||
) | ||
) | ||
self.assertEqual(res, {self.event_ids[0]}) | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) | ||
|
@@ -116,11 +97,86 @@ def test_query_via_event_cache(self): | |
# looking it up should now cause no db hits | ||
with LoggingContext(name="test") as ctx: | ||
res = self.get_success( | ||
self.store.have_seen_events("room1", [self.event_ids[0]]) | ||
self.store.have_seen_events(self.room_id, [self.event_ids[0]]) | ||
) | ||
self.assertEqual(res, {self.event_ids[0]}) | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) | ||
|
||
def test_persisting_event_invalidates_cache(self): | ||
""" | ||
Test to make sure that the `have_seen_event` cache | ||
is invalidated after we persist an event and returns | ||
the updated value. | ||
""" | ||
event, event_context = self.get_success( | ||
create_event( | ||
self.hs, | ||
room_id=self.room_id, | ||
sender=self.user, | ||
type="test_event_type", | ||
content={"body": "garply"}, | ||
) | ||
) | ||
|
||
with LoggingContext(name="test") as ctx: | ||
# First, check `have_seen_event` for an event we have not seen yet | ||
# to prime the cache with a `false` value. | ||
res = self.get_success( | ||
self.store.have_seen_events(event.room_id, [event.event_id]) | ||
) | ||
self.assertEqual(res, set()) | ||
|
||
# That should result in a single db query to lookup | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) | ||
|
||
# Persist the event which should invalidate or prefill the | ||
# `have_seen_event` cache so we don't return stale values. | ||
persistence = self.hs.get_storage_controllers().persistence | ||
self.get_success( | ||
persistence.persist_event( | ||
event, | ||
event_context, | ||
) | ||
) | ||
|
||
with LoggingContext(name="test") as ctx: | ||
# Check `have_seen_event` again and we should see the updated fact | ||
# that we have now seen the event after persisting it. | ||
res = self.get_success( | ||
self.store.have_seen_events(event.room_id, [event.event_id]) | ||
) | ||
self.assertEqual(res, {event.event_id}) | ||
|
||
# That should result in a single db query to lookup | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) | ||
|
||
def test_invalidate_cache_by_room_id(self): | ||
""" | ||
Test to make sure that all events associated with the given `(room_id,)` | ||
are invalidated in the `have_seen_event` cache. | ||
""" | ||
with LoggingContext(name="test") as ctx: | ||
# Prime the cache with some values | ||
res = self.get_success( | ||
self.store.have_seen_events(self.room_id, self.event_ids) | ||
) | ||
self.assertEqual(res, set(self.event_ids)) | ||
|
||
# That should result in a single db query to lookup | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) | ||
|
||
# Clear the cache with any events associated with the `room_id` | ||
self.store.have_seen_event.invalidate((self.room_id,)) | ||
|
||
with LoggingContext(name="test") as ctx: | ||
res = self.get_success( | ||
self.store.have_seen_events(self.room_id, self.event_ids) | ||
) | ||
self.assertEqual(res, set(self.event_ids)) | ||
|
||
# Since we cleared the cache, it should result in another db query to lookup | ||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) | ||
|
||
|
||
class EventCacheTestCase(unittest.HomeserverTestCase): | ||
"""Test that the various layers of event cache works.""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the fix as described by @erikjohnston in #13865 (comment)
The rest of the changes in this file are adapting to this change.