From c80e883b88ec56fbcd97fa01309620ff65d5821e Mon Sep 17 00:00:00 2001 From: Ahmad AbuKhalil Date: Mon, 26 Dec 2022 12:10:28 -0800 Subject: [PATCH] File System Caching for remote store Signed-off-by: Ahmad AbuKhalil --- .../org/opensearch/common/cache/Cache.java | 20 +- .../opensearch/common/cache/CacheUsage.java | 41 ++ .../common/cache/RemovalNotification.java | 10 - .../common/cache/RemovalReason.java | 22 + .../org/opensearch/common/cache/Weigher.java | 27 ++ .../common/cache/refcounted/LRUCache.java | 427 +++++++++++++++++ .../cache/refcounted/RefCountedCache.java | 106 +++++ .../cache/refcounted/SegmentedCache.java | 312 +++++++++++++ .../common/cache/refcounted/package-info.java | 13 + .../cache/refcounted/stats/CacheStats.java | 277 +++++++++++ .../refcounted/stats/DefaultStatsCounter.java | 86 ++++ .../cache/refcounted/stats/StatsCounter.java | 86 ++++ .../cache/refcounted/stats/package-info.java | 12 + .../org/opensearch/common/collect/Linked.java | 41 ++ .../common/collect/LinkedDeque.java | 432 ++++++++++++++++++ .../common/util/ThrowableTranslateUtils.java | 77 ++++ .../common/util/ValidationUtils.java | 41 ++ .../org/opensearch/index/IndexModule.java | 9 +- .../RemoteSnapshotDirectoryFactory.java | 16 +- .../store/remote/fc/CachedIndexInput.java | 34 ++ .../index/store/remote/fc/FileCache.java | 126 +++++ .../store/remote/fc/FileCacheFactory.java | 58 +++ .../store/remote/fc/FileCachedIndexInput.java | 164 +++++++ .../index/store/remote/fc/package-info.java | 12 + .../remote/file/OnDemandBlockIndexInput.java | 2 +- .../store/remote/utils/TransferManager.java | 62 ++- .../AbstractIndexShardCacheEntity.java | 7 +- .../cache/IndicesFieldDataCache.java | 3 +- .../main/java/org/opensearch/node/Node.java | 21 +- .../opensearch/common/cache/CacheTests.java | 14 +- .../refcounted/RefCountedCacheTests.java | 130 ++++++ 31 files changed, 2643 insertions(+), 45 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/cache/CacheUsage.java 
create mode 100644 server/src/main/java/org/opensearch/common/cache/RemovalReason.java create mode 100644 server/src/main/java/org/opensearch/common/cache/Weigher.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/LRUCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/RefCountedCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/SegmentedCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/stats/CacheStats.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/stats/DefaultStatsCounter.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/stats/StatsCounter.java create mode 100644 server/src/main/java/org/opensearch/common/cache/refcounted/stats/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/collect/Linked.java create mode 100644 server/src/main/java/org/opensearch/common/collect/LinkedDeque.java create mode 100644 server/src/main/java/org/opensearch/common/util/ThrowableTranslateUtils.java create mode 100644 server/src/main/java/org/opensearch/common/util/ValidationUtils.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/fc/CachedIndexInput.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/fc/FileCache.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/fc/FileCacheFactory.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/fc/FileCachedIndexInput.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/fc/package-info.java create mode 100644 server/src/test/java/org/opensearch/common/cache/refcounted/RefCountedCacheTests.java diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java 
b/server/src/main/java/org/opensearch/common/cache/Cache.java index 007b1bfd3cfda..0ebef1556424b 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -513,9 +513,7 @@ private void put(K key, V value, long now) { promote(tuple.v1(), now); } if (replaced) { - removalListener.onRemoval( - new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED) - ); + removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); } } @@ -523,7 +521,7 @@ private void put(K key, V value, long now) { try { Entry entry = f.get(); try (ReleasableLock ignored = lruLock.acquire()) { - delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + delete(entry, RemovalReason.INVALIDATED); } } catch (ExecutionException e) { // ok @@ -534,7 +532,7 @@ private void put(K key, V value, long now) { /** * Invalidate the association for the specified key. A removal notification will be issued for invalidated - * entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * entries with {@link RemovalReason} INVALIDATED. * * @param key the key whose mapping is to be invalidated from the cache */ @@ -546,7 +544,7 @@ public void invalidate(K key) { /** * Invalidate the entry for the specified key and value. If the value provided is not equal to the value in * the cache, no removal will occur. A removal notification will be issued for invalidated - * entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * entries with {@link RemovalReason} INVALIDATED. * * @param key the key whose mapping is to be invalidated from the cache * @param value the expected value that should be associated with the key @@ -558,7 +556,7 @@ public void invalidate(K key, V value) { /** * Invalidate all cache entries. 
A removal notification will be issued for invalidated entries with - * {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * {@link RemovalReason} INVALIDATED. */ public void invalidateAll() { Entry h; @@ -589,7 +587,7 @@ public void invalidateAll() { } } while (h != null) { - removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED)); + removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalReason.INVALIDATED)); h = h.after; } } @@ -707,7 +705,7 @@ public void remove() { segment.remove(entry.key, entry.value, f -> {}); try (ReleasableLock ignored = lruLock.acquire()) { current = null; - delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + delete(entry, RemovalReason.INVALIDATED); } } } @@ -796,10 +794,10 @@ private void evictEntry(Entry entry) { if (segment != null) { segment.remove(entry.key, entry.value, f -> {}); } - delete(entry, RemovalNotification.RemovalReason.EVICTED); + delete(entry, RemovalReason.EVICTED); } - private void delete(Entry entry, RemovalNotification.RemovalReason removalReason) { + private void delete(Entry entry, RemovalReason removalReason) { assert lruLock.isHeldByCurrentThread(); if (unlink(entry)) { diff --git a/server/src/main/java/org/opensearch/common/cache/CacheUsage.java b/server/src/main/java/org/opensearch/common/cache/CacheUsage.java new file mode 100644 index 0000000000000..9a744b6a37da3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/CacheUsage.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache; + +import org.opensearch.common.cache.refcounted.RefCountedCache; + +/** + * Usage metrics for {@link RefCountedCache} + * + * @opensearch.internal + */ +public class CacheUsage { + /** + * Cache usage of the system + */ + private final long usage; + + /** + * Cache usage by entries which are referenced + */ + private final long activeUsage; + + public CacheUsage(long usage, long activeUsage) { + this.usage = usage; + this.activeUsage = activeUsage; + } + + public long usage() { + return usage; + } + + public long activeUsage() { + return activeUsage; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java index 454f7b5a96a64..6d355b2122460 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java @@ -38,16 +38,6 @@ * @opensearch.internal */ public class RemovalNotification { - /** - * Reason for notification removal - * - * @opensearch.internal - */ - public enum RemovalReason { - REPLACED, - INVALIDATED, - EVICTED - } private final K key; private final V value; diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalReason.java b/server/src/main/java/org/opensearch/common/cache/RemovalReason.java new file mode 100644 index 0000000000000..e5d795c093547 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/RemovalReason.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache; + +/** + * Reason for notification removal + * + * @opensearch.internal + */ +public enum RemovalReason { + REPLACED, + INVALIDATED, + EVICTED, + EXPLICIT, + CAPACITY +} diff --git a/server/src/main/java/org/opensearch/common/cache/Weigher.java b/server/src/main/java/org/opensearch/common/cache/Weigher.java new file mode 100644 index 0000000000000..98e40b0e61bc5 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/Weigher.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * A class that can determine the weight of a value. The total weight threshold + * is used to determine when an eviction is required. + * + * @opensearch.internal + */ +public interface Weigher { + + /** + * Measures an object's weight to determine how many units of capacity that + * the value consumes. A value must consume a minimum of one unit. + * + * @param value the object to weigh + * @return the object's weight + */ + long weightOf(V value); +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/LRUCache.java b/server/src/main/java/org/opensearch/common/cache/refcounted/LRUCache.java new file mode 100644 index 0000000000000..3d89bad54ccfe --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/LRUCache.java @@ -0,0 +1,427 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache.refcounted; + +import org.opensearch.common.cache.CacheUsage; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.Weigher; +import org.opensearch.common.cache.refcounted.stats.CacheStats; +import org.opensearch.common.cache.refcounted.stats.DefaultStatsCounter; +import org.opensearch.common.cache.refcounted.stats.StatsCounter; +import org.opensearch.common.collect.Linked; +import org.opensearch.common.collect.LinkedDeque; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; + +import static org.opensearch.common.util.ValidationUtils.validateNotNull; + +/** + * LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction. + * So this is best effort lazy cache to maintain capacity. 
+ * + * @see RefCountedCache + * + * @opensearch.internal + */ +class LRUCache implements RefCountedCache { + private long capacity; + + private HashMap> data; + + /** the LRU list */ + private LinkedDeque> lru; + + private volatile ReentrantLock lock; + + private RemovalListener listener; + + private Weigher weigher; + + /** + * this tracks cache usage on the system (as long as cache entry is in the cache) + */ + private long usage; + + /** + * this tracks cache usage only by entries which are being referred ({@link Node#refCount > 0}) + */ + private long activeUsage; + + private StatsCounter statsCounter; + + static class Node implements Linked> { + final K key; + + V value; + + long weight; + + Node prev; + + Node next; + + int refCount; + + Node(K key, V value, long weight) { + this.key = key; + this.value = value; + this.weight = weight; + this.prev = null; + this.next = null; + this.refCount = 0; + } + + public Node getPrevious() { + return prev; + } + + public void setPrevious(Node prev) { + this.prev = prev; + } + + public Node getNext() { + return next; + } + + public void setNext(Node next) { + this.next = next; + } + + public boolean evictable() { + return (refCount == 0); + } + + V getValue() { + return value; + } + } + + public LRUCache(long capacity, RemovalListener listener, Weigher weigher) { + this.capacity = capacity; + this.listener = listener; + this.weigher = weigher; + this.data = new HashMap<>(); + this.lru = new LinkedDeque<>(); + this.lock = new ReentrantLock(); + this.statsCounter = new DefaultStatsCounter<>(); + + } + + @Override + public V get(K key) { + validateNotNull(key); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.get(key); + // miss + if (node == null) { + statsCounter.recordMisses(key, 1); + return null; + } + // hit + if (node.evictable()) { + lru.moveToBack(node); + } + statsCounter.recordHits(key, 1); + return node.value; + } finally { + lock.unlock(); + } + } + + /** + * If put a new item 
to the cache, it's zero referenced. + * Otherwise, just replace the node with new value and new weight. + */ + @Override + public V put(K key, V value) { + validateNotNull(key); + validateNotNull(value); + + final long weight = weigher.weightOf(value); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.get(key); + if (node != null) { + final V oldValue = node.value; + final long oldWeight = node.weight; + // update the value and weight + node.value = value; + node.weight = weight; + // update usage + final long weightDiff = weight - oldWeight; + if (node.refCount > 0) { + activeUsage += weightDiff; + } + if (node.evictable()) { + lru.moveToBack(node); + } + usage += weightDiff; + // call listeners + statsCounter.recordReplacement(); + listener.onRemoval(new RemovalNotification<>(key, oldValue, RemovalReason.REPLACED)); + evict(); + return oldValue; + } else { + Node newNode = new Node<>(key, value, weight); + data.put(key, newNode); + lru.add(newNode); + usage += weight; + evict(); + return null; + } + } finally { + lock.unlock(); + } + } + + @Override + public void putAll(Map m) { + for (Map.Entry e : m.entrySet()) + put(e.getKey(), e.getValue()); + } + + @Override + public V computeIfPresent(K key, BiFunction remappingFunction) { + validateNotNull(key); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.get(key); + if (node != null && node.value != null) { + V v = remappingFunction.apply(key, node.value); + if (v != null) { + final V oldValue = node.value; + final long oldWeight = node.weight; + final long weight = weigher.weightOf(v); + // update the value and weight + node.value = v; + node.weight = weight; + + // update usage + final long weightDiff = weight - oldWeight; + if (node.evictable()) { + lru.moveToBack(node); + } + + if (node.refCount > 0) { + activeUsage += weightDiff; + } + + usage += weightDiff; + statsCounter.recordHits(key, 1); + if (oldValue != node.value) { + 
statsCounter.recordReplacement(); + listener.onRemoval(new RemovalNotification<>(node.key, oldValue, RemovalReason.REPLACED)); + } + evict(); + return v; + } else { + // is v is null, remove the item + data.remove(key); + if (node.refCount > 0) { + activeUsage -= node.weight; + } + usage -= node.weight; + if (node.evictable()) { + lru.remove(node); + } + statsCounter.recordRemoval(node.weight); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); + } + } + + statsCounter.recordMisses(key, 1); + return null; + } finally { + lock.unlock(); + } + } + + @Override + public void remove(K key) { + validateNotNull(key); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.remove(key); + if (node != null) { + if (node.refCount > 0) { + activeUsage -= node.weight; + } + usage -= node.weight; + if (node.evictable()) { + lru.remove(node); + } + statsCounter.recordRemoval(node.weight); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); + } + } finally { + lock.unlock(); + } + } + + @Override + public void removeAll(Iterable keys) { + for (K key : keys) { + remove(key); + } + } + + @Override + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + usage = 0L; + activeUsage = 0L; + lru.clear(); + for (Node node : data.values()) { + data.remove(node.key); + statsCounter.recordRemoval(node.weight); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); + } + } finally { + lock.unlock(); + } + } + + @Override + public long size() { + return data.size(); + } + + @Override + public void incRef(K key) { + validateNotNull(key); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.get(key); + if (node != null) { + if (node.refCount == 0) { + // if it was inactive, we should add the weight to active usage from now + activeUsage += node.weight; + } + + if (node.evictable()) { + 
// since it become active, we should remove it from eviction list + lru.remove(node); + } + + node.refCount++; + } + + } finally { + lock.unlock(); + } + } + + @Override + public void decRef(K key) { + validateNotNull(key); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = data.get(key); + if (node != null && node.refCount > 0) { + node.refCount--; + + if (node.evictable()) { + // if it becomes evictable, we should add it to eviction list + lru.add(node); + } + + if (node.refCount == 0) { + // if it was active, we should remove its weight from active usage + activeUsage -= node.weight; + } + } + } finally { + lock.unlock(); + } + } + + @Override + public long prune() { + long sum = 0L; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Node node = lru.peek(); + // If weighted values are used, then the pending operations will adjust + // the size to reflect the correct weight + while (node != null) { + data.remove(node.key, node); + sum += node.weight; + statsCounter.recordRemoval(node.weight); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); + Node tmp = node; + node = node.getNext(); + lru.remove(tmp); + } + + usage -= sum; + } finally { + lock.unlock(); + } + return sum; + } + + @Override + public CacheUsage usage() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return new CacheUsage(usage, activeUsage); + } finally { + lock.unlock(); + } + } + + @Override + public CacheStats stats() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return statsCounter.snapshot(); + } finally { + lock.unlock(); + } + } + + boolean hasOverflowed() { + return usage >= capacity; + } + + void evict() { + // Attempts to evict entries from the cache if it exceeds the maximum + // capacity. 
+ while (hasOverflowed()) { + final Node node = lru.poll(); + + if (node == null) { + return; + } + + // Notify the listener only if the entry was evicted + data.remove(node.key, node); + usage -= node.weight; + statsCounter.recordEviction(node.weight); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.CAPACITY)); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/RefCountedCache.java b/server/src/main/java/org/opensearch/common/cache/refcounted/RefCountedCache.java new file mode 100644 index 0000000000000..ffe82cf080194 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/RefCountedCache.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.refcounted; + +import org.opensearch.common.cache.CacheUsage; +import org.opensearch.common.cache.refcounted.stats.CacheStats; + +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Custom Cache which support typical cache operations (put, get, ...) and it support reference counting per individual key which might + * change eviction behavior + * @param type of the key + * @param type of th value + * + * @opensearch.internal + */ +public interface RefCountedCache { + + /** + * Returns the value associated with {@code key} in this cache, or {@code null} if there is no + * cached value for {@code key}. + */ + V get(K key); + + /** + * Associates {@code value} with {@code key} in this cache. If the cache previously contained a + * value associated with {@code key}, the old value is replaced by {@code value}. + */ + V put(K key, V value); + + /** + * Copies all the mappings from the specified map to the cache. 
The effect of this call is + * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key + * {@code k} to value {@code v} in the specified map. The behavior of this operation is undefined + * if the specified map is modified while the operation is in progress. + */ + void putAll(Map m); + + /** + * If the specified key is already associated with a value, attempts to update its value using the given mapping + * function and enters the new value into this map unless null. + * + * If the specified key is NOT already associated with a value, return null without applying the mapping function. + * + * The remappingFunction method for a given key will be invoked at most once. + */ + V computeIfPresent(K key, BiFunction remappingFunction); + + /** + * Discards any cached value for key {@code key}. + */ + void remove(K key); + + /** + * Discards any cached values for keys {@code keys}. + */ + void removeAll(Iterable keys); + + /** + * Discards all entries in the cache. + */ + void clear(); + + /** + * Returns the approximate number of entries in this cache. + */ + long size(); + + /** + * increment references count for key {@code key}. + */ + void incRef(K key); + + /** + * decrement references count for key {@code key}. + */ + void decRef(K key); + + long prune(); + + /** + * Returns the weighted usage of this cache. + * + * @return the combined weight of the values in this cache + */ + CacheUsage usage(); + + /** + * Returns a current snapshot of this cache's cumulative statistics. All statistics are + * initialized to zero, and are monotonically increasing over the lifetime of the cache. + *

+ * Due to the performance penalty of maintaining statistics, some implementations may not record + * the usage history immediately or at all. + * + * @return the current snapshot of the statistics of this cache + */ + CacheStats stats(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/SegmentedCache.java b/server/src/main/java/org/opensearch/common/cache/refcounted/SegmentedCache.java new file mode 100644 index 0000000000000..584b9d1c65253 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/SegmentedCache.java @@ -0,0 +1,312 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.refcounted; + +import org.opensearch.common.cache.CacheUsage; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.Weigher; +import org.opensearch.common.cache.refcounted.stats.CacheStats; + +import java.util.Map; +import java.util.function.BiFunction; + +import static org.opensearch.common.util.ValidationUtils.checkArgument; +import static org.opensearch.common.util.ValidationUtils.validateNotNull; + +/** + * Segmented {@link LRUCache} to offer concurrent access with less contention. + * @param type of the key + * @param type of th value + * + * @opensearch.internal + */ +public class SegmentedCache implements RefCountedCache { + + static final int HASH_BITS = 0x7fffffff; // usable bits of normal ActivenessAwareCache hash + + static final int ceilingNextPowerOfTwo(int x) { + // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + } + + long capacity; + + /** + * The array of bins. Size is always a power of two. 
+ */ + final RefCountedCache[] table; + + /** + * Mask value for indexing into segments. + */ + final int segmentMask; + + public SegmentedCache(Builder builder) { + this.capacity = builder.capacity; + final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel); + this.segmentMask = segments - 1; + this.table = newSegmentArray(segments); + final long perStripeCapacity = (capacity + (segments - 1)) / segments; + for (int i = 0; i < table.length; i++) { + table[i] = new LRUCache<>(perStripeCapacity, builder.listener, builder.weigher); + } + } + + @SuppressWarnings("unchecked") + final RefCountedCache[] newSegmentArray(int size) { + return new RefCountedCache[size]; + } + + RefCountedCache segmentFor(K key) { + int h = key.hashCode(); + // Spreads (XORs) higher bits of hash to lower and also forces top + // bit to 0. Because the table uses power-of-two masking, sets of + // hashes that vary only in bits above the current mask will + // always collide. (Among known examples are sets of Float keys + // holding consecutive whole numbers in small tables.) So we + // apply a transform that spreads the impact of higher bits + // downward. There is a tradeoff between speed, utility, and + // quality of bit-spreading. Because many common sets of hashes + // are already reasonably distributed (so don't benefit from + // spreading), and because we use trees to handle large sets of + // collisions in bins, we just XOR some shifted bits in the + // cheapest possible way to reduce systematic lossage, as well as + // to incorporate impact of the highest bits that would otherwise + // never be used in index calculations because of table bounds. 
+ return table[(h ^ (h >>> 16)) & HASH_BITS & segmentMask]; + } + + public long capacity() { + return capacity; + } + + @Override + public V get(K key) { + if (key == null) throw new NullPointerException(); + return segmentFor(key).get(key); + } + + @Override + public V put(K key, V value) { + if (key == null || value == null) throw new NullPointerException(); + return segmentFor(key).put(key, value); + } + + @Override + public void putAll(Map m) { + for (Map.Entry e : m.entrySet()) + put(e.getKey(), e.getValue()); + } + + @Override + public V computeIfPresent(K key, BiFunction remappingFunction) { + if (key == null || remappingFunction == null) throw new NullPointerException(); + return segmentFor(key).computeIfPresent(key, remappingFunction); + } + + @Override + public void remove(K key) { + if (key == null) throw new NullPointerException(); + segmentFor(key).remove(key); + } + + @Override + public void removeAll(Iterable keys) { + for (K k : keys) + remove(k); + } + + @Override + public void clear() { + + } + + @Override + public long size() { + long size = 0; + for (RefCountedCache cache : table) { + size += cache.size(); + } + return size; + } + + @Override + public void incRef(K key) { + if (key == null) throw new NullPointerException(); + segmentFor(key).incRef(key); + } + + @Override + public void decRef(K key) { + if (key == null) throw new NullPointerException(); + segmentFor(key).decRef(key); + } + + @Override + public long prune() { + long sum = 0L; + for (RefCountedCache cache : table) { + sum += cache.prune(); + } + return sum; + } + + @Override + public CacheUsage usage() { + long usage = 0L; + long activeUsage = 0L; + for (RefCountedCache cache : table) { + CacheUsage c = cache.usage(); + usage += c.usage(); + activeUsage += c.activeUsage(); + } + return new CacheUsage(usage, activeUsage); + } + + @Override + public CacheStats stats() { + long hitCount = 0L; + long missCount = 0L; + long removeCount = 0L; + long removeWeight = 0L; + long 
replaceCount = 0L; + long evictionCount = 0L; + long evictionWeight = 0L; + + for (RefCountedCache cache : table) { + CacheStats c = cache.stats(); + hitCount += c.hitCount(); + missCount += c.missCount(); + removeCount += c.removeCount(); + removeWeight += c.removeWeight(); + replaceCount += c.replaceCount(); + evictionCount += c.evictionCount(); + evictionWeight += c.evictionWeight(); + } + return new CacheStats(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + } + + enum SingletonWeigher implements Weigher { + INSTANCE; + + @Override + public long weightOf(Object value) { + return 1; + } + } + + /** + * A listener that ignores all notifications. + */ + enum DiscardingListener implements RemovalListener { + INSTANCE; + + @Override + public void onRemoval(RemovalNotification notification) { + + } + } + + /** + * A builder that creates {@link SegmentedCache} instances. It + * provides a flexible approach for constructing customized instances with + * a named parameter syntax. + */ + public static final class Builder { + + static final int DEFAULT_CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors(); + + RemovalListener listener; + Weigher weigher; + + int concurrencyLevel; + + long capacity; + + @SuppressWarnings("unchecked") + Builder() { + capacity = -1; + weigher = (Weigher) SingletonWeigher.INSTANCE; + concurrencyLevel = DEFAULT_CONCURRENCY_LEVEL; + listener = (RemovalListener) DiscardingListener.INSTANCE; + } + + /** + * Specifies the maximum weighted capacity to coerce the map to and may + * exceed it temporarily. + * + * @param capacity the weighted threshold to bound the map by + * @throws IllegalArgumentException if the maximumWeightedCapacity is + * negative + */ + public Builder capacity(long capacity) { + checkArgument(capacity >= 0); + this.capacity = capacity; + return this; + } + + /** + * Specifies the estimated number of concurrently updating threads. 
The + * implementation performs internal sizing to try to accommodate this many + * threads (default Runtime.getRuntime().availableProcessors()). + * + * @param concurrencyLevel the estimated number of concurrently updating + * threads + * @throws IllegalArgumentException if the concurrencyLevel is less than or + * equal to zero + */ + public Builder concurrencyLevel(int concurrencyLevel) { + checkArgument(concurrencyLevel > 0); + this.concurrencyLevel = concurrencyLevel; + return this; + } + + /** + * Specifies an optional listener that is registered for notification when + * an entry is removed. + * + * @param listener the object to forward removed entries to + * @throws NullPointerException if the listener is null + */ + public Builder listener(RemovalListener listener) { + validateNotNull(listener); + this.listener = listener; + return this; + } + + /** + * Specifies an algorithm to determine how many the units of capacity a + * value consumes. The default algorithm bounds the map by the number of + * key-value pairs by giving each entry a weight of 1. + * + * @param weigher the algorithm to determine a value's weight + * @throws NullPointerException if the weigher is null + */ + public Builder weigher(Weigher weigher) { + validateNotNull(weigher); + this.weigher = weigher; + return this; + } + + /** + * Creates a new {@link SegmentedCache} instance. 
+ * + * @throws IllegalStateException if the maximum weighted capacity was + * not set + */ + public SegmentedCache build() { + return new SegmentedCache<>(this); + } + } + + public static Builder builder() { + return new Builder<>(); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/package-info.java b/server/src/main/java/org/opensearch/common/cache/refcounted/package-info.java new file mode 100644 index 0000000000000..d9e1b356a4e1f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Reference counted cache which is a cache which takes into consideration reference count per cache entry and its eviction policy + * depends on current reference count + */ +package org.opensearch.common.cache.refcounted; diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/stats/CacheStats.java b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/CacheStats.java new file mode 100644 index 0000000000000..6e388cd36b59b --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/CacheStats.java @@ -0,0 +1,277 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.refcounted.stats; + +import org.opensearch.common.cache.refcounted.RefCountedCache; + +import java.util.Objects; + +/** + * Statistics about the performance of a {@link RefCountedCache}.
+ * + * @opensearch.internal + */ +public final class CacheStats { + private static final CacheStats EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0, 0); + + private final long hitCount; + private final long missCount; + private final long removeCount; + private final long removeWeight; + private final long replaceCount; + private final long evictionCount; + private final long evictionWeight; + + /** + * Constructs a new {@code CacheStats} instance. + *

+ * Many parameters of the same type in a row is a bad thing, but this class is not constructed + * by end users and is too fine-grained for a builder. + * + * @param hitCount the number of cache hits + * @param missCount the number of cache misses + * @param removeCount the number of entries removed from the cache + * @param removeWeight the sum of weights of entries removed from the cache + * @param replaceCount the number of entries replaced explicitly from the cache + * @param evictionCount the number of entries evicted from the cache + * @param evictionWeight the sum of weights of entries evicted from the cache + */ + public CacheStats( + long hitCount, + long missCount, + long removeCount, + long removeWeight, + long replaceCount, + long evictionCount, + long evictionWeight + ) { + if ((hitCount < 0) + || (missCount < 0) + || (removeCount < 0) + || (removeWeight < 0) + || (replaceCount < 0) + || (evictionCount < 0) + || (evictionWeight < 0)) { + throw new IllegalArgumentException(); + } + this.hitCount = hitCount; + this.missCount = missCount; + this.removeCount = removeCount; + this.removeWeight = removeWeight; + this.replaceCount = replaceCount; + this.evictionCount = evictionCount; + this.evictionWeight = evictionWeight; + } + + /** + * Returns a statistics instance where no cache events have been recorded. + * + * @return an empty statistics instance + */ + public static CacheStats empty() { + return EMPTY_STATS; + } + + /** + * Returns the number of times {@link RefCountedCache} lookup methods have returned either a cached or + * uncached value. This is defined as {@code hitCount + missCount}. + * + * @return the {@code hitCount + missCount} + */ + public long requestCount() { + return hitCount + missCount; + } + + /** + * Returns the number of times {@link RefCountedCache} lookup methods have returned a cached value.
+ * + * @return the number of times {@link RefCountedCache} lookup methods have returned a cached value + */ + public long hitCount() { + return hitCount; + } + + /** + * Returns the ratio of cache requests which were hits. This is defined as + * {@code hitCount / requestCount}, or {@code 1.0} when {@code requestCount == 0}. Note that + * {@code hitRate + missRate =~ 1.0}. + * + * @return the ratio of cache requests which were hits + */ + public double hitRate() { + long requestCount = requestCount(); + return (requestCount == 0) ? 1.0 : (double) hitCount / requestCount; + } + + /** + * Returns the number of times {@link RefCountedCache} lookup methods have returned an uncached (newly + * loaded) value, or null. Multiple concurrent calls to {@link RefCountedCache} lookup methods on an absent + * value can result in multiple misses, all returning the results of a single cache load + * operation. + * + * @return the number of times {@link RefCountedCache} lookup methods have returned an uncached (newly + * loaded) value, or null + */ + public long missCount() { + return missCount; + } + + /** + * Returns the ratio of cache requests which were misses. This is defined as + * {@code missCount / requestCount}, or {@code 0.0} when {@code requestCount == 0}. + * Note that {@code hitRate + missRate =~ 1.0}. Cache misses include all requests which + * weren't cache hits, including requests which resulted in either successful or failed loading + * attempts, and requests which waited for other threads to finish loading. It is thus the case + * that {@code missCount >= loadSuccessCount + loadFailureCount}. Multiple + * concurrent misses for the same key will result in a single load operation. + * + * @return the ratio of cache requests which were misses + */ + public double missRate() { + long requestCount = requestCount(); + return (requestCount == 0) ? 0.0 : (double) missCount / requestCount; + } + + /** + * Returns the number of times an entry has been removed explicitly. 
+ * + * @return the number of times an entry has been removed + */ + public long removeCount() { + return removeCount; + } + + /** + * Returns the sum of weights of explicitly removed entries. + * + * @return the sum of weights of explicitly removed entries + */ + public long removeWeight() { + return removeWeight; + } + + /** + * Returns the number of times an entry has been replaced. + * + * @return the number of times an entry has been replaced + */ + public long replaceCount() { + return replaceCount; + } + + /** + * Returns the number of times an entry has been evicted. This count does not include manual + * {@linkplain RefCountedCache#remove removals}. + * + * @return the number of times an entry has been evicted + */ + public long evictionCount() { + return evictionCount; + } + + /** + * Returns the sum of weights of evicted entries. This total does not include manual + * {@linkplain RefCountedCache#remove removals}. + * + * @return the sum of weights of evicted entities + */ + public long evictionWeight() { + return evictionWeight; + } + + /** + * Returns a new {@code CacheStats} representing the difference between this {@code CacheStats} + * and {@code other}. Negative values, which aren't supported by {@code CacheStats} will be + * rounded up to zero. + * + * @param other the statistics to subtract with + * @return the difference between this instance and {@code other} + */ + + public CacheStats minus(CacheStats other) { + return new CacheStats( + Math.max(0L, hitCount - other.hitCount), + Math.max(0L, missCount - other.missCount), + Math.max(0L, removeCount - other.removeCount), + Math.max(0L, removeWeight - other.removeWeight), + Math.max(0L, replaceCount - other.replaceCount), + Math.max(0L, evictionCount - other.evictionCount), + Math.max(0L, evictionWeight - other.evictionWeight) + ); + } + + /** + * Returns a new {@code CacheStats} representing the sum of this {@code CacheStats} and + * {@code other}. 
+ * + * @param other the statistics to add with + * @return the sum of the statistics + */ + + public CacheStats plus(CacheStats other) { + return new CacheStats( + hitCount + other.hitCount, + missCount + other.missCount, + removeCount + other.removeCount, + removeWeight + other.removeWeight, + replaceCount + other.replaceCount, + evictionCount + other.evictionCount, + evictionWeight + other.evictionWeight + ); + } + + @Override + public int hashCode() { + return Objects.hash(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } else if (!(o instanceof CacheStats)) { + return false; + } + CacheStats other = (CacheStats) o; + return hitCount == other.hitCount + && missCount == other.missCount + && removeCount == other.removeCount + && removeWeight == other.removeWeight + && replaceCount == other.replaceCount + && evictionCount == other.evictionCount + && evictionWeight == other.evictionWeight; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + '{' + + "hitCount=" + + hitCount + + ", " + + "missCount=" + + missCount + + ", " + + "removeCount=" + + removeCount + + ", " + + "removeWeight=" + + removeWeight + + ", " + + "replaceCount=" + + replaceCount + + ", " + + "evictionCount=" + + evictionCount + + ", " + + "evictionWeight=" + + evictionWeight + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/stats/DefaultStatsCounter.java b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/DefaultStatsCounter.java new file mode 100644 index 0000000000000..e37172dcdacab --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/DefaultStatsCounter.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a 
+ * compatible open source license. + */ + +package org.opensearch.common.cache.refcounted.stats; + +/** + * A non thread-safe {@link StatsCounter} implementation. + * + * @opensearch.internal + */ +public class DefaultStatsCounter implements StatsCounter { + public static final DefaultStatsCounter INSTANCE = new DefaultStatsCounter(); + + private long hitCount; + private long missCount; + private long removeCount; + private long removeWeight; + private long replaceCount; + private long evictionCount; + private long evictionWeight; + + public DefaultStatsCounter() { + this.hitCount = 0L; + this.missCount = 0L; + this.removeCount = 0L; + this.removeWeight = 0L; + this.replaceCount = 0L; + this.evictionCount = 0L; + this.evictionWeight = 0L; + } + + @Override + public void recordHits(K key, int count) { + hitCount += count; + } + + @Override + public void recordMisses(K key, int count) { + missCount += count; + } + + @Override + public void recordRemoval(long weight) { + removeCount++; + removeWeight += weight; + } + + @Override + public void recordReplacement() { + replaceCount++; + } + + @Override + public void recordEviction(long weight) { + evictionCount++; + evictionWeight += weight; + } + + @Override + public CacheStats snapshot() { + return new CacheStats(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); + } + + /** + * Increments all counters by the values in {@code other}. 
+ * + * @param other the counter to increment from + */ + public void incrementBy(StatsCounter other) { + CacheStats otherStats = other.snapshot(); + hitCount += otherStats.hitCount(); + missCount += otherStats.missCount(); + evictionCount += otherStats.evictionCount(); + evictionWeight += otherStats.evictionWeight(); + } + + @Override + public String toString() { + return snapshot().toString(); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/stats/StatsCounter.java b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/StatsCounter.java new file mode 100644 index 0000000000000..f89f9b8e3f255 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/StatsCounter.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.refcounted.stats; + +import org.opensearch.common.cache.refcounted.RefCountedCache; + +import java.util.function.BiFunction; + +/** + * Accumulates statistics during the operation of a {@link RefCountedCache} for presentation by + * {@link RefCountedCache#stats}. This is solely intended for consumption by {@code Cache} implementors. + * + * @opensearch.internal + */ +public interface StatsCounter { + + /** + * Records cache hits. This should be called when a cache request returns a cached value. + * + * @param count the number of hits to record + */ + void recordHits(K key, int count); + + /** + * Records cache misses. This should be called when a cache request returns a value that was not + * found in the cache. This method should be called by the loading thread, as well as by threads + * blocking on the load. 
Multiple concurrent calls to {@link RefCountedCache} lookup methods with the same + * key on an absent value should result in a single call to either {@code recordLoadSuccess} or + * {@code recordLoadFailure} and multiple calls to this method, despite all being served by the + * results of a single load operation. + * + * @param count the number of misses to record + */ + void recordMisses(K key, int count); + + /** + * Records the explicit removal of an entry from the cache. This should only be called when an entry is + * removed as a result of manual + * {@link RefCountedCache#remove(Object)} + * {@link RefCountedCache#computeIfPresent(Object, BiFunction)} + * + * @param weight the weight of the removed entry + */ + void recordRemoval(long weight); + + /** + * Records the replacement of an entry from the cache. This should only be called when an entry is + * replaced as a result of manual + * {@link RefCountedCache#put(Object, Object)} + * {@link RefCountedCache#computeIfPresent(Object, BiFunction)} + */ + void recordReplacement(); + + /** + * Records the eviction of an entry from the cache. This should only be called when an entry is + * evicted due to the cache's eviction strategy, and not as a result of manual + * {@link RefCountedCache#remove(Object)} removals. + * + * @param weight the weight of the evicted entry + */ + void recordEviction(long weight); + + /** + * Returns a snapshot of this counter's values. Note that this may be an inconsistent view, as it + * may be interleaved with update operations. + * + * @return a snapshot of this counter's values + */ + + CacheStats snapshot(); + + /** + * Returns an accumulator that does not record any cache events.
+ * + * @return an accumulator that does not record metrics + */ + static StatsCounter defaultStatsCounter() { + return DefaultStatsCounter.INSTANCE; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/refcounted/stats/package-info.java b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/package-info.java new file mode 100644 index 0000000000000..d8c216724d368 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/refcounted/stats/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Cache stats for reference counted cache + */ +package org.opensearch.common.cache.refcounted.stats; diff --git a/server/src/main/java/org/opensearch/common/collect/Linked.java b/server/src/main/java/org/opensearch/common/collect/Linked.java new file mode 100644 index 0000000000000..bd9c2e6da2b26 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/collect/Linked.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.collect; + +import java.util.Deque; + +/** + * An element that is linked on the {@link Deque}. + * + * @opensearch.internal + */ +public interface Linked> { + + /** + * Retrieves the previous element or null if either the element is + * unlinked or the first element on the deque. + */ + T getPrevious(); + + /** + * Sets the previous element or null if there is no link. + */ + void setPrevious(T prev); + + /** + * Retrieves the next element or null if either the element is + * unlinked or the last element on the deque. + */ + T getNext(); + + /** + * Sets the next element or null if there is no link. 
+ */ + void setNext(T next); +} diff --git a/server/src/main/java/org/opensearch/common/collect/LinkedDeque.java b/server/src/main/java/org/opensearch/common/collect/LinkedDeque.java new file mode 100644 index 0000000000000..8ac86660440ad --- /dev/null +++ b/server/src/main/java/org/opensearch/common/collect/LinkedDeque.java @@ -0,0 +1,432 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.collect; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Linked list implementation of the {@link Deque} interface where the link + * pointers are tightly integrated with the element. Linked deques have no + * capacity restrictions; they grow as necessary to support usage. They are not + * thread-safe; in the absence of external synchronization, they do not support + * concurrent access by multiple threads. Null elements are prohibited. + *

+ * Most LinkedDeque operations run in constant time by assuming that + * the {@link Linked} parameter is associated with the deque instance. Any usage + * that violates this assumption will result in non-deterministic behavior. + *

+ * The iterators returned by this class are not fail-fast: If + * the deque is modified at any time after the iterator is created, the iterator + * will be in an unknown state. Thus, in the face of concurrent modification, + * the iterator risks arbitrary, non-deterministic behavior at an undetermined + * time in the future. + * + * @param the type of elements held in this collection + * + * @opensearch.internal + */ +public final class LinkedDeque> extends AbstractCollection implements Deque { + // This class provides a doubly-linked list that is optimized for the virtual + // machine. The first and last elements are manipulated instead of a slightly + // more convenient sentinel element to avoid the insertion of null checks with + // NullPointerException throws in the byte code. The links to a removed + // element are cleared to help a generational garbage collector if the + // discarded elements inhabit more than one generation. + + /** + * Pointer to first node. + * Invariant: (first == null && last == null) || + * (first.prev == null) + */ + E first; + + /** + * Pointer to last node. + * Invariant: (first == null && last == null) || + * (last.next == null) + */ + E last; + + /** + * Links the element to the front of the deque so that it becomes the first + * element. + * + * @param e the unlinked element + */ + void linkFirst(final E e) { + final E f = first; + first = e; + + if (f == null) { + last = e; + } else { + f.setPrevious(e); + e.setNext(f); + } + } + + /** + * Links the element to the back of the deque so that it becomes the last + * element. + * + * @param e the unlinked element + */ + void linkLast(final E e) { + final E l = last; + last = e; + + if (l == null) { + first = e; + } else { + l.setNext(e); + e.setPrevious(l); + } + } + + /** + * Unlinks the non-null first element. 
+ */ + E unlinkFirst() { + final E f = first; + final E next = f.getNext(); + f.setNext(null); + + first = next; + if (next == null) { + last = null; + } else { + next.setPrevious(null); + } + return f; + } + + /** + * Unlinks the non-null last element. + */ + E unlinkLast() { + final E l = last; + final E prev = l.getPrevious(); + l.setPrevious(null); + last = prev; + if (prev == null) { + first = null; + } else { + prev.setNext(null); + } + return l; + } + + /** + * Unlinks the non-null element. + */ + void unlink(E e) { + final E prev = e.getPrevious(); + final E next = e.getNext(); + + if (prev == null) { + first = next; + } else { + prev.setNext(next); + e.setPrevious(null); + } + + if (next == null) { + last = prev; + } else { + next.setPrevious(prev); + e.setNext(null); + } + } + + @Override + public boolean isEmpty() { + return (first == null); + } + + void checkNotEmpty() { + if (isEmpty()) { + throw new NoSuchElementException(); + } + } + + /** + * {@inheritDoc} + *

+ * Beware that, unlike in most collections, this method is NOT a + * constant-time operation. + */ + @Override + public int size() { + int size = 0; + for (E e = first; e != null; e = e.getNext()) { + size++; + } + return size; + } + + @Override + public void clear() { + for (E e = first; e != null;) { + E next = e.getNext(); + e.setPrevious(null); + e.setNext(null); + e = next; + } + first = last = null; + } + + @Override + public boolean contains(Object o) { + return (o instanceof Linked) && contains((Linked) o); + } + + // A fast-path containment check + boolean contains(Linked e) { + return (e.getPrevious() != null) || (e.getNext() != null) || (e == first); + } + + /** + * Moves the element to the front of the deque so that it becomes the first + * element. + * + * @param e the linked element + */ + public void moveToFront(E e) { + if (e != first) { + unlink(e); + linkFirst(e); + } + } + + /** + * Moves the element to the back of the deque so that it becomes the last + * element. + * + * @param e the linked element + */ + public void moveToBack(E e) { + if (e != last) { + unlink(e); + linkLast(e); + } + } + + @Override + public E peek() { + return peekFirst(); + } + + @Override + public E peekFirst() { + return first; + } + + @Override + public E peekLast() { + return last; + } + + @Override + public E getFirst() { + checkNotEmpty(); + return peekFirst(); + } + + @Override + public E getLast() { + checkNotEmpty(); + return peekLast(); + } + + @Override + public E element() { + return getFirst(); + } + + @Override + public boolean offer(E e) { + return offerLast(e); + } + + @Override + public boolean offerFirst(E e) { + if (contains(e)) { + return false; + } + linkFirst(e); + return true; + } + + @Override + public boolean offerLast(E e) { + if (contains(e)) { + return false; + } + linkLast(e); + return true; + } + + @Override + public boolean add(E e) { + return offerLast(e); + } + + @Override + public void addFirst(E e) { + if (!offerFirst(e)) { + throw new 
IllegalArgumentException(); + } + } + + @Override + public void addLast(E e) { + if (!offerLast(e)) { + throw new IllegalArgumentException(); + } + } + + @Override + public E poll() { + return pollFirst(); + } + + @Override + public E pollFirst() { + return isEmpty() ? null : unlinkFirst(); + } + + @Override + public E pollLast() { + return isEmpty() ? null : unlinkLast(); + } + + @Override + public E remove() { + return removeFirst(); + } + + @Override + @SuppressWarnings("unchecked") + public boolean remove(Object o) { + return (o instanceof Linked) && remove((E) o); + } + + // A fast-path removal + boolean remove(E e) { + if (contains(e)) { + unlink(e); + return true; + } + return false; + } + + @Override + public E removeFirst() { + checkNotEmpty(); + return pollFirst(); + } + + @Override + public boolean removeFirstOccurrence(Object o) { + return remove(o); + } + + @Override + public E removeLast() { + checkNotEmpty(); + return pollLast(); + } + + @Override + public boolean removeLastOccurrence(Object o) { + return remove(o); + } + + @Override + public boolean removeAll(Collection c) { + boolean modified = false; + for (Object o : c) { + modified |= remove(o); + } + return modified; + } + + @Override + public void push(E e) { + addFirst(e); + } + + @Override + public E pop() { + return removeFirst(); + } + + @Override + public Iterator iterator() { + return new AbstractLinkedIterator(first) { + @Override + E computeNext() { + return cursor.getNext(); + } + }; + } + + @Override + public Iterator descendingIterator() { + return new AbstractLinkedIterator(last) { + @Override + E computeNext() { + return cursor.getPrevious(); + } + }; + } + + abstract class AbstractLinkedIterator implements Iterator { + E cursor; + + /** + * Creates an iterator that can can traverse the deque. 
+ * + * @param start the initial element to begin traversal from + */ + AbstractLinkedIterator(E start) { + cursor = start; + } + + @Override + public boolean hasNext() { + return (cursor != null); + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + E e = cursor; + cursor = computeNext(); + return e; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Retrieves the next element to traverse to or null if there are + * no more elements. + */ + abstract E computeNext(); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/ThrowableTranslateUtils.java b/server/src/main/java/org/opensearch/common/util/ThrowableTranslateUtils.java new file mode 100644 index 0000000000000..ad9a6096de7d9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/ThrowableTranslateUtils.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.index.store.remote.fc.FileCache; + +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * Utils to translate/cast throwable instances + * + * @opensearch.internal + */ +final public class ThrowableTranslateUtils { + private static final Logger logger = LogManager.getLogger(FileCache.class); + + /** + * Run passed runnable and catch exception and translate exception into runtime exception using + * {@link ThrowableTranslateUtils#asRuntimeException(Throwable)} + * @param supplier to run + */ + public static R catchAsRuntimeException(CheckedSupplier supplier) { + try { + return supplier.get(); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("translating exception into runtime exception"), e); + throw asRuntimeException(e); + } + } + + /** + * Run passed runnable and catch exception and translate exception into runtime exception using + * {@link ThrowableTranslateUtils#asRuntimeException(Throwable)} + * @param runnable to run + */ + public static void catchAsRuntimeException(CheckedRunnable runnable) { + try { + runnable.run(); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("translating exception into runtime exception"), e); + throw asRuntimeException(e); + } + } + + /** + * Translate throwable into {@link RuntimeException} except if throwable is {@link InterruptedException} then it will interrupt current + * thread + * @param throwable to be translated + * @return instance of {@link RuntimeException} + */ + public static RuntimeException asRuntimeException(Throwable throwable) { + if (throwable instanceof RuntimeException) { + return (RuntimeException) throwable; + } + if (throwable 
instanceof IOException) { + return new UncheckedIOException((IOException) throwable); + } + if (throwable instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return new RuntimeException(throwable); + } + + private ThrowableTranslateUtils() {} +} diff --git a/server/src/main/java/org/opensearch/common/util/ValidationUtils.java b/server/src/main/java/org/opensearch/common/util/ValidationUtils.java new file mode 100644 index 0000000000000..bad1dd180f40f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/ValidationUtils.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +/** + * Utils to do validations + * + * @opensearch.internal + */ +public final class ValidationUtils { + /** + * Asserts that the given object is non-null and returns it. + * + * @param reference Object to assert on + * @return Object if non-null + * @throws NullPointerException if the object was null + */ + public static T validateNotNull(T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } + + /** + * Ensures that the argument expression is true.
+ */ + public static void checkArgument(boolean expression) { + if (!expression) { + throw new IllegalArgumentException(); + } + } + + private ValidationUtils() {} +} diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 9f7e3e9fb5eee..20109a2ef0c38 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -71,6 +71,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.index.store.remote.fc.FileCache; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -649,7 +650,8 @@ private void ensureNotFrozen() { public static Map createBuiltInDirectoryFactories( Supplier repositoriesService, - ThreadPool threadPool + ThreadPool threadPool, + FileCache remoteStoreFileCache ) { final Map factories = new HashMap<>(); for (Type type : Type.values()) { @@ -662,7 +664,10 @@ public static Map createBuiltInDirect factories.put(type.getSettingsKey(), DEFAULT_DIRECTORY_FACTORY); break; case REMOTE_SNAPSHOT: - factories.put(type.getSettingsKey(), new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool)); + factories.put( + type.getSettingsKey(), + new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache) + ); break; default: throw new IllegalStateException("No directory factory mapping for built-in type " + type); diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java index fed1f127d113f..7f21feeaafe5a 100644 --- 
a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java @@ -22,6 +22,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.opensearch.index.store.remote.fc.FileCache; import org.opensearch.index.store.remote.utils.TransferManager; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; @@ -42,9 +43,16 @@ public final class RemoteSnapshotDirectoryFactory implements IndexStorePlugin.Di private final Supplier repositoriesService; private final ThreadPool threadPool; - public RemoteSnapshotDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { + private final FileCache remoteStoreFileCache; + + public RemoteSnapshotDirectoryFactory( + Supplier repositoriesService, + ThreadPool threadPool, + FileCache remoteStoreFileCache + ) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; + this.remoteStoreFileCache = remoteStoreFileCache; } @Override @@ -81,7 +89,11 @@ private Future createRemoteSnapshotDirectoryFromSnapsho return threadPool.executor(ThreadPool.Names.SNAPSHOT).submit(() -> { final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath); final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); - TransferManager transferManager = new TransferManager(blobContainer, threadPool.executor(ThreadPool.Names.SEARCH)); + TransferManager transferManager = new TransferManager( + blobContainer, + threadPool.executor(ThreadPool.Names.SEARCH), + remoteStoreFileCache + ); return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager); }); } diff --git 
a/server/src/main/java/org/opensearch/index/store/remote/fc/CachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/fc/CachedIndexInput.java new file mode 100644 index 0000000000000..9dda107649179 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/fc/CachedIndexInput.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.fc; + +import org.apache.lucene.store.IndexInput; + +/** + * Base IndexInput whose instances will be maintained in cache. + * + * @opensearch.internal + */ +public abstract class CachedIndexInput extends IndexInput { + + /** + * resourceDescription should be a non-null, opaque string + * describing this resource; it's returned from + * {@link #toString}. + */ + protected CachedIndexInput(String resourceDescription) { + super(resourceDescription); + } + + /** + * return true this index input is closed, false if not + * @return true this index input is closed, false if not + */ + public abstract boolean isClosed(); +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/fc/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCache.java new file mode 100644 index 0000000000000..706760d6e7c7a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCache.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store.remote.fc; + +import org.opensearch.common.cache.CacheUsage; +import org.opensearch.common.cache.refcounted.RefCountedCache; +import org.opensearch.common.cache.refcounted.SegmentedCache; +import org.opensearch.common.cache.refcounted.stats.CacheStats; +import java.nio.file.Path; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * File Cache (FC) is introduced to solve the problem that the local disk cannot hold + * the entire dataset on remote store. It maintains a node level view of index files with priorities, + * caching only those index files needed by queries. The file with the lowest priority + * (Least Recently Used) in the FC is replaced first. + * + *

The two main interfaces of FC are put and get. When a new file index input is added + * to the file cache, the file will be added at the cache head, which means it has the highest + * priority. + *

The get function does not add a file to the cache, but it promotes the priority + * of a given file (since it makes it the most recently used). + * + *

Once file cache reaches its capacity, it starts evictions. Eviction removes the file + * items from cache tail and triggers a callback to clean up the file from disk. The + * cleanup process also includes closing file’s descriptor. + * + * @opensearch.internal + */ +public class FileCache implements RefCountedCache { + private final SegmentedCache theCache; + + public FileCache(SegmentedCache cache) { + this.theCache = cache; + } + + public long capacity() { + return theCache.capacity(); + } + + public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) { + return theCache.put(filePath, indexInput); + } + + @Override + public void putAll(Map m) { + theCache.putAll(m); + } + + @Override + public CachedIndexInput computeIfPresent( + Path key, + BiFunction remappingFunction + ) { + return theCache.computeIfPresent(key, remappingFunction); + } + + /** + * Given a file path, gets the corresponding file index input from FileCache. + * This API also updates the priority for the given file + * + * @param filePath given file path + * @return corresponding file index input from FileCache. + */ + public CachedIndexInput get(Path filePath) { + return theCache.get(filePath); + } + + /** + * Given a file path, remove the file from cache. + * Even if the file is pinned or it's still in use, the reclaim + * still take effect. 
+ * + * @param filePath given file path + */ + public void remove(final Path filePath) { + theCache.remove(filePath); + } + + @Override + public void removeAll(Iterable keys) { + theCache.removeAll(keys); + } + + @Override + public void clear() { + theCache.clear(); + } + + @Override + public long size() { + return theCache.size(); + } + + @Override + public void incRef(Path key) { + theCache.incRef(key); + } + + @Override + public void decRef(Path key) { + theCache.decRef(key); + } + + @Override + public long prune() { + return theCache.prune(); + } + + @Override + public CacheUsage usage() { + return theCache.usage(); + } + + @Override + public CacheStats stats() { + return theCache.stats(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/fc/FileCacheFactory.java b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCacheFactory.java new file mode 100644 index 0000000000000..5d5688c6889f7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCacheFactory.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.fc; + +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.refcounted.SegmentedCache; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.opensearch.common.util.ThrowableTranslateUtils.catchAsRuntimeException; + +/** + * File Cache (FC) is introduced to solve the problem that the local disk cannot hold + * the entire dataset on remote store. It maintains a node level view of index files with priorities, + * caching only those index files needed by queries. The file with the lowest priority + * (Least Recently Used) in the FC is replaced first. + * + *

The two main interfaces of FC are put and get. When a new file index input is added + * to the file cache, the file will be added at the cache head, which means it has the highest + * priority. + *

The get function does not add a file to the cache, but it promotes the priority + * of a given file (since it makes it the most recently used). + * + *

Once file cache reaches its capacity, it starts evictions. Eviction removes the file + * items from cache tail and triggers a callback to clean up the file from disk. The + * cleanup process also includes closing file’s descriptor. + * + * @opensearch.internal + */ +public class FileCacheFactory { + public static FileCache createConcurrentLRUFileCache(long capacity) { + return new FileCache( + SegmentedCache.builder() + // capacity in bytes + .capacity(capacity) + // use length in bytes as the weight of the file item + .weigher(IndexInput::length) + .listener((removalNotification) -> { + RemovalReason removalReason = removalNotification.getRemovalReason(); + CachedIndexInput value = removalNotification.getValue(); + Path key = removalNotification.getKey(); + if (removalReason != RemovalReason.REPLACED) { + catchAsRuntimeException(value::close); + catchAsRuntimeException(() -> Files.deleteIfExists(key)); + } + }) + .build() + ); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/fc/FileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCachedIndexInput.java new file mode 100644 index 0000000000000..4f52ddcd13ec4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/fc/FileCachedIndexInput.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.fc; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Reference Counted IndexInput. The first FileCachedIndexInput for a file/block is called origin. + * origin never references to itself, so the RC = 0 when origin is created. + * Every time there is a clone to the origin, RC + 1. 
+ * Every time a clone is closed, RC - 1. + * When there is an eviction in FileCache, it only cleanups those origins with RC = 0. + * + * @opensearch.internal + */ +public class FileCachedIndexInput extends CachedIndexInput implements RandomAccessInput { + + protected final FileCache cache; + + /** + * on disk file path of this index input + */ + protected Path filePath; + + /** + * underlying lucene index input which this IndexInput + * delegate all its read functions to. + */ + protected IndexInput luceneIndexInput; + + /** indicates if this IndexInput instance is a clone or not */ + private final boolean isClone; + + private volatile boolean closed = false; + + public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) { + this(cache, filePath, underlyingIndexInput, false); + } + + FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) { + super("FileCachedIndexInput (path=" + filePath.toString() + ")"); + this.cache = cache; + this.filePath = filePath; + this.luceneIndexInput = underlyingIndexInput; + this.isClone = isClone; + } + + @Override + public long getFilePointer() { + return luceneIndexInput.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + luceneIndexInput.seek(pos); + } + + @Override + public long length() { + return luceneIndexInput.length(); + } + + @Override + public byte readByte() throws IOException { + return luceneIndexInput.readByte(); + } + + @Override + public short readShort() throws IOException { + return luceneIndexInput.readShort(); + } + + @Override + public int readInt() throws IOException { + return luceneIndexInput.readInt(); + } + + @Override + public long readLong() throws IOException { + return luceneIndexInput.readLong(); + } + + @Override + public final int readVInt() throws IOException { + return luceneIndexInput.readVInt(); + } + + @Override + public final long readVLong() throws IOException { + return 
luceneIndexInput.readVLong(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + luceneIndexInput.readBytes(b, offset, len); + } + + @Override + public byte readByte(long pos) throws IOException { + return ((RandomAccessInput) luceneIndexInput).readByte(pos); + } + + @Override + public short readShort(long pos) throws IOException { + return ((RandomAccessInput) luceneIndexInput).readShort(pos); + } + + @Override + public int readInt(long pos) throws IOException { + return ((RandomAccessInput) luceneIndexInput).readInt(pos); + } + + @Override + public long readLong(long pos) throws IOException { + return ((RandomAccessInput) luceneIndexInput).readLong(pos); + } + + @Override + public FileCachedIndexInput clone() { + cache.incRef(filePath); + return new FileCachedIndexInput(cache, filePath, luceneIndexInput.clone(), true); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + // never reach here! + throw new UnsupportedOperationException("FileCachedIndexInput couldn't be sliced."); + } + + @Override + public void close() throws IOException { + if (!closed) { + // if the underlying lucene index input is a clone, + // the following line won't close/unmap the file. 
+ luceneIndexInput.close(); + luceneIndexInput = null; + // origin never reference it itself, only clone needs decRef here + if (isClone) { + cache.decRef(filePath); + } + closed = true; + } + } + + /** + * Mainly used by File Cache to detect origin this IndexInput is closed or not + * + * @return the index input closed or not + */ + @Override + public boolean isClosed() { + return closed; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/fc/package-info.java b/server/src/main/java/org/opensearch/index/store/remote/fc/package-info.java new file mode 100644 index 0000000000000..48a791858442a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/fc/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains class used for File System Caching for remote store + */ +package org.opensearch.index.store.remote.fc; diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java index df05d33e8fde9..99d581a648a07 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java @@ -57,7 +57,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces // Variables for actual held open block /** - * Current block for read, it should be a cloned block always + * Current block for read, it should be a cloned block always. 
In current implementation this will be a FileCachedIndexInput */ protected IndexInput currentBlock; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index b0581215017b0..1fafd15f80c59 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -8,13 +8,21 @@ package org.opensearch.index.store.remote.utils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.remote.fc.FileCache; +import org.opensearch.index.store.remote.fc.FileCachedIndexInput; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -26,12 +34,17 @@ * @opensearch.internal */ public class TransferManager { + private static final Logger logger = LogManager.getLogger(TransferManager.class); + private final BlobContainer blobContainer; private final ConcurrentInvocationLinearizer invocationLinearizer; - public TransferManager(final BlobContainer blobContainer, final ExecutorService remoteStoreExecutorService) { + private final FileCache fileCache; + + public TransferManager(final BlobContainer blobContainer, final ExecutorService remoteStoreExecutorService, final FileCache fileCache) { this.blobContainer = blobContainer; this.invocationLinearizer = new 
ConcurrentInvocationLinearizer<>(remoteStoreExecutorService); + this.fileCache = fileCache; } /** @@ -53,16 +66,57 @@ public CompletableFuture asyncFetchBlob(Path path, Supplier indexInputSupplier.get()); } + /* + This method accessed through the ConcurrentInvocationLinearizer so read-check-write is acceptable here + */ private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { - // for first phase, this is a simple remote repo blob read with no caching at all + // check if the origin is already in block cache + IndexInput origin = fileCache.computeIfPresent(blobFetchRequest.getFilePath(), (path, cachedIndexInput) -> { + if (cachedIndexInput.isClosed()) { + // if it's already in the file cache, but closed, open it and replace the original one + try { + IndexInput luceneIndexInput = blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ); + return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), luceneIndexInput); + } catch (IOException ioe) { + logger.warn("Open index input " + blobFetchRequest.getFilePath() + " got error ", ioe); + // open failed so return null to download the file again + return null; + } + + } + // already in the cache and ready to be used (open) + return cachedIndexInput; + }); + + if (Objects.isNull(origin)) { + // origin is not in file cache, download origin + + // open new origin + IndexInput downloaded = downloadBlockLocally(blobFetchRequest); + + // refcount = 0 at the beginning + FileCachedIndexInput newOrigin = new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), downloaded); + + // put origin into file cache + fileCache.put(blobFetchRequest.getFilePath(), newOrigin); + origin = newOrigin; + } + // always, need to clone to do refcount += 1, and rely on GC to clean these IndexInput which will refcount -= 1 + return origin.clone(); + } + + private IndexInput downloadBlockLocally(BlobFetchRequest blobFetchRequest) throws IOException { try ( 
InputStream snapshotFileInputStream = blobContainer.readBlob( blobFetchRequest.getBlobName(), blobFetchRequest.getPosition(), blobFetchRequest.getLength() ); + OutputStream fileOutputStream = Files.newOutputStream(blobFetchRequest.getFilePath()); + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream); ) { - return new ByteArrayIndexInput(blobFetchRequest.getBlobName(), snapshotFileInputStream.readAllBytes()); + localFileOutputStream.write(snapshotFileInputStream.readAllBytes()); } + return blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ); } } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index c0202f099f4e0..9bcebc2a73cb8 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -34,6 +34,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.shard.IndexShard; @@ -66,10 +67,6 @@ public final void onMiss() { @Override public final void onRemoval(RemovalNotification notification) { - stats().onRemoval( - notification.getKey(), - notification.getValue(), - notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED - ); + stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); } } diff --git a/server/src/main/java/org/opensearch/indices/fielddata/cache/IndicesFieldDataCache.java b/server/src/main/java/org/opensearch/indices/fielddata/cache/IndicesFieldDataCache.java index 1833ae8900a9a..8d75cb33ba991 100644 --- 
a/server/src/main/java/org/opensearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/server/src/main/java/org/opensearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -46,6 +46,7 @@ import org.opensearch.common.cache.CacheBuilder; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; @@ -115,7 +116,7 @@ public void onRemoval(RemovalNotification notification) { listener.onRemoval( key.shardId, indexCache.fieldName, - notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED, + notification.getRemovalReason() == RemovalReason.EVICTED, value.ramBytesUsed() ); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 46270230ccf27..be643e6b2ca52 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -38,13 +38,18 @@ import org.apache.lucene.util.SetOnce; import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.common.util.ThrowableTranslateUtils; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.store.remote.fc.FileCache; +import org.opensearch.index.store.remote.fc.FileCacheFactory; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; +import org.opensearch.monitor.fs.FsInfo; 
+import org.opensearch.monitor.fs.FsProbe; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; @@ -617,10 +622,12 @@ protected Node( final Collection>> engineFactoryProviders = enginePlugins.stream() .map(plugin -> (Function>) plugin::getEngineFactory) .collect(Collectors.toList()); - + // TODO: for now this is a single cache, later, this should read node and index settings + final FileCache remoteStoreFileCache = createRemoteStoreFileCache(); final Map builtInDirectoryFactories = IndexModule.createBuiltInDirectoryFactories( repositoriesServiceReference::get, - threadPool + threadPool, + remoteStoreFileCache ); final Map directoryFactories = new HashMap<>(); pluginsService.filterPlugins(IndexStorePlugin.class) @@ -1116,6 +1123,16 @@ protected Node( } } + private FileCache createRemoteStoreFileCache() { + // TODO: implement more custom logic to create named caches, using multiple node paths, more capacity computation options and + // capacity reservation logic + FsInfo.Path info = ThrowableTranslateUtils.catchAsRuntimeException(() -> FsProbe.getFSInfo(nodeEnvironment.nodePaths()[0])); + long diskCapacity = info.getTotal().getBytes(); + // hard coded as 50% for now + long capacity = (long) (diskCapacity * 0.50); + return FileCacheFactory.createConcurrentLRUFileCache(capacity); + } + protected TransportService newTransportService( Settings settings, Transport transport, diff --git a/server/src/test/java/org/opensearch/common/cache/CacheTests.java b/server/src/test/java/org/opensearch/common/cache/CacheTests.java index db497894c5883..5aa8faaa940d4 100644 --- a/server/src/test/java/org/opensearch/common/cache/CacheTests.java +++ b/server/src/test/java/org/opensearch/common/cache/CacheTests.java @@ -240,7 +240,7 @@ protected long now() { cache.setExpireAfterAccessNanos(1); List evictedKeys = new ArrayList<>(); 
cache.setRemovalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); + assertEquals(RemovalReason.EVICTED, notification.getRemovalReason()); evictedKeys.add(notification.getKey()); }); now.set(0); @@ -299,7 +299,7 @@ protected long now() { cache.setExpireAfterWriteNanos(1); List evictedKeys = new ArrayList<>(); cache.setRemovalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); + assertEquals(RemovalReason.EVICTED, notification.getRemovalReason()); evictedKeys.add(notification.getKey()); }); now.set(0); @@ -445,7 +445,7 @@ public void testInvalidate() { public void testNotificationOnInvalidate() { Set notifications = new HashSet<>(); Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + assertEquals(RemovalReason.INVALIDATED, notification.getRemovalReason()); notifications.add(notification.getKey()); }).build(); for (int i = 0; i < numberOfEntries; i++) { @@ -493,7 +493,7 @@ public void testInvalidateWithValue() { public void testNotificationOnInvalidateWithValue() { Set notifications = new HashSet<>(); Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + assertEquals(RemovalReason.INVALIDATED, notification.getRemovalReason()); notifications.add(notification.getKey()); }).build(); for (int i = 0; i < numberOfEntries; i++) { @@ -529,7 +529,7 @@ public void testInvalidateAll() { public void testNotificationOnInvalidateAll() { Set notifications = new HashSet<>(); Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + assertEquals(RemovalReason.INVALIDATED, notification.getRemovalReason()); 
notifications.add(notification.getKey()); }).build(); Set invalidated = new HashSet<>(); @@ -590,7 +590,7 @@ public int hashCode() { public void testNotificationOnReplace() { Set notifications = new HashSet<>(); Cache cache = CacheBuilder.builder().removalListener(notification -> { - assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason()); + assertEquals(RemovalReason.REPLACED, notification.getRemovalReason()); notifications.add(notification.getKey()); }).build(); for (int i = 0; i < numberOfEntries; i++) { @@ -928,7 +928,7 @@ public void testRemoveUsingValuesIterator() { assertEquals(expectedRemovals.size(), removalNotifications.size()); for (int i = 0; i < expectedRemovals.size(); i++) { assertEquals(expectedRemovals.get(i), removalNotifications.get(i).getValue()); - assertEquals(RemovalNotification.RemovalReason.INVALIDATED, removalNotifications.get(i).getRemovalReason()); + assertEquals(RemovalReason.INVALIDATED, removalNotifications.get(i).getRemovalReason()); } } } diff --git a/server/src/test/java/org/opensearch/common/cache/refcounted/RefCountedCacheTests.java b/server/src/test/java/org/opensearch/common/cache/refcounted/RefCountedCacheTests.java new file mode 100644 index 0000000000000..30c92afca1b7c --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/refcounted/RefCountedCacheTests.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache.refcounted; + +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class RefCountedCacheTests extends OpenSearchTestCase { + private static final long SIZE = 100; + private RemovalListener removalListener; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.removalListener = Mockito.mock(RemovalListener.class); + } + + public void testLRUCache() { + executeRefCountedCacheTests(new LRUCache<>(SIZE, removalListener, value -> value)); + } + + public void testSegmentedCache() { + executeRefCountedCacheTests( + SegmentedCache.builder() + .capacity(SIZE) + .weigher(value -> value) + .listener(removalListener) + .concurrencyLevel(1) + .build() + ); + } + + void executeRefCountedCacheTests(RefCountedCache refCountedCache) { + // basic get and put operation + assertNull(refCountedCache.get("1")); + refCountedCache.put("1", 10L); + assertEquals(10L, (long) refCountedCache.get("1")); + + // cache usage with ref ++ and -- + assertEquals(10L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage().activeUsage()); + refCountedCache.incRef("1"); + assertEquals(10L, refCountedCache.usage().usage()); + assertEquals(10L, refCountedCache.usage().activeUsage()); + refCountedCache.decRef("1"); + assertEquals(10L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage().activeUsage()); + + // put all delegation + Map toPutIntoCache = new HashMap<>() { + { + put("2", 20L); + put("3", 30L); + } + }; + refCountedCache.putAll(toPutIntoCache); + toPutIntoCache.forEach((k, v) -> { assertEquals(v, refCountedCache.get(k)); }); + assertEquals(60L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage().activeUsage()); + + // since all entries has ref count = 0 first added one 
will be evicted first once usage >= capacity + refCountedCache.put("4", 40L); + refCountedCache.put("5", 10L); + assertNull(refCountedCache.get("1")); + assertNull(refCountedCache.get("2")); + Arrays.asList("3", "4", "5").forEach(k -> assertNotNull(refCountedCache.get(k))); + assertEquals(80L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage().activeUsage()); + + // simple compute if present when present + refCountedCache.computeIfPresent("3", (k, v) -> { return v + 5; }); + assertEquals(35L, (long) refCountedCache.get("3")); + assertEquals(85L, refCountedCache.usage().usage()); + assertEquals(0L, refCountedCache.usage().activeUsage()); + + // simple compute if present when not present + refCountedCache.computeIfPresent("1", (k, v) -> { + fail("should not reach here"); + return v + 5; + }); + assertNull(refCountedCache.get("1")); + + // inc ref all entries to prevent cache evictions + refCountedCache.incRef("3"); + refCountedCache.incRef("4"); + refCountedCache.incRef("5"); + assertEquals(85L, refCountedCache.usage().usage()); + assertEquals(85L, refCountedCache.usage().activeUsage()); + + // adding cache entry while > capacity won't put entry to cache + refCountedCache.put("6", 15L); + assertNull(refCountedCache.get("6")); + assertEquals(85L, refCountedCache.usage().usage()); + assertEquals(85L, refCountedCache.usage().activeUsage()); + + // dec ref to add 6 instead of 3 + refCountedCache.decRef("3"); + refCountedCache.put("6", 15L); + assertNull(refCountedCache.get("3")); + assertEquals(15L, (long) refCountedCache.get("6")); + assertEquals(65L, refCountedCache.usage().usage()); + assertEquals(50L, refCountedCache.usage().activeUsage()); + + // check stats + assertEquals(4, refCountedCache.stats().evictionCount()); + assertEquals(9, refCountedCache.stats().hitCount()); + assertEquals(7, refCountedCache.stats().missCount()); + assertEquals(0, refCountedCache.stats().removeCount()); + assertEquals(1, refCountedCache.stats().replaceCount()); 
+ + // remove one entry + refCountedCache.remove("6"); + assertNull(refCountedCache.get("6")); + assertEquals(50L, refCountedCache.usage().usage()); + assertEquals(50L, refCountedCache.usage().activeUsage()); + assertEquals(1, refCountedCache.stats().removeCount()); + } +}