Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Push some deferred wrangling down into DeferredCache #8572

Merged
merged 3 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8572.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Modify `DeferredCache.get()` to return `Deferred`s instead of `ObservableDeferred`s.
57 changes: 48 additions & 9 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DeferredCache(Generic[KT, VT]):
"""Wraps an LruCache, adding support for Deferred results.

It expects that each entry added with set() will be a Deferred; likewise get()
may return an ObservableDeferred.
will return a Deferred.
"""

__slots__ = (
Expand Down Expand Up @@ -130,16 +130,22 @@ def get(
key: KT,
callback: Optional[Callable[[], None]] = None,
update_metrics: bool = True,
) -> Union[ObservableDeferred, VT]:
) -> defer.Deferred:
"""Looks the key up in the caches.

For symmetry with set(), this method does *not* follow the synapse logcontext
rules: the logcontext will not be cleared on return, and the Deferred will run
its callbacks in the sentinel context. In other words: wrap the result with
make_deferred_yieldable() before `await`ing it.

Args:
key(tuple)
callback(fn): Gets called when the entry in the cache is invalidated
key:
callback: Gets called when the entry in the cache is invalidated
update_metrics (bool): whether to update the cache hit rate metrics

Returns:
Either an ObservableDeferred or the result itself
A Deferred which completes with the result. Note that this may later fail
if there is an ongoing set() operation which later completes with a failure.

Raises:
KeyError if the key is not found in the cache
Expand All @@ -152,15 +158,15 @@ def get(
m = self.cache.metrics
assert m # we always have a name, so should always have metrics
m.inc_hits()
return val.deferred
return val.deferred.observe()

val2 = self.cache.get(
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
)
if val2 is _Sentinel.sentinel:
raise KeyError()
else:
return val2
return defer.succeed(val2)

def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
Expand All @@ -173,7 +179,36 @@ def set(
key: KT,
value: defer.Deferred,
callback: Optional[Callable[[], None]] = None,
) -> ObservableDeferred:
) -> defer.Deferred:
"""Adds a new entry to the cache (or updates an existing one).

The given `value` *must* be a Deferred.

First any existing entry for the same key is invalidated. Then a new entry
is added to the cache for the given key.

Until the `value` completes, calls to `get()` for the key will also result in an
incomplete Deferred, which will ultimately complete with the same result as
`value`.

If `value` completes successfully, subsequent calls to `get()` will then return
a completed deferred with the same result. If it *fails*, the cache is
invalidated and subsequent calls to `get()` will raise a KeyError.

If another call to `set()` happens before `value` completes, then (a) any
invalidation callbacks registered in the interim will be called, (b) any
`get()`s in the interim will continue to complete with the result from the
*original* `value`, (c) any future calls to `get()` will complete with the
result from the *new* `value`.

It is expected that `value` does *not* follow the synapse logcontext rules - ie,
if it is incomplete, it runs its callbacks in the sentinel context.

Args:
key: Key to be set
value: a deferred which will complete with a result to add to the cache
callback: An optional callback to be called when the entry is invalidated
"""
if not isinstance(value, defer.Deferred):
raise TypeError("not a Deferred")

Expand All @@ -187,6 +222,8 @@ def set(
if existing_entry:
existing_entry.invalidate()

# XXX: why don't we invalidate the entry in `self.cache` yet?

self._pending_deferred_cache[key] = entry

def compare_and_pop():
Expand Down Expand Up @@ -230,7 +267,9 @@ def eb(_fail):
# _pending_deferred_cache to the real cache.
#
observer.addCallbacks(cb, eb)
return observable

# we return a new Deferred which will be called before any subsequent observers.
return observable.observe()

def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
callbacks = [callback] if callback else []
Expand Down
32 changes: 7 additions & 25 deletions synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.deferred_cache import DeferredCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -156,7 +155,7 @@ def __get__(self, obj, owner):
keylen=self.num_args,
tree=self.tree,
iterable=self.iterable,
) # type: DeferredCache[Tuple, Any]
) # type: DeferredCache[CacheKey, Any]

def get_cache_key_gen(args, kwargs):
"""Given some args/kwargs return a generator that resolves into
Expand Down Expand Up @@ -208,26 +207,12 @@ def _wrapped(*args, **kwargs):
kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)

try:
cached_result_d = cache.get(cache_key, callback=invalidate_callback)

if isinstance(cached_result_d, ObservableDeferred):
observer = cached_result_d.observe()
else:
observer = defer.succeed(cached_result_d)

ret = cache.get(cache_key, callback=invalidate_callback)
except KeyError:
ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
ret = cache.set(cache_key, ret, callback=invalidate_callback)

def onErr(f):
cache.invalidate(cache_key)
return f

ret.addErrback(onErr)
Comment on lines -221 to -225
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

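this happens anyway, in the errback (`eb`) that `DeferredCache.set()` attaches to the pending entry in synapse/util/caches/deferred_cache.py.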

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, except that's slightly different. I better write some tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I've stared at this harder and am pretty sure it's redundant. As noted above, if the function fails, then we call entry.invalidate() on the "temporary" entry in the pending_deferred_cache. The only difference between that and calling cache.invalidate(cache_key) as shown here would be if there was also an entry in the main cache. But we know that can't be the case, because we checked it at line 210, and there is no way for it to have been populated in the meantime.

https://github.com/matrix-org/synapse/pull/8572/files#diff-3aa4bc9d8830ffd035d53b8af86b548a21f2a4cddb3e5240445661614a88a08fR210

(unless someone decided to sidestep the CacheDescriptor and manually call cache.set or cache.prefill while this request was running? but that would be silly, and even if they did it's not obvious that we should invalidate it here.)

In short, I think this is handled correctly in DeferredCache itself.


result_d = cache.set(cache_key, ret, callback=invalidate_callback)
observer = result_d.observe()

return make_deferred_yieldable(observer)
return make_deferred_yieldable(ret)

wrapped = cast(_CachedFunction, _wrapped)

Expand Down Expand Up @@ -286,7 +271,7 @@ def __init__(self, orig, cached_method_name, list_name, num_args=None):

def __get__(self, obj, objtype=None):
cached_method = getattr(obj, self.cached_method_name)
cache = cached_method.cache
cache = cached_method.cache # type: DeferredCache[CacheKey, Any]
num_args = cached_method.num_args

@functools.wraps(self.orig)
Expand Down Expand Up @@ -326,14 +311,11 @@ def arg_to_cache_key(arg):
for arg in list_args:
try:
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
if not isinstance(res, ObservableDeferred):
results[arg] = res
elif not res.has_succeeded():
res = res.observe()
if not res.called:
res.addCallback(update_results_dict, arg)
cached_defers.append(res)
else:
results[arg] = res.get_result()
results[arg] = res.result
except KeyError:
missing.add(arg)

Expand Down
Loading