From a18310774f0311f06bd36640a77ad2cab6654311 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 7 Feb 2022 18:17:23 +0200 Subject: [PATCH 1/2] [Metadata] Let entries expire in the metadata caches - refreshAfterWrite will never expire entries. After the given time, the next request to the entry will trigger a refresh in the background. The current entry will be used until the entry has been refreshed. - documentation for Caffeine's refresh feature: https://github.com/ben-manes/caffeine/wiki/Refresh --- .../pulsar/metadata/cache/impl/MetadataCacheImpl.java | 3 ++- .../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index fccbe2ca38a6c..2e60332b18c5e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -72,7 +72,8 @@ public MetadataCacheImpl(MetadataStore store, MetadataSerde serde) { this.serde = serde; this.objCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader>>() { @Override public CompletableFuture>> asyncLoad(String key, Executor executor) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index e8230e0113ffe..41dcdccd32931 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -80,7 +80,8 @@ protected AbstractMetadataStore() { registerListener(this); this.childrenCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader>() { @Override public CompletableFuture> asyncLoad(String key, Executor executor) { @@ -100,7 +101,8 @@ public CompletableFuture> asyncReload(String key, List oldV }); this.existsCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader() { @Override public CompletableFuture asyncLoad(String key, Executor executor) { From 7361c77888e37511697500fa10647452b5929790 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 8 Feb 2022 07:47:12 +0200 Subject: [PATCH 2/2] Address review comments --- .../pulsar/metadata/cache/impl/MetadataCacheImpl.java | 4 ++-- .../pulsar/metadata/impl/AbstractMetadataStore.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 2e60332b18c5e..a49bed205dab9 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -72,8 +72,8 @@ public MetadataCacheImpl(MetadataStore store, MetadataSerde serde) { this.serde = serde; this.objCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) - .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader>>() { @Override public CompletableFuture>> asyncLoad(String key, Executor executor) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 41dcdccd32931..cac386405a64c 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -80,8 +80,8 @@ protected AbstractMetadataStore() { registerListener(this); this.childrenCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) - .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader>() { @Override public CompletableFuture> asyncLoad(String key, Executor executor) { @@ -101,8 +101,8 @@ public CompletableFuture> asyncReload(String key, List oldV }); this.existsCache = Caffeine.newBuilder() - .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS / 2, TimeUnit.MILLISECONDS) - .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) + .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS) .buildAsync(new AsyncCacheLoader() { @Override public CompletableFuture asyncLoad(String key, Executor executor) {