Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Combine LruCache.invalidate and invalidate_many #9973

Merged
merged 3 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9973.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`.
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _invalidate_caches_for_devices(self, token, rows):
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
self._get_cached_user_device.invalidate_many((row.entity,))
self._get_cached_user_device.invalidate((row.entity,))
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))

else:
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _invalidate_caches_for_event(

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
Expand All @@ -184,8 +184,8 @@ def _invalidate_caches_for_event(
self.get_invited_rooms_for_local_user.invalidate((state_key,))

if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))

async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ def _update_remote_device_list_cache_txn(
)

txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def _remove_old_push_actions_before_txn(
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1748,9 +1748,9 @@ def _handle_event_relations(self, txn, event):
},
)

txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
)

if rel_type == RelationTypes.REPLACE:
Expand Down Expand Up @@ -1903,7 +1903,7 @@ def _set_push_actions_for_event_and_users_txn(

for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

Expand All @@ -1917,7 +1917,7 @@ def _set_push_actions_for_event_and_users_txn(
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(
Expand Down
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def _invalidate_get_users_with_receipts_in_room(

def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self._get_linearized_receipts_for_room.invalidate((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
Expand Down Expand Up @@ -659,9 +659,7 @@ def insert_graph_receipt_txn(
)
txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(
self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
)
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

self.db_pool.simple_delete_txn(
txn,
Expand Down
42 changes: 16 additions & 26 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,7 @@

import enum
import threading
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union

from prometheus_client import Gauge

Expand Down Expand Up @@ -91,7 +82,7 @@ def __init__(
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
self._pending_deferred_cache = (
cache_type()
) # type: MutableMapping[KT, CacheEntry]
) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]]

def metrics_cb():
cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
Expand Down Expand Up @@ -287,8 +278,17 @@ def prefill(
self.cache.set(key, value, callbacks=callbacks)

def invalidate(self, key):
"""Delete a key, or tree of entries

If the cache is backed by a regular dict, then "key" must be of
the right type for this cache

If the cache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
self.check_thread()
self.cache.pop(key, None)
self.cache.del_multi(key)

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
Expand All @@ -299,20 +299,10 @@ def invalidate(self, key):
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
entry.invalidate()

def invalidate_many(self, key: KT):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
key = cast(KT, key)
self.cache.del_multi(key)

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
# _pending_deferred_cache.pop should either return a CacheEntry, or, in the
# case of a TreeCache, a dict of keys to cache entries. Either way calling
# iterate_tree_cache_entry on it will do the right thing.
for entry in iterate_tree_cache_entry(entry):
entry.invalidate()

def invalidate_all(self):
Expand Down
8 changes: 6 additions & 2 deletions synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
class _CachedFunction(Generic[F]):
invalidate = None # type: Any
invalidate_all = None # type: Any
invalidate_many = None # type: Any
prefill = None # type: Any
cache = None # type: Any
num_args = None # type: Any
Expand Down Expand Up @@ -262,6 +261,11 @@ def __init__(
):
super().__init__(orig, num_args=num_args, cache_context=cache_context)

if tree and self.num_args < 2:
raise RuntimeError(
"tree=True is nonsensical for cached functions with a single parameter"
)

self.max_entries = max_entries
self.tree = tree
self.iterable = iterable
Expand Down Expand Up @@ -302,11 +306,11 @@ def _wrapped(*args, **kwargs):
wrapped = cast(_CachedFunction, _wrapped)

if self.num_args == 1:
assert not self.tree
wrapped.invalidate = lambda key: cache.invalidate(key[0])
wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
else:
wrapped.invalidate = cache.invalidate
wrapped.invalidate_many = cache.invalidate_many
wrapped.prefill = cache.prefill

wrapped.invalidate_all = cache.invalidate_all
Expand Down
18 changes: 11 additions & 7 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class LruCache(Generic[KT, VT]):
"""
Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.

Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.
"""

Expand Down Expand Up @@ -393,10 +392,16 @@ def cache_pop(key: KT, default: Optional[T] = None):

@synchronized
def cache_del_multi(key: KT) -> None:
"""Delete an entry, or tree of entries

If the LruCache is backed by a regular dict, then "key" must be of
the right type for this cache

If the LruCache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
This will only work if constructed with cache_type=TreeCache
"""
popped = cache.pop(key)
popped = cache.pop(key, None)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
Expand Down Expand Up @@ -430,11 +435,10 @@ def cache_contains(key: KT) -> bool:
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
self.del_multi = cache_del_multi
# `invalidate` is exposed for consistency with DeferredCache, so that it can be
# invalidated by the cache invalidation replication stream.
self.invalidate = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.invalidate = cache_del_multi
self.len = synchronized(cache_len)
self.contains = cache_contains
self.clear = cache_clear
Expand Down
3 changes: 3 additions & 0 deletions synapse/util/caches/treecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def pop(self, key, default=None):
value. If the key is partial, the TreeCacheNode corresponding to the part
of the tree that was removed.
"""
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))

# a list of the nodes we have touched on the way down the tree
nodes = []

Expand Down
6 changes: 3 additions & 3 deletions tests/util/caches/test_descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,17 +622,17 @@ def func2(self, key, cache_context):
self.assertEquals(callcount2[0], 1)

a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 1)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1)

yield a.func2("foo")
a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 2)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2)

self.assertEquals(callcount[0], 1)
self.assertEquals(callcount2[0], 2)

a.func.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 3)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3)
yield a.func("foo")

self.assertEquals(callcount[0], 2)
Expand Down