Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered caching] Framework changes #10753

Merged
merged 23 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b196615
[Tiered caching] Framework changes
sgup432 Oct 19, 2023
168076f
Added javadoc for new files/packages
sgup432 Oct 26, 2023
f98abc2
Merge branch 'main' into tiered_caching_framework
sgup432 Oct 26, 2023
8fed7c7
Added changelog
sgup432 Oct 26, 2023
5ace3a7
Fixing javadoc warnings
sgup432 Oct 26, 2023
2e9c478
Addressing comments
sgup432 Nov 22, 2023
c29d346
Addressing additional minor comments
sgup432 Nov 22, 2023
965cbef
Merge branch 'main' into tiered_caching_framework
sgup432 Nov 22, 2023
2e024a2
Moving non null check to builder for OS onHeapCache
sgup432 Nov 22, 2023
51d948f
Adding package-info for new packages
sgup432 Nov 23, 2023
64edcfb
Removing service and adding different cache interfaces along with eve…
sgup432 Dec 7, 2023
d8156af
Merge branch 'main' into tiered_caching_framework
sgup432 Dec 7, 2023
13a03a9
Fixing gradle missingDoc issue
sgup432 Dec 7, 2023
42a111d
Changing listener logic, removing tiered cache integration with IRC
sgup432 Dec 21, 2023
c9cf2c0
Adding opensearch.internal tag for LoadAwareCacheLoader
sgup432 Dec 21, 2023
7318252
Fixing thread safety issue
sgup432 Dec 28, 2023
79dd693
Remove compute function and event listener logic change for TieredCache
sgup432 Dec 28, 2023
7c10adf
Making Cache.compute function private
sgup432 Dec 28, 2023
1832df9
Adding javadoc and more test for cache.put
sgup432 Jan 3, 2024
49aab1f
Adding write locks to refresh API as well
sgup432 Jan 3, 2024
7e72d07
Merge branch 'main' into tiered_caching_framework
sgup432 Jan 3, 2024
fdda280
Removing unwanted EventType class and refactoring one UT
sgup432 Jan 3, 2024
9956abc
Removing TieredCache interface
sgup432 Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Framework changes ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)
sgup432 marked this conversation as resolved.
Show resolved Hide resolved

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,68 +422,74 @@
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
public V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");

Check warning on line 489 in server/src/main/java/org/opensearch/common/cache/Cache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/Cache.java#L488-L489

Added lines #L488 - L489 were not covered by tests
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);

Check warning on line 492 in server/src/main/java/org/opensearch/common/cache/Cache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/Cache.java#L491-L492

Added lines #L491 - L492 were not covered by tests
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.common.cache;

import org.opensearch.common.cache.tier.TierType;

/**
* Notification when an element is removed from the cache
*
Expand All @@ -42,11 +44,17 @@ public class RemovalNotification<K, V> {
private final K key;
private final V value;
private final RemovalReason removalReason;
private final TierType tierType;

public RemovalNotification(K key, V value, RemovalReason removalReason) {
this(key, value, removalReason, TierType.ON_HEAP);
}

public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
this.key = key;
this.value = value;
this.removalReason = removalReason;
this.tierType = tierType;
}

public K getKey() {
Expand All @@ -60,4 +68,8 @@ public V getValue() {
public RemovalReason getRemovalReason() {
return removalReason;
}

public TierType getTierType() {
return tierType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.RemovalListener;

/**
* Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like
* onHeap, disk etc.
* @param <K> Type of key
* @param <V> Type of value
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
*/
public interface CachingTier<K, V> {
sohami marked this conversation as resolved.
Show resolved Hide resolved

V get(K key);

void put(K key, V value);

V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception;

void invalidate(K key);

V compute(K key, TieredCacheLoader<K, V> loader) throws Exception;

void setRemovalListener(RemovalListener<K, V> removalListener);

void invalidateAll();

Iterable<K> keys();

int count();

TierType getTierType();

/**
* Force any outstanding size-based and time-based evictions to occur
*/
default void refresh() {}

Check warning on line 44 in server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java#L44

Added line #L44 was not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

/**
* This is specific to disk caching tier and can be used to add methods which are specific to disk tier.
* @param <K> Type of key
* @param <V> Type of value
*/
public interface DiskCachingTier<K, V> extends CachingTier<K, V> {
sohami marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

/**
* This is specific to onHeap caching tier and can be used to add methods which are specific to this tier.
* @param <K> Type of key
* @param <V> Type of value
*/
public interface OnHeapCachingTier<K, V> extends CachingTier<K, V> {}
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.unit.TimeValue;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.ToLongBiFunction;

/**
* This variant of on-heap cache uses OpenSearch custom cache implementation.
* @param <K> Type of key
* @param <V> Type of value
*/
public class OpenSearchOnHeapCache<K, V> implements OnHeapCachingTier<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
private RemovalListener<K, V> removalListener;

private OpenSearchOnHeapCache(Builder<K, V> builder) {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(builder.weigher);
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
.setMaximumWeight(builder.maxWeightInBytes)
.weigher(builder.weigher)
.removalListener(this);
if (builder.expireAfterAcess != null) {
cacheBuilder.setExpireAfterAccess(builder.expireAfterAcess);

Check warning on line 38 in server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java#L38

Added line #L38 was not covered by tests
}
cache = cacheBuilder.build();
}

@Override
public void setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = removalListener;
}

@Override
public void invalidateAll() {
cache.invalidateAll();
}

@Override
public Iterable<K> keys() {
return this.cache.keys();
}

@Override
public int count() {
return cache.count();
}

@Override
public TierType getTierType() {
return TierType.ON_HEAP;
}

@Override
public V get(K key) {
return cache.get(key);
}

@Override
public void put(K key, V value) {
cache.put(key, value);
}

Check warning on line 76 in server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java#L75-L76

Added lines #L75 - L76 were not covered by tests

@Override
public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws ExecutionException {
return cache.computeIfAbsent(key, key1 -> loader.load(key));

Check warning on line 80 in server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java#L80

Added line #L80 was not covered by tests
}

@Override
public void invalidate(K key) {
cache.invalidate(key);
}

@Override
public V compute(K key, TieredCacheLoader<K, V> loader) throws Exception {
return cache.compute(key, key1 -> loader.load(key));
}

@Override
public void refresh() {
cache.refresh();
}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
removalListener.onRemoval(notification);
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Builder object
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> {
private long maxWeightInBytes;

private ToLongBiFunction<K, V> weigher;

private TimeValue expireAfterAcess;

public Builder() {}

public Builder<K, V> setMaximumWeight(long sizeInBytes) {
this.maxWeightInBytes = sizeInBytes;
return this;
}

public Builder<K, V> setWeigher(ToLongBiFunction<K, V> weigher) {
this.weigher = weigher;
return this;
}

public Builder<K, V> setExpireAfterAccess(TimeValue expireAfterAcess) {
this.expireAfterAcess = expireAfterAcess;
return this;
}

public OpenSearchOnHeapCache<K, V> build() {
return new OpenSearchOnHeapCache<K, V>(this);
}
}
}
Loading
Loading