forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Tiered caching] Introducing cache plugins and exposing Ehcache as on…
…e of the pluggable disk cache option (opensearch-project#11874) Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Signed-off-by: Aman Khare <amkhar@amazon.com>
- Loading branch information
Showing
54 changed files
with
3,403 additions
and
720 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
opensearchplugin { | ||
description 'Module for caches which are optional and do not require additional security permission' | ||
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin' | ||
} | ||
|
||
test { | ||
// TODO: Adding permission in plugin-security.policy doesn't seem to work. | ||
systemProperty 'tests.security.manager', 'false' | ||
} |
335 changes: 335 additions & 0 deletions
335
...les/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,335 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cache.common.tier; | ||
|
||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.common.cache.CacheType; | ||
import org.opensearch.common.cache.ICache; | ||
import org.opensearch.common.cache.LoadAwareCacheLoader; | ||
import org.opensearch.common.cache.RemovalListener; | ||
import org.opensearch.common.cache.RemovalNotification; | ||
import org.opensearch.common.cache.store.config.CacheConfig; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.concurrent.ReleasableLock; | ||
import org.opensearch.common.util.iterable.Iterables; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap | ||
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full, | ||
* then items are eventually evicted from it and removed which will result in cache miss. | ||
* | ||
* @param <K> Type of key | ||
* @param <V> Type of value | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public class TieredSpilloverCache<K, V> implements ICache<K, V> { | ||
|
||
private final ICache<K, V> diskCache; | ||
private final ICache<K, V> onHeapCache; | ||
private final RemovalListener<K, V> removalListener; | ||
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); | ||
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); | ||
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); | ||
/** | ||
* Maintains caching tiers in ascending order of cache latency. | ||
*/ | ||
private final List<ICache<K, V>> cacheList; | ||
|
||
TieredSpilloverCache(Builder<K, V> builder) { | ||
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); | ||
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); | ||
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null"); | ||
|
||
this.onHeapCache = builder.onHeapCacheFactory.create( | ||
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() { | ||
@Override | ||
public void onRemoval(RemovalNotification<K, V> notification) { | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
diskCache.put(notification.getKey(), notification.getValue()); | ||
} | ||
removalListener.onRemoval(notification); | ||
} | ||
}) | ||
.setKeyType(builder.cacheConfig.getKeyType()) | ||
.setValueType(builder.cacheConfig.getValueType()) | ||
.setSettings(builder.cacheConfig.getSettings()) | ||
.setWeigher(builder.cacheConfig.getWeigher()) | ||
.build(), | ||
builder.cacheType, | ||
builder.cacheFactories | ||
|
||
); | ||
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); | ||
this.cacheList = Arrays.asList(onHeapCache, diskCache); | ||
} | ||
|
||
// Package private for testing | ||
ICache<K, V> getOnHeapCache() { | ||
return onHeapCache; | ||
} | ||
|
||
// Package private for testing | ||
ICache<K, V> getDiskCache() { | ||
return diskCache; | ||
} | ||
|
||
@Override | ||
public V get(K key) { | ||
return getValueFromTieredCache().apply(key); | ||
} | ||
|
||
@Override | ||
public void put(K key, V value) { | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
onHeapCache.put(key, value); | ||
} | ||
} | ||
|
||
@Override | ||
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception { | ||
|
||
V cacheValue = getValueFromTieredCache().apply(key); | ||
if (cacheValue == null) { | ||
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. | ||
// This is needed as there can be many requests for the same key at the same time and we only want to load | ||
// the value once. | ||
V value = null; | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
value = onHeapCache.computeIfAbsent(key, loader); | ||
} | ||
return value; | ||
} | ||
return cacheValue; | ||
} | ||
|
||
@Override | ||
public void invalidate(K key) { | ||
// We are trying to invalidate the key from all caches though it would be present in only of them. | ||
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will | ||
// also trigger a hit/miss listener event, so ignoring it for now. | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
for (ICache<K, V> cache : cacheList) { | ||
cache.invalidate(key); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void invalidateAll() { | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
for (ICache<K, V> cache : cacheList) { | ||
cache.invalidateAll(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. | ||
* @return An iterable over (onHeap + disk) keys | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
@Override | ||
public Iterable<K> keys() { | ||
return Iterables.concat(onHeapCache.keys(), diskCache.keys()); | ||
} | ||
|
||
@Override | ||
public long count() { | ||
long count = 0; | ||
for (ICache<K, V> cache : cacheList) { | ||
count += cache.count(); | ||
} | ||
return count; | ||
} | ||
|
||
@Override | ||
public void refresh() { | ||
try (ReleasableLock ignore = writeLock.acquire()) { | ||
for (ICache<K, V> cache : cacheList) { | ||
cache.refresh(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
for (ICache<K, V> cache : cacheList) { | ||
cache.close(); | ||
} | ||
} | ||
|
||
private Function<K, V> getValueFromTieredCache() { | ||
return key -> { | ||
try (ReleasableLock ignore = readLock.acquire()) { | ||
for (ICache<K, V> cache : cacheList) { | ||
V value = cache.get(key); | ||
if (value != null) { | ||
// update hit stats | ||
return value; | ||
} else { | ||
// update miss stats | ||
} | ||
} | ||
} | ||
return null; | ||
}; | ||
} | ||
|
||
/** | ||
* Factory to create TieredSpilloverCache objects. | ||
*/ | ||
public static class TieredSpilloverCacheFactory implements ICache.Factory { | ||
|
||
/** | ||
* Defines cache name | ||
*/ | ||
public static final String TIERED_SPILLOVER_CACHE_NAME = "tiered_spillover"; | ||
|
||
/** | ||
* Default constructor | ||
*/ | ||
public TieredSpilloverCacheFactory() {} | ||
|
||
@Override | ||
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) { | ||
Settings settings = config.getSettings(); | ||
Setting<String> onHeapSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( | ||
cacheType.getSettingPrefix() | ||
); | ||
String onHeapCacheStoreName = onHeapSetting.get(settings); | ||
if (!cacheFactories.containsKey(onHeapCacheStoreName)) { | ||
throw new IllegalArgumentException( | ||
"No associated onHeapCache found for tieredSpilloverCache for " + "cacheType:" + cacheType | ||
); | ||
} | ||
ICache.Factory onHeapCacheFactory = cacheFactories.get(onHeapCacheStoreName); | ||
|
||
Setting<String> onDiskSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( | ||
cacheType.getSettingPrefix() | ||
); | ||
String diskCacheStoreName = onDiskSetting.get(settings); | ||
if (!cacheFactories.containsKey(diskCacheStoreName)) { | ||
throw new IllegalArgumentException( | ||
"No associated diskCache found for tieredSpilloverCache for " + "cacheType:" + cacheType | ||
); | ||
} | ||
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); | ||
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory) | ||
.setOnHeapCacheFactory(onHeapCacheFactory) | ||
.setRemovalListener(config.getRemovalListener()) | ||
.setCacheConfig(config) | ||
.setCacheType(cacheType) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public String getCacheName() { | ||
return TIERED_SPILLOVER_CACHE_NAME; | ||
} | ||
} | ||
|
||
/** | ||
* Builder object for tiered spillover cache. | ||
* @param <K> Type of key | ||
* @param <V> Type of value | ||
*/ | ||
public static class Builder<K, V> { | ||
private ICache.Factory onHeapCacheFactory; | ||
private ICache.Factory diskCacheFactory; | ||
private RemovalListener<K, V> removalListener; | ||
private CacheConfig<K, V> cacheConfig; | ||
private CacheType cacheType; | ||
private Map<String, ICache.Factory> cacheFactories; | ||
|
||
/** | ||
* Default constructor | ||
*/ | ||
public Builder() {} | ||
|
||
/** | ||
* Set onHeap cache factory | ||
* @param onHeapCacheFactory Factory for onHeap cache. | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setOnHeapCacheFactory(ICache.Factory onHeapCacheFactory) { | ||
this.onHeapCacheFactory = onHeapCacheFactory; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set disk cache factory | ||
* @param diskCacheFactory Factory for disk cache. | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) { | ||
this.diskCacheFactory = diskCacheFactory; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set removal listener for tiered cache. | ||
* @param removalListener Removal listener | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) { | ||
this.removalListener = removalListener; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set cache config. | ||
* @param cacheConfig cache config. | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setCacheConfig(CacheConfig<K, V> cacheConfig) { | ||
this.cacheConfig = cacheConfig; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set cache type. | ||
* @param cacheType Cache type | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setCacheType(CacheType cacheType) { | ||
this.cacheType = cacheType; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set cache factories | ||
* @param cacheFactories cache factories | ||
* @return builder | ||
*/ | ||
public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactories) { | ||
this.cacheFactories = cacheFactories; | ||
return this; | ||
} | ||
|
||
/** | ||
* Build tiered spillover cache. | ||
* @return TieredSpilloverCache | ||
*/ | ||
public TieredSpilloverCache<K, V> build() { | ||
return new TieredSpilloverCache<>(this); | ||
} | ||
} | ||
} |
Oops, something went wrong.