From ebaa61d609f92cf94b9d4f06420cdad75d365242 Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Wed, 17 Apr 2024 11:05:11 -0700 Subject: [PATCH] add option to switch between lfu and lru cache --- .../blobcache/BlobCachePlugin.java | 1 + .../shared/SharedBlobCacheService.java | 531 +++++++- ...redBlobCacheServiceUsingLFUCacheTests.java | 1204 +++++++++++++++++ ...edBlobCacheServiceUsingLRUCacheTests.java} | 8 +- 4 files changed, 1734 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLFUCacheTests.java rename x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/{SharedBlobCacheServiceTests.java => SharedBlobCacheServiceUsingLRUCacheTests.java} (99%) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java index 4f9ac3eb99348..940c4157fe43d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java @@ -18,6 +18,7 @@ public class BlobCachePlugin extends Plugin implements ExtensiblePlugin { @Override public List> getSettings() { return List.of( + SharedBlobCacheService.SHARED_CACHE_TYPE, SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING, SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING, SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING, diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 2b78755f29458..d458d5b4adbfa 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ 
b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -42,6 +42,7 @@ import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -55,6 +56,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.IntConsumer; import java.util.function.Predicate; @@ -67,6 +69,12 @@ public class SharedBlobCacheService implements Releasable { private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache."; + public static final Setting SHARED_CACHE_TYPE = Setting.simpleString( + SHARED_CACHE_SETTINGS_PREFIX + "cache_type", + "lru", + Setting.Property.NodeScope + ); + public static final Setting SHARED_CACHE_RANGE_SIZE_SETTING = new Setting<>( SHARED_CACHE_SETTINGS_PREFIX + "range_size", ByteSizeValue.ofMb(16).getStringRep(), @@ -254,7 +262,25 @@ public void validate(ByteSizeValue value, Map, Object> settings, bool Setting.Property.NodeScope ); - private interface Cache { + // used only in tests + void computeDecay() { + assert cache instanceof LFUCache; + ((LFUCache) cache).computeDecay(); + } + + // used only in tests + void maybeScheduleDecayAndNewEpoch() { + assert cache instanceof LFUCache; + ((LFUCache) cache).maybeScheduleDecayAndNewEpoch(epoch()); + } + + // used only in tests + long epoch() { + assert cache instanceof LFUCache; + return ((LFUCache) cache).epoch.get(); + } + + private interface Cache extends Releasable { CacheEntry get(K cacheKey, long fileLength, int region); int forceEvict(Predicate cacheKeyPredicate); @@ -342,7 +368,15 @@ public SharedBlobCacheService( } this.regionSize = regionSize; assert regionSize > 0L; - 
this.cache = new LRUCache(); + if ("lru".equals(SHARED_CACHE_TYPE.get(settings))) { + this.cache = new LRUCache(); + } else if ("lfu".equals(SHARED_CACHE_TYPE.get(settings))) { + this.cache = new LFUCache(settings); + } else { + throw new IllegalArgumentException( + "setting [" + SHARED_CACHE_TYPE.getKey() + "] has unknown cache type [" + SHARED_CACHE_TYPE.get(settings) + "]" + ); + } try { sharedBytes = new SharedBytes( numRegions, @@ -535,7 +569,7 @@ public void maybeFetchRegion( final RangeMissingHandler writer, final ActionListener listener ) { - if (freeRegionCount() < 1 && maybeEvictLeastRecent() == false) { + if (freeRegionCount() < 1 && maybeEvict() == false) { // no free page available and no old enough unused region to be evicted listener.onResponse(false); return; @@ -554,9 +588,11 @@ public void maybeFetchRegion( } // used by tests - boolean maybeEvictLeastRecent() { - if (cache instanceof LRUCache LRUCache) { - return LRUCache.maybeEvictLeastRecent(); + boolean maybeEvict() { + if (cache instanceof LRUCache lruCache) { + return lruCache.maybeEvictLeastRecent(); + } else if (cache instanceof LFUCache lfuCache) { + return lfuCache.maybeEvictLeastUsed(); } return false; } @@ -598,6 +634,14 @@ public int forceEvict(Predicate cacheKeyPredicate) { } + // used by tests + int getFreq(CacheFileRegion cacheFileRegion) { + if (cache instanceof LFUCache lfuCache) { + return lfuCache.getFreq(cacheFileRegion); + } + return -1; + } + @Override public void close() { sharedBytes.decRef(); @@ -673,6 +717,7 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f /** * While this class has incRef and tryIncRef methods, incRefEnsureOpen and tryIncrefEnsureOpen should * always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen + * (see {@link LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)}) */ class CacheFileRegion extends EvictableRefCounted { @@ -1184,6 +1229,11 @@ void touch() { maxSize = Math.min(2, numRegions 
/ 2); } + @Override + public void close() { + // nothing to close + } + @Override public LRUCacheEntry get(KeyType cacheKey, long fileLength, int region) { final RegionKey regionKey = new RegionKey<>(cacheKey, region); @@ -1443,4 +1493,473 @@ public boolean maybeEvictLeastRecent() { return false; } } + + class LFUCache implements Cache { + + class LFUCacheEntry extends CacheEntry { + LFUCacheEntry prev; + LFUCacheEntry next; + int freq; + volatile long lastAccessedEpoch; + + LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) { + super(chunk); + this.lastAccessedEpoch = lastAccessed; + // todo: consider whether freq=1 is still right for new entries. + // it could risk decaying to level 0 right after and thus potentially be evicted + // if the freq 1 LRU chain was short. + // seems ok for now, since if it were to get evicted soon, the decays done would ensure we have more level 1 + // entries eventually and thus such an entry would (after some decays) be able to survive in the cache. 
+ this.freq = 1; + } + + void touch() { + long now = epoch.get(); + if (now > lastAccessedEpoch) { + maybePromote(now, this); + } + } + } + + private final ConcurrentHashMap, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>(); + private final LFUCacheEntry[] freqs; + private final int maxFreq; + private final DecayAndNewEpochTask decayAndNewEpochTask; + + private final AtomicLong epoch = new AtomicLong(); + + @SuppressWarnings("unchecked") + LFUCache(Settings settings) { + this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings); + freqs = (LFUCacheEntry[]) Array.newInstance(LFUCacheEntry.class, maxFreq); + decayAndNewEpochTask = new DecayAndNewEpochTask(threadPool.generic()); + } + + @Override + public void close() { + decayAndNewEpochTask.close(); + } + + int getFreq(CacheFileRegion cacheFileRegion) { + return keyMapping.get(cacheFileRegion.regionKey).freq; + } + + @Override + public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { + final RegionKey regionKey = new RegionKey<>(cacheKey, region); + final long now = epoch.get(); + // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path + // if we did not find an entry + var entry = keyMapping.get(regionKey); + if (entry == null) { + final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region); + entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now)); + } + // io is volatile, double locking is fine, as long as we assign it last. 
+ if (entry.chunk.io == null) { + synchronized (entry.chunk) { + if (entry.chunk.io == null && entry.chunk.isEvicted() == false) { + return initChunk(entry); + } + } + } + assert assertChunkActiveOrEvicted(entry); + + // existing item, check if we need to promote item + if (now > entry.lastAccessedEpoch) { + maybePromote(now, entry); + } + + return entry; + } + + @Override + public int forceEvict(Predicate cacheKeyPredicate) { + final List matchingEntries = new ArrayList<>(); + keyMapping.forEach((key, value) -> { + if (cacheKeyPredicate.test(key.file)) { + matchingEntries.add(value); + } + }); + var evictedCount = 0; + var nonZeroFrequencyEvictedCount = 0; + if (matchingEntries.isEmpty() == false) { + synchronized (SharedBlobCacheService.this) { + for (LFUCacheEntry entry : matchingEntries) { + int frequency = entry.freq; + boolean evicted = entry.chunk.forceEvict(); + if (evicted && entry.chunk.io != null) { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + evictedCount++; + if (frequency > 0) { + nonZeroFrequencyEvictedCount++; + } + } + } + } + } + blobCacheMetrics.getEvictedCountNonZeroFrequency().incrementBy(nonZeroFrequencyEvictedCount); + return evictedCount; + } + + private LFUCacheEntry initChunk(LFUCacheEntry entry) { + assert Thread.holdsLock(entry.chunk); + RegionKey regionKey = entry.chunk.regionKey; + if (keyMapping.get(regionKey) != entry) { + throwAlreadyClosed("no free region found (contender)"); + } + // new item + assert entry.freq == 1; + assert entry.prev == null; + assert entry.next == null; + final SharedBytes.IO freeSlot = freeRegions.poll(); + if (freeSlot != null) { + // no need to evict an item, just add + assignToSlot(entry, freeSlot); + } else { + // need to evict something + SharedBytes.IO io; + synchronized (SharedBlobCacheService.this) { + io = maybeEvictAndTake(evictIncrementer); + } + if (io == null) { + io = freeRegions.poll(); + } + if (io != null) { + assignToSlot(entry, io); + } else { + boolean removed = 
keyMapping.remove(regionKey, entry); + assert removed; + throwAlreadyClosed("no free region found"); + } + } + + return entry; + } + + private void assignToSlot(LFUCacheEntry entry, SharedBytes.IO freeSlot) { + assert regionOwners.put(freeSlot, entry.chunk) == null; + synchronized (SharedBlobCacheService.this) { + if (entry.chunk.isEvicted()) { + assert regionOwners.remove(freeSlot) == entry.chunk; + freeRegions.add(freeSlot); + keyMapping.remove(entry.chunk.regionKey, entry); + throwAlreadyClosed("evicted during free region allocation"); + } + pushEntryToBack(entry); + // assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict. + entry.chunk.io = freeSlot; + } + } + + private void pushEntryToBack(final LFUCacheEntry entry) { + assert Thread.holdsLock(SharedBlobCacheService.this); + assert invariant(entry, false); + assert entry.prev == null; + assert entry.next == null; + final LFUCacheEntry currFront = freqs[entry.freq]; + if (currFront == null) { + freqs[entry.freq] = entry; + entry.prev = entry; + entry.next = null; + } else { + assert currFront.freq == entry.freq; + final LFUCacheEntry last = currFront.prev; + currFront.prev = entry; + last.next = entry; + entry.prev = last; + entry.next = null; + } + assert freqs[entry.freq].prev == entry; + assert freqs[entry.freq].prev.next == null; + assert entry.prev != null; + assert entry.prev.next == null || entry.prev.next == entry; + assert entry.next == null; + assert invariant(entry, true); + } + + private synchronized boolean invariant(final LFUCacheEntry e, boolean present) { + boolean found = false; + for (int i = 0; i < maxFreq; i++) { + assert freqs[i] == null || freqs[i].prev != null; + assert freqs[i] == null || freqs[i].prev != freqs[i] || freqs[i].next == null; + assert freqs[i] == null || freqs[i].prev.next == null; + for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) { + assert entry.next == null || entry.next.prev == entry; + assert entry.prev != null; + 
assert entry.prev.next == null || entry.prev.next == entry; + assert entry.freq == i; + if (entry == e) { + found = true; + } + } + for (LFUCacheEntry entry = freqs[i]; entry != null && entry.prev != freqs[i]; entry = entry.prev) { + assert entry.next == null || entry.next.prev == entry; + assert entry.prev != null; + assert entry.prev.next == null || entry.prev.next == entry; + assert entry.freq == i; + if (entry == e) { + found = true; + } + } + } + assert found == present; + return true; + } + + private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { + synchronized (SharedBlobCacheService.this) { + // assert linked (or evicted) + assert entry.prev != null || entry.chunk.isEvicted(); + + } + SharedBytes.IO io = entry.chunk.io; + assert io != null || entry.chunk.isEvicted(); + assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); + return true; + } + + private void maybePromote(long epoch, LFUCacheEntry entry) { + synchronized (SharedBlobCacheService.this) { + if (epoch > entry.lastAccessedEpoch && entry.freq < maxFreq - 1 && entry.chunk.isEvicted() == false) { + unlink(entry); + // go 2 up per epoch, allowing us to decay 1 every epoch. 
+ entry.freq = Math.min(entry.freq + 2, maxFreq - 1); + entry.lastAccessedEpoch = epoch; + pushEntryToBack(entry); + } + } + } + + private void unlink(final LFUCacheEntry entry) { + assert Thread.holdsLock(SharedBlobCacheService.this); + assert invariant(entry, true); + assert entry.prev != null; + final LFUCacheEntry currFront = freqs[entry.freq]; + assert currFront != null; + if (currFront == entry) { + freqs[entry.freq] = entry.next; + if (entry.next != null) { + assert entry.prev != entry; + entry.next.prev = entry.prev; + } + } else { + if (entry.next != null) { + entry.next.prev = entry.prev; + } + entry.prev.next = entry.next; + if (currFront.prev == entry) { + currFront.prev = entry.prev; + } + } + entry.next = null; + entry.prev = null; + assert invariant(entry, false); + } + + private void appendLevel1ToLevel0() { + assert Thread.holdsLock(SharedBlobCacheService.this); + var front0 = freqs[0]; + var front1 = freqs[1]; + if (front0 == null) { + freqs[0] = front1; + freqs[1] = null; + decrementFreqList(front1); + assert front1 == null || invariant(front1, true); + } else if (front1 != null) { + var back0 = front0.prev; + var back1 = front1.prev; + assert invariant(front0, true); + assert invariant(front1, true); + assert invariant(back0, true); + assert invariant(back1, true); + + decrementFreqList(front1); + + front0.prev = back1; + back0.next = front1; + front1.prev = back0; + assert back1.next == null; + + freqs[1] = null; + + assert invariant(front0, true); + assert invariant(front1, true); + assert invariant(back0, true); + assert invariant(back1, true); + } + } + + private void decrementFreqList(LFUCacheEntry entry) { + while (entry != null) { + entry.freq--; + entry = entry.next; + } + } + + /** + * Cycles through the {@link LFUCacheEntry} from 0 to max frequency and + * tries to evict a chunk if no one is holding onto its resources anymore. + * + * Also regularly polls for free regions and thus might steal one in case any become available. 
+ * + * @return a now free IO region or null if none available. + */ + private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) { + assert Thread.holdsLock(SharedBlobCacheService.this); + long currentEpoch = epoch.get(); // must be captured before attempting to evict a freq 0 + SharedBytes.IO freq0 = maybeEvictAndTakeForFrequency(evictedNotification, 0); + if (freqs[0] == null) { + // no frequency 0 entries, let us switch epoch and decay so we get some for next time. + maybeScheduleDecayAndNewEpoch(currentEpoch); + } + if (freq0 != null) { + return freq0; + } + for (int currentFreq = 1; currentFreq < maxFreq; currentFreq++) { + // recheck this per freq in case we raced an eviction with an incref'er. + SharedBytes.IO freeRegion = freeRegions.poll(); + if (freeRegion != null) { + return freeRegion; + } + SharedBytes.IO taken = maybeEvictAndTakeForFrequency(evictedNotification, currentFreq); + if (taken != null) { + return taken; + } + } + // give up + return null; + } + + private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotification, int currentFreq) { + for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) { + boolean evicted = entry.chunk.tryEvictNoDecRef(); + if (evicted) { + try { + SharedBytes.IO ioRef = entry.chunk.io; + if (ioRef != null) { + try { + if (entry.chunk.refCount() == 1) { + // we own that one refcount (since we CAS'ed evicted to 1) + // grab io, rely on incref'ers also checking evicted field. + entry.chunk.io = null; + assert regionOwners.remove(ioRef) == entry.chunk; + return ioRef; + } + } finally { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + } + } + } finally { + entry.chunk.decRef(); + if (currentFreq > 0) { + evictedNotification.run(); + } + } + } + } + return null; + } + + /** + * Check if a new epoch is needed based on the input. The input epoch should be captured + * before the determination that a new epoch is needed is done. 
+ * @param currentEpoch the epoch to check against if a new epoch is needed + */ + private void maybeScheduleDecayAndNewEpoch(long currentEpoch) { + decayAndNewEpochTask.spawnIfNotRunning(currentEpoch); + } + + /** + * This method tries to evict the least used {@link LFUCacheEntry}. Only entries with the lowest possible frequency are considered + * for eviction. + * + * @return true if an entry was evicted, false otherwise. + */ + public boolean maybeEvictLeastUsed() { + synchronized (SharedBlobCacheService.this) { + for (LFUCacheEntry entry = freqs[0]; entry != null; entry = entry.next) { + boolean evicted = entry.chunk.tryEvict(); + if (evicted && entry.chunk.io != null) { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + return true; + } + } + } + return false; + } + + private void computeDecay() { + long now = threadPool.rawRelativeTimeInMillis(); + long afterLock; + long end; + synchronized (SharedBlobCacheService.this) { + afterLock = threadPool.rawRelativeTimeInMillis(); + appendLevel1ToLevel0(); + for (int i = 2; i < maxFreq; i++) { + assert freqs[i - 1] == null; + freqs[i - 1] = freqs[i]; + freqs[i] = null; + decrementFreqList(freqs[i - 1]); + assert freqs[i - 1] == null || invariant(freqs[i - 1], true); + } + } + end = threadPool.rawRelativeTimeInMillis(); + logger.debug("Decay took {} ms (acquire lock: {} ms)", end - now, afterLock - now); + } + + class DecayAndNewEpochTask extends AbstractRunnable { + + private final Executor executor; + private final AtomicLong pendingEpoch = new AtomicLong(); + private volatile boolean isClosed; + + DecayAndNewEpochTask(Executor executor) { + this.executor = executor; + } + + @Override + protected void doRun() throws Exception { + if (isClosed == false) { + computeDecay(); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("failed to run cache decay task", e); + } + + @Override + public void onAfter() { + assert pendingEpoch.get() == epoch.get() + 1; + 
epoch.incrementAndGet(); + } + + @Override + public void onRejection(Exception e) { + assert false : e; + logger.error("unexpected rejection", e); + epoch.incrementAndGet(); + } + + @Override + public String toString() { + return "shared_cache_decay_task"; + } + + public void spawnIfNotRunning(long currentEpoch) { + if (isClosed == false && pendingEpoch.compareAndSet(currentEpoch, currentEpoch + 1)) { + executor.execute(this); + } + } + + public void close() { + this.isClosed = true; + } + } + } } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLFUCacheTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLFUCacheTests.java new file mode 100644 index 0000000000000..a90a59ea214d4 --- /dev/null +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLFUCacheTests.java @@ -0,0 +1,1204 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.blobcache.shared; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.blobcache.BlobCacheMetrics; +import org.elasticsearch.blobcache.BlobCacheUtils; +import org.elasticsearch.blobcache.common.ByteRange; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.RatioValue; +import org.elasticsearch.common.unit.RelativeByteSizeValue; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.StoppableExecutorServiceWrapper; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class SharedBlobCacheServiceUsingLFUCacheTests extends ESTestCase { + + private static long size(long numPages) { + return numPages * SharedBytes.PAGE_SIZE; + } + + public void testBasicEviction() throws IOException { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey, size(250), 0); + assertEquals(size(100), region0.tracker.getLength()); + assertEquals(4, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey, size(250), 1); + assertEquals(size(100), region1.tracker.getLength()); + assertEquals(3, cacheService.freeRegionCount()); + final var region2 = cacheService.get(cacheKey, size(250), 2); + assertEquals(size(50), region2.tracker.getLength()); + assertEquals(2, cacheService.freeRegionCount()); + + synchronized (cacheService) { + assertTrue(tryEvict(region1)); + } + assertEquals(3, cacheService.freeRegionCount()); + synchronized 
(cacheService) { + assertFalse(tryEvict(region1)); + } + assertEquals(3, cacheService.freeRegionCount()); + final var bytesReadFuture = new PlainActionFuture(); + region0.populateAndRead( + ByteRange.of(0L, 1L), + ByteRange.of(0L, 1L), + (channel, channelPos, relativePos, length) -> 1, + (channel, channelPos, relativePos, length, progressUpdater) -> progressUpdater.accept(length), + taskQueue.getThreadPool().generic(), + bytesReadFuture + ); + synchronized (cacheService) { + assertFalse(tryEvict(region0)); + } + assertEquals(3, cacheService.freeRegionCount()); + assertFalse(bytesReadFuture.isDone()); + taskQueue.runAllRunnableTasks(); + synchronized (cacheService) { + assertTrue(tryEvict(region0)); + } + assertEquals(4, cacheService.freeRegionCount()); + synchronized (cacheService) { + assertTrue(tryEvict(region2)); + } + assertEquals(5, cacheService.freeRegionCount()); + assertTrue(bytesReadFuture.isDone()); + assertEquals(Integer.valueOf(1), bytesReadFuture.actionGet()); + } + } + + private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion region1) { + if (randomBoolean()) { + return region1.tryEvict(); + } else { + boolean result = region1.tryEvictNoDecRef(); + if (result) { + region1.decRef(); + } + return result; + } + } + + public void testAutoEviction() throws IOException { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(200)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + 
environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey = generateCacheKey(); + assertEquals(2, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey, size(250), 0); + assertEquals(size(100), region0.tracker.getLength()); + assertEquals(1, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey, size(250), 1); + assertEquals(size(100), region1.tracker.getLength()); + assertEquals(0, cacheService.freeRegionCount()); + assertFalse(region0.isEvicted()); + assertFalse(region1.isEvicted()); + + // acquire region 2, which should evict region 0 (oldest) + final var region2 = cacheService.get(cacheKey, size(250), 2); + assertEquals(size(50), region2.tracker.getLength()); + assertEquals(0, cacheService.freeRegionCount()); + assertTrue(region0.isEvicted()); + assertFalse(region1.isEvicted()); + + // explicitly evict region 1 + synchronized (cacheService) { + assertTrue(tryEvict(region1)); + } + assertEquals(1, cacheService.freeRegionCount()); + } + } + + public void testForceEviction() throws IOException { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey1 = generateCacheKey(); + final var 
cacheKey2 = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey2, size(250), 1); + assertEquals(3, cacheService.freeRegionCount()); + assertFalse(region0.isEvicted()); + assertFalse(region1.isEvicted()); + cacheService.removeFromCache(cacheKey1); + assertTrue(region0.isEvicted()); + assertFalse(region1.isEvicted()); + assertEquals(4, cacheService.freeRegionCount()); + } + } + + public void testForceEvictResponse() throws IOException { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey1 = generateCacheKey(); + final var cacheKey2 = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey2, size(250), 1); + assertEquals(3, cacheService.freeRegionCount()); + assertFalse(region0.isEvicted()); + assertFalse(region1.isEvicted()); + + assertEquals(1, cacheService.forceEvict(cK -> cK == cacheKey1)); + assertEquals(1, cacheService.forceEvict(e -> true)); + } + } + + public 
void testDecay() throws IOException { + // we have 8 regions + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(400)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + assertEquals(4, cacheService.freeRegionCount()); + + final var cacheKey1 = generateCacheKey(); + final var cacheKey2 = generateCacheKey(); + final var cacheKey3 = generateCacheKey(); + // add a region that we can evict when provoking first decay + cacheService.get("evictkey", size(250), 0); + assertEquals(3, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey1, size(250), 0); + assertEquals(2, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey2, size(250), 1); + assertEquals(1, cacheService.freeRegionCount()); + final var region2 = cacheService.get(cacheKey3, size(250), 1); + assertEquals(0, cacheService.freeRegionCount()); + + assertEquals(1, cacheService.getFreq(region0)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + AtomicLong expectedEpoch = new AtomicLong(); + Runnable triggerDecay = () -> { + assertThat(taskQueue.hasRunnableTasks(), is(false)); + cacheService.get(expectedEpoch.toString(), size(250), 0); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + taskQueue.runAllRunnableTasks(); + 
assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet())); + }; + + triggerDecay.run(); + + cacheService.get(cacheKey1, size(250), 0); + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + + triggerDecay.run(); + + final var region0Again = cacheService.get(cacheKey1, size(250), 0); + assertSame(region0Again, region0); + assertEquals(3, cacheService.getFreq(region0)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + + triggerDecay.run(); + + cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.getFreq(region0)); + cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.getFreq(region0)); + assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); + + // ensure no freq=0 entries + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + assertEquals(2, cacheService.getFreq(region1)); + assertEquals(2, cacheService.getFreq(region2)); + + triggerDecay.run(); + + assertEquals(3, cacheService.getFreq(region0)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + + triggerDecay.run(); + assertEquals(2, cacheService.getFreq(region0)); + assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); + + // ensure no freq=0 entries + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + assertEquals(2, cacheService.getFreq(region1)); + assertEquals(2, cacheService.getFreq(region2)); + + triggerDecay.run(); + assertEquals(1, cacheService.getFreq(region0)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + + triggerDecay.run(); + assertEquals(0, cacheService.getFreq(region0)); + assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); + } + } + + /** 
+ * Test when many objects need to decay, in particular useful to measure how long the decay task takes. + * For 1M objects (with no assertions) it took 26ms locally. + */ + public void testMassiveDecay() throws IOException { + int regions = 1024; // to measure decay time, increase to 1024*1024 and disable assertions. + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regions)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(1)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + Runnable decay = () -> { + assertThat(taskQueue.hasRunnableTasks(), is(true)); + long before = System.currentTimeMillis(); + taskQueue.runAllRunnableTasks(); + long after = System.currentTimeMillis(); + logger.debug("took {} ms", (after - before)); + }; + long fileLength = size(regions + 100); + Object cacheKey = new Object(); + for (int i = 0; i < regions; ++i) { + cacheService.get(cacheKey, fileLength, i); + if (Integer.bitCount(i) == 1) { + logger.debug("did {} gets", i); + } + } + assertThat(taskQueue.hasRunnableTasks(), is(false)); + cacheService.get(cacheKey, fileLength, regions); + decay.run(); + int maxRounds = 5; + for (int round = 2; round <= maxRounds; ++round) { + for (int i = round; i < regions + round; ++i) { + cacheService.get(cacheKey, fileLength, i); + if (Integer.bitCount(i) == 1) { + logger.debug("did {} gets", i); + } + } + decay.run(); + } + + Map 
freqs = new HashMap<>(); + for (int i = maxRounds; i < regions + maxRounds; ++i) { + int freq = cacheService.getFreq(cacheService.get(cacheKey, fileLength, i)) - 2; + freqs.compute(freq, (k, v) -> v == null ? 1 : v + 1); + if (Integer.bitCount(i) == 1) { + logger.debug("did {} gets", i); + } + } + assertThat(freqs.get(4), equalTo(regions - maxRounds + 1)); + } + } + + /** + * Exercise SharedBlobCacheService#get in multiple threads to trigger any assertion errors. + * @throws IOException + */ + public void testGetMultiThreaded() throws IOException { + final int threads = between(2, 10); + final int regionCount = between(1, 20); + final boolean incRef = randomBoolean(); + // if we have enough regions, a get should always have a result (except for explicit evict interference) + // if we incRef, we risk the eviction racing against that, leading to no available region, so allow + // the already closed exception in that case. + final boolean allowAlreadyClosed = regionCount < threads || incRef; + + logger.info("{} {} {}", threads, regionCount, allowAlreadyClosed); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s")) + .put("path.home", createTempDir()) + .build(); + long fileLength = size(500); + ThreadPool threadPool = new TestThreadPool("testGetMultiThreaded"); + Set files = randomSet(1, 10, () -> randomAlphaOfLength(5)); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService( + environment, + settings, + threadPool, + 
ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + CyclicBarrier ready = new CyclicBarrier(threads); + List threadList = IntStream.range(0, threads).mapToObj(no -> { + int iterations = between(100, 500); + String[] cacheKeys = IntStream.range(0, iterations).mapToObj(ignore -> randomFrom(files)).toArray(String[]::new); + int[] regions = IntStream.range(0, iterations).map(ignore -> between(0, 4)).toArray(); + int[] yield = IntStream.range(0, iterations).map(ignore -> between(0, 9)).toArray(); + int[] evict = IntStream.range(0, iterations).map(ignore -> between(0, 99)).toArray(); + return new Thread(() -> { + try { + ready.await(); + for (int i = 0; i < iterations; ++i) { + try { + SharedBlobCacheService.CacheFileRegion cacheFileRegion; + try { + cacheFileRegion = cacheService.get(cacheKeys[i], fileLength, regions[i]); + } catch (AlreadyClosedException e) { + assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation") : e; + throw e; + } + if (incRef && cacheFileRegion.tryIncRef()) { + if (yield[i] == 0) { + Thread.yield(); + } + cacheFileRegion.decRef(); + } + if (evict[i] == 0) { + cacheService.forceEvict(x -> true); + } + } catch (AlreadyClosedException e) { + // ignore + } + } + } catch (InterruptedException | BrokenBarrierException e) { + assert false; + throw new RuntimeException(e); + } + }); + }).toList(); + threadList.forEach(Thread::start); + threadList.forEach(thread -> { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + } finally { + threadPool.shutdownNow(); + } + } + + public void testFetchFullCacheEntry() throws Exception { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + 
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + AtomicInteger bulkTaskCount = new AtomicInteger(0); + ThreadPool threadPool = new TestThreadPool("test") { + @Override + public ExecutorService executor(String name) { + ExecutorService generic = super.executor(Names.GENERIC); + if (Objects.equals(name, "bulk")) { + return new StoppableExecutorServiceWrapper(generic) { + @Override + public void execute(Runnable command) { + super.execute(command); + bulkTaskCount.incrementAndGet(); + } + }; + } + return generic; + } + }; + + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + ThreadPool.Names.GENERIC, + "bulk", + BlobCacheMetrics.NOOP + ) + ) { + { + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final long size = size(250); + AtomicLong bytesRead = new AtomicLong(size); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchFullEntry(cacheKey, size, (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesRead.addAndGet(-length); + progressUpdater.accept(length); + }, future); + + future.get(10, TimeUnit.SECONDS); + assertEquals(0L, bytesRead.get()); + assertEquals(2, cacheService.freeRegionCount()); + assertEquals(3, bulkTaskCount.get()); + } + { + // a download that would use up all regions should not run + final var cacheKey = generateCacheKey(); + assertEquals(2, cacheService.freeRegionCount()); + var configured = cacheService.maybeFetchFullEntry(cacheKey, size(500), (ch, chPos, relPos, len, update) -> { + throw new AssertionError("Should never reach here"); + }, ActionListener.noop()); + assertFalse(configured); + assertEquals(2, cacheService.freeRegionCount()); + } + } + + threadPool.shutdown(); 
+ } + + public void testFetchFullCacheEntryConcurrently() throws Exception { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + ThreadPool threadPool = new TestThreadPool("test") { + @Override + public ExecutorService executor(String name) { + ExecutorService generic = super.executor(Names.GENERIC); + if (Objects.equals(name, "bulk")) { + return new StoppableExecutorServiceWrapper(generic); + } + return generic; + } + }; + + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + ThreadPool.Names.GENERIC, + "bulk", + BlobCacheMetrics.NOOP + ) + ) { + + final long size = size(randomIntBetween(1, 100)); + final Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + final var cacheKey = generateCacheKey(); + try { + PlainActionFuture.get( + f -> cacheService.maybeFetchFullEntry( + cacheKey, + size, + (channel, channelPos, relativePos, length, progressUpdater) -> progressUpdater.accept(length), + f + ) + ); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + } finally { + assertTrue(ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS)); + } + } + + public void testCacheSizeRejectedOnNonFrozenNodes() { + String cacheSize = randomBoolean() + ? 
ByteSizeValue.ofBytes(size(500)).getStringRep() + : (new RatioValue(between(1, 100))).formatNoTrailingZerosPercent(); + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_HOT_NODE_ROLE.roleName()) + .build(); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.get(settings) + ); + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause(), instanceOf(SettingsException.class)); + assertThat( + e.getCause().getMessage(), + is( + "Setting [" + + SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey() + + "] to be positive [" + + cacheSize + + "] is only permitted on nodes with the data_frozen, search, or indexing role. 
Roles are [data_hot]" + ) + ); + } + + public void testMultipleDataPathsRejectedOnFrozenNodes() { + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()) + .putList(Environment.PATH_DATA_SETTING.getKey(), List.of("a", "b")) + .build(); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.get(settings) + ); + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause(), instanceOf(SettingsException.class)); + assertThat( + e.getCause().getMessage(), + is( + "setting [" + + SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey() + + "=" + + ByteSizeValue.ofBytes(size(500)).getStringRep() + + "] is not permitted on nodes with multiple data paths [a,b]" + ) + ); + } + + public void testDedicateFrozenCacheSizeDefaults() { + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()) + .build(); + + RelativeByteSizeValue relativeCacheSize = SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.get(settings); + assertThat(relativeCacheSize.isAbsolute(), is(false)); + assertThat(relativeCacheSize.isNonZeroSize(), is(true)); + assertThat(relativeCacheSize.calculateValue(ByteSizeValue.ofBytes(10000), null), equalTo(ByteSizeValue.ofBytes(9000))); + assertThat(SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.get(settings), equalTo(ByteSizeValue.ofGb(100))); + } + + public void testNotDedicatedFrozenCacheSizeDefaults() { + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .putList( 
+ NodeRoleSettings.NODE_ROLES_SETTING.getKey(), + Sets.union( + Set.of( + randomFrom( + DiscoveryNodeRole.DATA_HOT_NODE_ROLE, + DiscoveryNodeRole.DATA_COLD_NODE_ROLE, + DiscoveryNodeRole.DATA_WARM_NODE_ROLE, + DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE + ) + ), + new HashSet<>( + randomSubsetOf( + between(0, 3), + DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE, + DiscoveryNodeRole.INGEST_ROLE, + DiscoveryNodeRole.MASTER_ROLE + ) + ) + ).stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toList()) + ) + .build(); + + RelativeByteSizeValue relativeCacheSize = SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.get(settings); + assertThat(relativeCacheSize.isNonZeroSize(), is(false)); + assertThat(relativeCacheSize.isAbsolute(), is(true)); + assertThat(relativeCacheSize.getAbsolute(), equalTo(ByteSizeValue.ZERO)); + assertThat(SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.get(settings), equalTo(ByteSizeValue.ofBytes(-1))); + } + + public void testSearchOrIndexNodeCacheSizeDefaults() { + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .putList( + NodeRoleSettings.NODE_ROLES_SETTING.getKey(), + randomFrom(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.INDEX_ROLE).roleName() + ) + .build(); + + RelativeByteSizeValue relativeCacheSize = SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.get(settings); + assertThat(relativeCacheSize.isAbsolute(), is(false)); + assertThat(relativeCacheSize.isNonZeroSize(), is(true)); + assertThat(relativeCacheSize.calculateValue(ByteSizeValue.ofBytes(10000), null), equalTo(ByteSizeValue.ofBytes(9000))); + assertThat(SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.get(settings), equalTo(ByteSizeValue.ofGb(100))); + } + + public void testMaxHeadroomRejectedForAbsoluteCacheSize() { + String cacheSize = ByteSizeValue.ofBytes(size(500)).getStringRep(); + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), 
"lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()) + .build(); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.get(settings) + ); + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause(), instanceOf(SettingsException.class)); + assertThat( + e.getCause().getMessage(), + is( + "setting [" + + SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING.getKey() + + "] cannot be specified for absolute [" + + SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey() + + "=" + + cacheSize + + "]" + ) + ); + } + + public void testCalculateCacheSize() { + long smallSize = 10000; + long largeSize = ByteSizeValue.ofTb(10).getBytes(); + assertThat(SharedBlobCacheService.calculateCacheSize(Settings.EMPTY, smallSize), equalTo(0L)); + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()) + .build(); + assertThat(SharedBlobCacheService.calculateCacheSize(settings, smallSize), equalTo(9000L)); + assertThat(SharedBlobCacheService.calculateCacheSize(settings, largeSize), equalTo(largeSize - ByteSizeValue.ofGb(100).getBytes())); + } + + private static Object generateCacheKey() { + return new Object(); + } + + public void testCacheSizeChanges() throws IOException { + ByteSizeValue val1 = new ByteSizeValue(randomIntBetween(1, 5), ByteSizeUnit.MB); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + 
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), val1.getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + SharedBlobCacheService cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + assertEquals(val1.getBytes(), cacheService.getStats().size()); + } + + ByteSizeValue val2 = new ByteSizeValue(randomIntBetween(1, 5), ByteSizeUnit.MB); + settings = Settings.builder() + .put(settings) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), val2.getStringRep()) + .build(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + SharedBlobCacheService cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + assertEquals(val2.getBytes(), cacheService.getStats().size()); + } + } + + public void testMaybeEvictLeastUsed() throws Exception { + final int numRegions = 10; + final long regionSize = size(1L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(numRegions)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + final AtomicLong relativeTimeInMillis = new AtomicLong(0L); + final DeterministicTaskQueue taskQueue = new 
DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + "bulk", + BlobCacheMetrics.NOOP + ) + ) { + final Map.CacheFileRegion> cacheEntries = new HashMap<>(); + + assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions)); + assertThat("Cache has no entries", cacheService.maybeEvict(), is(false)); + + // use all regions in cache + for (int i = 0; i < numRegions; i++) { + final var cacheKey = generateCacheKey(); + var entry = cacheService.get(cacheKey, regionSize, 0); + entry.populate( + ByteRange.of(0L, regionSize), + (channel, channelPos, relativePos, length, progressUpdater) -> progressUpdater.accept(length), + taskQueue.getThreadPool().generic(), + ActionListener.noop() + ); + assertThat(cacheService.getFreq(entry), equalTo(1)); + cacheEntries.put(cacheKey, entry); + } + + assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); + assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvict(), is(false)); + + taskQueue.runAllRunnableTasks(); + + assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); + assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvict(), is(false)); + + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); + + cacheEntries.keySet().forEach(key -> cacheService.get(key, regionSize, 0)); + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); + + // touch some random cache entries + var usedCacheKeys = Set.copyOf(randomSubsetOf(cacheEntries.keySet())); + usedCacheKeys.forEach(key -> cacheService.get(key, regionSize, 0)); + + cacheEntries.forEach( + (key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? 
equalTo(3) : equalTo(1)) + ); + + assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); + assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvict(), is(false)); + + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); + + assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); + cacheEntries.forEach( + (key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? equalTo(2) : equalTo(0)) + ); + + var zeroFrequencyCacheEntries = cacheEntries.size() - usedCacheKeys.size(); + for (int i = 0; i < zeroFrequencyCacheEntries; i++) { + assertThat(cacheService.freeRegionCount(), equalTo(i)); + assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvict(), is(true)); + assertThat(cacheService.freeRegionCount(), equalTo(i + 1)); + } + + assertThat("No more cache entries old enough to be evicted", cacheService.maybeEvict(), is(false)); + assertThat(cacheService.freeRegionCount(), equalTo(zeroFrequencyCacheEntries)); + } + } + + public void testMaybeFetchRegion() throws Exception { + final long cacheSize = size(500L); + final long regionSize = size(100L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + AtomicInteger bulkTaskCount = new AtomicInteger(0); + ThreadPool threadPool = new TestThreadPool("test") { + @Override + public ExecutorService executor(String name) { + ExecutorService generic = super.executor(Names.GENERIC); + if (Objects.equals(name, "bulk")) { + return new StoppableExecutorServiceWrapper(generic) { + @Override + public void 
execute(Runnable command) { + super.execute(command); + bulkTaskCount.incrementAndGet(); + } + }; + } + return generic; + } + }; + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + ThreadPool.Names.GENERIC, + "bulk", + BlobCacheMetrics.NOOP + ) + ) { + { + // fetch a single region + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final long blobLength = size(250); // 3 regions + AtomicLong bytesRead = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchRegion(cacheKey, 0, blobLength, (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesRead.addAndGet(length); + progressUpdater.accept(length); + }, future); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(regionSize, bytesRead.get()); + assertEquals(4, cacheService.freeRegionCount()); + assertEquals(1, bulkTaskCount.get()); + } + { + // fetch multiple regions to use all the cache + final int remainingFreeRegions = cacheService.freeRegionCount(); + assertEquals(4, cacheService.freeRegionCount()); + + final var cacheKey = generateCacheKey(); + final long blobLength = regionSize * remainingFreeRegions; + AtomicLong bytesRead = new AtomicLong(0L); + + final PlainActionFuture> future = new PlainActionFuture<>(); + final var listener = new GroupedActionListener<>(remainingFreeRegions, future); + for (int region = 0; region < remainingFreeRegions; region++) { + cacheService.maybeFetchRegion( + cacheKey, + region, + blobLength, + (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesRead.addAndGet(length); + progressUpdater.accept(length); + }, + listener + ); + } + + var results = future.get(10, TimeUnit.SECONDS); + 
assertThat(results.stream().allMatch(result -> result), is(true)); + assertEquals(blobLength, bytesRead.get()); + assertEquals(0, cacheService.freeRegionCount()); + assertEquals(1 + remainingFreeRegions, bulkTaskCount.get()); + } + { + // cache fully used, no entry old enough to be evicted + assertEquals(0, cacheService.freeRegionCount()); + final var cacheKey = generateCacheKey(); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchRegion( + cacheKey, + randomIntBetween(0, 10), + randomLongBetween(1L, regionSize), + (channel, channelPos, relativePos, length, progressUpdater) -> { + throw new AssertionError("should not be executed"); + }, + future + ); + assertThat("Listener is immediately completed", future.isDone(), is(true)); + assertThat("Region already exists in cache", future.get(), is(false)); + } + { + cacheService.computeDecay(); + + // fetch one more region should evict an old cache entry + final var cacheKey = generateCacheKey(); + assertEquals(0, cacheService.freeRegionCount()); + long blobLength = randomLongBetween(1L, regionSize); + AtomicLong bytesRead = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchRegion(cacheKey, 0, blobLength, (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesRead.addAndGet(length); + progressUpdater.accept(length); + }, future); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(blobLength, bytesRead.get()); + assertEquals(0, cacheService.freeRegionCount()); + } + } + + threadPool.shutdown(); + } + + public void testPopulate() throws Exception { + final long regionSize = size(1L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), 
ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey = generateCacheKey(); + final var blobLength = size(12L); + + // start populating the first region + var entry = cacheService.get(cacheKey, blobLength, 0); + AtomicLong bytesWritten = new AtomicLong(0L); + final PlainActionFuture future1 = new PlainActionFuture<>(); + entry.populate(ByteRange.of(0, regionSize - 1), (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesWritten.addAndGet(length); + progressUpdater.accept(length); + }, taskQueue.getThreadPool().generic(), future1); + + assertThat(future1.isDone(), is(false)); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + + // start populating the second region + entry = cacheService.get(cacheKey, blobLength, 1); + final PlainActionFuture future2 = new PlainActionFuture<>(); + entry.populate(ByteRange.of(0, regionSize - 1), (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesWritten.addAndGet(length); + progressUpdater.accept(length); + }, taskQueue.getThreadPool().generic(), future2); + + // start populating again the first region, listener should be called immediately + entry = cacheService.get(cacheKey, blobLength, 0); + final PlainActionFuture future3 = new PlainActionFuture<>(); + entry.populate(ByteRange.of(0, regionSize - 1), (channel, channelPos, relativePos, length, progressUpdater) -> { + bytesWritten.addAndGet(length); + 
progressUpdater.accept(length); + }, taskQueue.getThreadPool().generic(), future3); + + assertThat(future3.isDone(), is(true)); + var written = future3.get(10L, TimeUnit.SECONDS); + assertThat(written, is(false)); + + taskQueue.runAllRunnableTasks(); + + written = future1.get(10L, TimeUnit.SECONDS); + assertThat(future1.isDone(), is(true)); + assertThat(written, is(true)); + written = future2.get(10L, TimeUnit.SECONDS); + assertThat(future2.isDone(), is(true)); + assertThat(written, is(true)); + } + } + + private void assertThatNonPositiveRecoveryRangeSizeRejected(Setting setting) { + final String value = randomFrom(ByteSizeValue.MINUS_ONE, ByteSizeValue.ZERO).getStringRep(); + final Settings settings = Settings.builder() + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()) + .put(setting.getKey(), value) + .build(); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> setting.get(settings)); + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause(), instanceOf(SettingsException.class)); + assertThat(e.getCause().getMessage(), is("setting [" + setting.getKey() + "] must be greater than zero")); + } + + public void testNonPositiveRegionSizeRejected() { + assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING); + } + + public void testNonPositiveRangeSizeRejected() { + assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_RANGE_SIZE_SETTING); + } + + public void testNonPositiveRecoveryRangeSizeRejected() { + assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING); + } + + public void testUseFullRegionSize() throws IOException { + final long regionSize = 
size(randomIntBetween(1, 100)); + final long cacheSize = regionSize * randomIntBetween(1, 10); + + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_TYPE.getKey(), "lfu") + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) { + @Override + protected int computeCacheFileRegionSize(long fileLength, int region) { + // use full region + return super.getRegionSize(); + } + } + ) { + final var cacheKey = generateCacheKey(); + final var blobLength = randomLongBetween(1L, cacheSize); + + int regions = Math.toIntExact(blobLength / regionSize); + regions += (blobLength % regionSize == 0L ? 
0L : 1L); + assertThat( + cacheService.computeCacheFileRegionSize(blobLength, randomFrom(regions)), + equalTo(BlobCacheUtils.toIntBytes(regionSize)) + ); + for (int region = 0; region < regions; region++) { + var cacheFileRegion = cacheService.get(cacheKey, blobLength, region); + assertThat(cacheFileRegion.tracker.getLength(), equalTo(regionSize)); + } + } + } +} diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLRUCacheTests.java similarity index 99% rename from x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java rename to x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLRUCacheTests.java index 92ee18e7542f2..d30449e317e89 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceUsingLRUCacheTests.java @@ -54,7 +54,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -public class SharedBlobCacheServiceTests extends ESTestCase { +public class SharedBlobCacheServiceUsingLRUCacheTests extends ESTestCase { private static long size(long numPages) { return numPages * SharedBytes.PAGE_SIZE; @@ -682,7 +682,7 @@ public void testMaybeEvictRecentUsed() throws Exception { ) ) { assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions)); - assertThat("Cache has no entries", cacheService.maybeEvictLeastRecent(), is(false)); + assertThat("Cache has no entries", cacheService.maybeEvict(), is(false)); // use all regions in cache for (int i = 0; i < numRegions; i++) { @@ -697,12 +697,12 @@ public void testMaybeEvictRecentUsed() throws Exception { } assertThat("All regions 
are used", cacheService.freeRegionCount(), equalTo(0)); - assertThat("Last entry has other references", cacheService.maybeEvictLeastRecent(), is(false)); + assertThat("Last entry has other references", cacheService.maybeEvict(), is(false)); taskQueue.runAllRunnableTasks(); assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); - assertThat("Cache should evict last entry", cacheService.maybeEvictLeastRecent(), is(true)); + assertThat("Cache should evict last entry", cacheService.maybeEvict(), is(true)); } }