Skip to content

Commit

Permalink
[Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache …
Browse files Browse the repository at this point in the history
…with disk cache (opensearch-project#13784)

---------

Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Co-authored-by: Peter Alfonsi <petealft@amazon.com>
(cherry picked from commit e67ced7)
Signed-off-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
peteralfonsi and Peter Alfonsi committed May 23, 2024
1 parent ae782e1 commit ad77481
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setKeySerializer(builder.cacheConfig.getKeySerializer())
.setValueSerializer(builder.cacheConfig.getValueSerializer())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setStatsTrackingEnabled(false)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable
@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
// As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk
// cache would require.
assert config.getKeySerializer() != null;
assert config.getValueSerializer() != null;
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
Expand All @@ -32,6 +33,8 @@
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
Expand Down Expand Up @@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setDimensionNames(dimensionNames)
.setSettings(
Settings.builder()
Expand Down Expand Up @@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio
.setKeyType(String.class)
.setWeigher((k, v) -> 150)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1014,6 +1023,8 @@ public void testTookTimePolicyFromFactory() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setMaxSizeInBytes(onHeapCacheSize * keyValueSize)
.setDimensionNames(dimensionNames)
Expand Down Expand Up @@ -1415,6 +1426,8 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1479,4 +1492,26 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache<?, ?> t
}
return snapshot;
}

// Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin
static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}
}
10 changes: 0 additions & 10 deletions plugins/cache-ehcache/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure {
into 'config'
}
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

internalClusterTest {
// TODO: Remove this later once we have a way.
systemProperty 'tests.security.manager', 'false'
}
2 changes: 1 addition & 1 deletion plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
Expand Down Expand Up @@ -175,57 +177,60 @@ private EhcacheDiskCache(Builder<K, V> builder) {

@SuppressWarnings({ "rawtypes" })
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
// Creating the cache requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
)
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
});
}

private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
Expand All @@ -252,25 +257,28 @@ Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> getCompletableFutur
@SuppressForbidden(reason = "Ehcache uses File.io")
private PersistentCacheManager buildCacheManager() {
// In case we use multiple ehCaches, we can define this cache manager at a global level.
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
// Creating the cache manager also requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<PersistentCacheManager>) () -> {
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
grant {
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
permission java.lang.RuntimePermission "createClassLoader";
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
permission java.lang.RuntimePermission "getClassLoader";
};

0 comments on commit ad77481

Please sign in to comment.