Skip to content

Commit

Permalink
[Tiered caching] Framework changes (opensearch-project#10753)
Browse files Browse the repository at this point in the history
* [Tiered caching] Framework changes

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Added javadoc for new files/packages

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Added changelog

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Fixing javadoc warnings

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Addressing comments

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Addressing additional minor comments

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Moving non null check to builder for OS onHeapCache

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Adding package-info for new packages

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Removing service and adding different cache interfaces along with event listener support

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Fixing gradle missingDoc issue

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Changing listener logic, removing tiered cache integration with IRC

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Adding opensearch.internal tag for LoadAwareCacheLoader

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Fixing thread safety issue

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Remove compute function and event listener logic change for TieredCache

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Making Cache.compute function private

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Adding javadoc and more test for cache.put

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Adding write locks to refresh API as well

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Removing unwanted EventType class and refactoring one UT

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Removing TieredCache interface

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
(cherry picked from commit ebda963)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] authored and sgup432 committed Jan 22, 2024
1 parent c52a152 commit 736ad6a
Show file tree
Hide file tree
Showing 19 changed files with 1,566 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return value;
}
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/ICache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Represents a cache interface.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface ICache<K, V> {
V get(K key);

void put(K key, V value);

V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;

void invalidate(K key);

void invalidateAll();

Iterable<K> keys();

long count();

void refresh();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Extends a cache loader with awareness of whether the data is loaded or not.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
boolean isLoaded();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;

/**
* This variant of on-heap cache uses OpenSearch custom cache implementation.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;

private final StoreAwareCacheEventListener<K, V> eventListener;

public OpenSearchOnHeapCache(Builder<K, V> builder) {
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
.setMaximumWeight(builder.getMaxWeightInBytes())
.weigher(builder.getWeigher())
.removalListener(this);
if (builder.getExpireAfterAcess() != null) {
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());
}
cache = cacheBuilder.build();
this.eventListener = builder.getEventListener();
}

@Override
public V get(K key) {
V value = cache.get(key);
if (value != null) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void put(K key, V value) {
cache.put(key, value);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
if (!loader.isLoaded()) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void invalidate(K key) {
cache.invalidate(key);
}

@Override
public void invalidateAll() {
cache.invalidateAll();
}

@Override
public Iterable<K> keys() {
return cache.keys();
}

@Override
public long count() {
return cache.count();
}

@Override
public void refresh() {
cache.refresh();
}

@Override
public CacheStoreType getTierType() {
return CacheStoreType.ON_HEAP;
}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
eventListener.onRemoval(
new StoreAwareCacheRemovalNotification<>(
notification.getKey(),
notification.getValue(),
notification.getRemovalReason(),
CacheStoreType.ON_HEAP
)
);
}

/**
* Builder object
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {

@Override
public StoreAwareCache<K, V> build() {
return new OpenSearchOnHeapCache<K, V>(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Represents a cache with a specific type of store like onHeap, disk etc.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface StoreAwareCache<K, V> extends ICache<K, V> {
CacheStoreType getTierType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Removal notification for store aware cache.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
private final CacheStoreType cacheStoreType;

public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
super(key, value, removalReason);
this.cacheStoreType = cacheStoreType;
}

public CacheStoreType getCacheStoreType() {
return cacheStoreType;
}
}
Loading

0 comments on commit 736ad6a

Please sign in to comment.