diff --git a/changelog.d/12701.feature b/changelog.d/12701.feature new file mode 100644 index 000000000000..bb2264602c84 --- /dev/null +++ b/changelog.d/12701.feature @@ -0,0 +1 @@ +Add a config options to allow for auto-tuning of caches. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index e7b57f5a0bdf..8ee387147fd6 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -784,6 +784,24 @@ caches: # #cache_entry_ttl: 30m + # This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`, + # `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain + # a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize + # this option, and all three of the options must be specified for this feature to work. + #cache_autotuning: + # This flag sets a ceiling on much memory the cache can use before caches begin to be continuously evicted. + # They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in + # the flag below, or until the `min_cache_ttl` is hit. + #max_cache_memory_usage: 1024M + + # This flag sets a rough target for the desired memory usage of the caches. + #target_cache_memory_usage: 758M + + # 'min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when + # caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches + # from being emptied while Synapse is evicting due to memory. + #min_cache_ttl: 5m + # Controls how long the results of a /sync request are cached for after # a successful response is returned. A higher duration can help clients with # intermittent connections, at the cost of higher memory usage. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index f292b94fb0cd..f53812090151 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1119,7 +1119,17 @@ Caching can be configured through the following sub-options: with intermittent connections, at the cost of higher memory usage. By default, this is zero, which means that sync responses are not cached at all. - +* `cache_autotuning` and its sub-options `max_cache_memory_usage`, `target_cache_memory_usage`, and + `min_cache_ttl` work in conjunction with each other to maintain a balance between cache memory + usage and cache entry availability. You must be using [jemalloc](https://github.com/matrix-org/synapse#help-synapse-is-slow-and-eats-all-my-ramcpu) + to utilize this option, and all three of the options must be specified for this feature to work. + * `max_cache_memory_usage` sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted. + They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in + the flag below, or until the `min_cache_ttl` is hit. + * `target_memory_usage` sets a rough target for the desired memory usage of the caches. + * `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when + caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches + from being emptied while Synapse is evicting due to memory. Example configuration: ```yaml @@ -1127,8 +1137,11 @@ caches: global_factor: 1.0 per_cache_factors: get_users_who_share_room_with_user: 2.0 - expire_caches: false sync_response_cache_duration: 2m + cache_autotuning: + max_cache_memory_usage: 1024M + target_cache_memory_usage: 758M + min_cache_ttl: 5m ``` ### Reloading cache factors diff --git a/synapse/config/cache.py b/synapse/config/cache.py index 58b2fe55193c..d2f55534d7d1 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -176,6 +176,24 @@ def generate_config_section(self, **kwargs: Any) -> str: # #cache_entry_ttl: 30m + # This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`, + # `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain + # a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize + # this option, and all three of the options must be specified for this feature to work. + #cache_autotuning: + # This flag sets a ceiling on much memory the cache can use before caches begin to be continuously evicted. + # They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in + # the flag below, or until the `min_cache_ttl` is hit. + #max_cache_memory_usage: 1024M + + # This flag sets a rough target for the desired memory usage of the caches. + #target_cache_memory_usage: 758M + + # 'min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when + # caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches + # from being emptied while Synapse is evicting due to memory. + #min_cache_ttl: 5m + # Controls how long the results of a /sync request are cached for after # a successful response is returned. A higher duration can help clients with # intermittent connections, at the cost of higher memory usage. @@ -263,6 +281,21 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) self.expiry_time_msec = self.parse_duration(expiry_time) + self.cache_autotuning = cache_config.get("cache_autotuning") + if self.cache_autotuning: + max_memory_usage = self.cache_autotuning.get("max_cache_memory_usage") + self.cache_autotuning["max_cache_memory_usage"] = self.parse_size( + max_memory_usage + ) + + target_mem_size = self.cache_autotuning.get("target_cache_memory_usage") + self.cache_autotuning["target_cache_memory_usage"] = self.parse_size( + target_mem_size + ) + + min_cache_ttl = self.cache_autotuning.get("min_cache_ttl") + self.cache_autotuning["min_cache_ttl"] = self.parse_duration(min_cache_ttl) + self.sync_response_cache_duration = self.parse_duration( cache_config.get("sync_response_cache_duration", 0) ) diff --git a/synapse/metrics/jemalloc.py b/synapse/metrics/jemalloc.py index 6bc329f04a9c..1fc8a0e888a1 100644 --- a/synapse/metrics/jemalloc.py +++ b/synapse/metrics/jemalloc.py @@ -18,6 +18,7 @@ import re from typing import Iterable, Optional, overload +import attr from prometheus_client import REGISTRY, Metric from typing_extensions import Literal @@ -27,52 +28,24 @@ logger = logging.getLogger(__name__) -def _setup_jemalloc_stats() -> None: - """Checks to see if jemalloc is loaded, and hooks up a collector to record - statistics exposed by jemalloc. - """ - - # Try to find the loaded jemalloc shared library, if any. We need to - # introspect into what is loaded, rather than loading whatever is on the - # path, as if we load a *different* jemalloc version things will seg fault. - - # We look in `/proc/self/maps`, which only exists on linux. - if not os.path.exists("/proc/self/maps"): - logger.debug("Not looking for jemalloc as no /proc/self/maps exist") - return - - # We're looking for a path at the end of the line that includes - # "libjemalloc". - regex = re.compile(r"/\S+/libjemalloc.*$") - - jemalloc_path = None - with open("/proc/self/maps") as f: - for line in f: - match = regex.search(line.strip()) - if match: - jemalloc_path = match.group() - - if not jemalloc_path: - # No loaded jemalloc was found. - logger.debug("jemalloc not found") - return - - logger.debug("Found jemalloc at %s", jemalloc_path) - - jemalloc = ctypes.CDLL(jemalloc_path) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class JemallocStats: + jemalloc: ctypes.CDLL @overload def _mallctl( - name: str, read: Literal[True] = True, write: Optional[int] = None + self, name: str, read: Literal[True] = True, write: Optional[int] = None ) -> int: ... @overload - def _mallctl(name: str, read: Literal[False], write: Optional[int] = None) -> None: + def _mallctl( + self, name: str, read: Literal[False], write: Optional[int] = None + ) -> None: ... def _mallctl( - name: str, read: bool = True, write: Optional[int] = None + self, name: str, read: bool = True, write: Optional[int] = None ) -> Optional[int]: """Wrapper around `mallctl` for reading and writing integers to jemalloc. @@ -120,7 +93,7 @@ def _mallctl( # Where oldp/oldlenp is a buffer where the old value will be written to # (if not null), and newp/newlen is the buffer with the new value to set # (if not null). Note that they're all references *except* newlen. - result = jemalloc.mallctl( + result = self.jemalloc.mallctl( name.encode("ascii"), input_var_ref, input_len_ref, @@ -136,21 +109,80 @@ def _mallctl( return input_var.value - def _jemalloc_refresh_stats() -> None: + def refresh_stats(self) -> None: """Request that jemalloc updates its internal statistics. This needs to be called before querying for stats, otherwise it will return stale values. """ try: - _mallctl("epoch", read=False, write=1) + self._mallctl("epoch", read=False, write=1) except Exception as e: logger.warning("Failed to reload jemalloc stats: %s", e) + def get_stat(self, name: str) -> int: + """Request the stat of the given name at the time of the last + `refresh_stats` call. This may throw if we fail to read + the stat. + """ + return self._mallctl(f"stats.{name}") + + +_JEMALLOC_STATS: Optional[JemallocStats] = None + + +def get_jemalloc_stats() -> Optional[JemallocStats]: + """Returns an interface to jemalloc, if it is being used. + + Note that this will always return None until `setup_jemalloc_stats` has been + called. + """ + return _JEMALLOC_STATS + + +def _setup_jemalloc_stats() -> None: + """Checks to see if jemalloc is loaded, and hooks up a collector to record + statistics exposed by jemalloc. + """ + + global _JEMALLOC_STATS + + # Try to find the loaded jemalloc shared library, if any. We need to + # introspect into what is loaded, rather than loading whatever is on the + # path, as if we load a *different* jemalloc version things will seg fault. + + # We look in `/proc/self/maps`, which only exists on linux. + if not os.path.exists("/proc/self/maps"): + logger.debug("Not looking for jemalloc as no /proc/self/maps exist") + return + + # We're looking for a path at the end of the line that includes + # "libjemalloc". + regex = re.compile(r"/\S+/libjemalloc.*$") + + jemalloc_path = None + with open("/proc/self/maps") as f: + for line in f: + match = regex.search(line.strip()) + if match: + jemalloc_path = match.group() + + if not jemalloc_path: + # No loaded jemalloc was found. + logger.debug("jemalloc not found") + return + + logger.debug("Found jemalloc at %s", jemalloc_path) + + jemalloc_dll = ctypes.CDLL(jemalloc_path) + + stats = JemallocStats(jemalloc_dll) + _JEMALLOC_STATS = stats + class JemallocCollector(Collector): """Metrics for internal jemalloc stats.""" def collect(self) -> Iterable[Metric]: - _jemalloc_refresh_stats() + stats.refresh_stats() g = GaugeMetricFamily( "jemalloc_stats_app_memory_bytes", @@ -184,7 +216,7 @@ def collect(self) -> Iterable[Metric]: "metadata", ): try: - value = _mallctl(f"stats.{t}") + value = stats.get_stat(t) except Exception as e: # There was an error fetching the value, skip. logger.warning("Failed to read jemalloc stats.%s: %s", t, e) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 45ff0de638a4..a3b60578e3c3 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import math import threading import weakref from enum import Enum @@ -40,6 +41,7 @@ from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.metrics.jemalloc import get_jemalloc_stats from synapse.util import Clock, caches from synapse.util.caches import CacheMetric, EvictionReason, register_cache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry @@ -106,10 +108,16 @@ def update_last_access(self, clock: Clock) -> None: @wrap_as_background_process("LruCache._expire_old_entries") -async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: +async def _expire_old_entries( + clock: Clock, expiry_seconds: int, autotune_config: Optional[dict] +) -> None: """Walks the global cache list to find cache entries that haven't been - accessed in the given number of seconds. + accessed in the given number of seconds, or if a given memory threshold has been breached. """ + if autotune_config: + max_cache_memory_usage = autotune_config["max_cache_memory_usage"] + target_cache_memory_usage = autotune_config["target_cache_memory_usage"] + min_cache_ttl = autotune_config["min_cache_ttl"] / 1000 now = int(clock.time()) node = GLOBAL_ROOT.prev_node @@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: logger.debug("Searching for stale caches") + evicting_due_to_memory = False + + # determine if we're evicting due to memory + jemalloc_interface = get_jemalloc_stats() + if jemalloc_interface and autotune_config: + try: + jemalloc_interface.refresh_stats() + mem_usage = jemalloc_interface.get_stat("allocated") + if mem_usage > max_cache_memory_usage: + logger.info("Begin memory-based cache eviction.") + evicting_due_to_memory = True + except Exception: + logger.warning( + "Unable to read allocated memory, skipping memory-based cache eviction." + ) + while node is not GLOBAL_ROOT: # Only the root node isn't a `_TimedListNode`. assert isinstance(node, _TimedListNode) - if node.last_access_ts_secs > now - expiry_seconds: + # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's + # nothing to do here + if ( + node.last_access_ts_secs > now - expiry_seconds + and not evicting_due_to_memory + ): + break + + # if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer + if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl: break cache_entry = node.get_cache_entry() @@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: assert cache_entry is not None cache_entry.drop_from_cache() + # Check mem allocation periodically if we are evicting a bunch of caches + if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0: + try: + jemalloc_interface.refresh_stats() + mem_usage = jemalloc_interface.get_stat("allocated") + if mem_usage < target_cache_memory_usage: + evicting_due_to_memory = False + logger.info("Stop memory-based cache eviction.") + except Exception: + logger.warning( + "Unable to read allocated memory, this may affect memory-based cache eviction." + ) + # If we've failed to read the current memory usage then we + # should stop trying to evict based on memory usage + evicting_due_to_memory = False + # If we do lots of work at once we yield to allow other stuff to happen. if (i + 1) % 10000 == 0: logger.debug("Waiting during drop") - await clock.sleep(0) + if node.last_access_ts_secs > now - expiry_seconds: + await clock.sleep(0.5) + else: + await clock.sleep(0) logger.debug("Waking during drop") node = next_node @@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: """Start a background job that expires all cache entries if they have not - been accessed for the given number of seconds. + been accessed for the given number of seconds, or if a given memory usage threshold has been + breached. """ - if not hs.config.caches.expiry_time_msec: + if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning: return - logger.info( - "Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000 - ) + if hs.config.caches.expiry_time_msec: + expiry_time = hs.config.caches.expiry_time_msec / 1000 + logger.info("Expiring LRU caches after %d seconds", expiry_time) + else: + expiry_time = math.inf global USE_GLOBAL_LIST USE_GLOBAL_LIST = True clock = hs.get_clock() clock.looping_call( - _expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000 + _expire_old_entries, + 30 * 1000, + clock, + expiry_time, + hs.config.caches.cache_autotuning, ) diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index 321fc1776f8c..67173a4f5b3a 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -14,8 +14,9 @@ from typing import List -from unittest.mock import Mock +from unittest.mock import Mock, patch +from synapse.metrics.jemalloc import JemallocStats from synapse.util.caches.lrucache import LruCache, setup_expire_lru_cache_entries from synapse.util.caches.treecache import TreeCache @@ -316,3 +317,58 @@ def test_evict(self): self.assertEqual(cache.get("key1"), None) self.assertEqual(cache.get("key2"), 3) + + +class MemoryEvictionTestCase(unittest.HomeserverTestCase): + @override_config( + { + "caches": { + "cache_autotuning": { + "max_cache_memory_usage": "700M", + "target_cache_memory_usage": "500M", + "min_cache_ttl": "5m", + } + } + } + ) + @patch("synapse.util.caches.lrucache.get_jemalloc_stats") + def test_evict_memory(self, jemalloc_interface) -> None: + mock_jemalloc_class = Mock(spec=JemallocStats) + jemalloc_interface.return_value = mock_jemalloc_class + + # set the return value of get_stat() to be greater than max_cache_memory_usage + mock_jemalloc_class.get_stat.return_value = 924288000 + + setup_expire_lru_cache_entries(self.hs) + cache = LruCache(4, clock=self.hs.get_clock()) + + cache["key1"] = 1 + cache["key2"] = 2 + + # advance the reactor less than the min_cache_ttl + self.reactor.advance(60 * 2) + + # our items should still be in the cache + self.assertEqual(cache.get("key1"), 1) + self.assertEqual(cache.get("key2"), 2) + + # advance the reactor past the min_cache_ttl + self.reactor.advance(60 * 6) + + # the items should be cleared from cache + self.assertEqual(cache.get("key1"), None) + self.assertEqual(cache.get("key2"), None) + + # add more stuff to caches + cache["key1"] = 1 + cache["key2"] = 2 + + # set the return value of get_stat() to be lower than target_cache_memory_usage + mock_jemalloc_class.get_stat.return_value = 10000 + + # advance the reactor past the min_cache_ttl + self.reactor.advance(60 * 6) + + # the items should still be in the cache + self.assertEqual(cache.get("key1"), 1) + self.assertEqual(cache.get("key2"), 2)