From ad774810a444b4f1457592672b6f9ff572a0f932 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 23 May 2024 12:02:22 -0700 Subject: [PATCH] [Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache with disk cache (#13784) --------- Signed-off-by: Peter Alfonsi Co-authored-by: Peter Alfonsi (cherry picked from commit e67ced73226453d5a5504c78f3b7d5ae90b4914e) Signed-off-by: Peter Alfonsi --- .../common/tier/TieredSpilloverCache.java | 2 + .../cache/common/tier/MockDiskCache.java | 4 + .../tier/TieredSpilloverCacheTests.java | 35 +++++ plugins/cache-ehcache/build.gradle | 10 -- .../licenses/slf4j-api-LICENSE.txt | 2 +- .../cache/store/disk/EhcacheDiskCache.java | 138 +++++++++--------- .../plugin-metadata/plugin-security.policy | 3 + 7 files changed, 118 insertions(+), 76 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 9942651ccdd67..f40c35dde83de 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -119,6 +119,8 @@ public class TieredSpilloverCache implements ICache { .setValueType(builder.cacheConfig.getValueType()) .setSettings(builder.cacheConfig.getSettings()) .setWeigher(builder.cacheConfig.getWeigher()) + .setKeySerializer(builder.cacheConfig.getKeySerializer()) + .setValueSerializer(builder.cacheConfig.getValueSerializer()) .setDimensionNames(builder.cacheConfig.getDimensionNames()) .setStatsTrackingEnabled(false) .build(), diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java index 2058faa5181b1..69e2060f7ea2f 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable @Override @SuppressWarnings({ "unchecked" }) public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { + // As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk + // cache would require. + assert config.getKeySerializer() != null; + assert config.getValueSerializer() != null; return new Builder().setKeySerializer((Serializer) config.getKeySerializer()) .setValueSerializer((Serializer) config.getValueSerializer()) .setMaxSize(maxSize) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 6d5ee91326338..6c49341591589 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.stats.ImmutableCacheStats; import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; @@ -32,6 +33,8 @@ import org.junit.Before; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken @@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setDimensionNames(dimensionNames) .setSettings( Settings.builder() @@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio .setKeyType(String.class) .setWeigher((k, v) -> 150) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setSettings( Settings.builder() .put( @@ -1014,6 +1023,8 @@ public void testTookTimePolicyFromFactory() throws Exception { .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setSettings(settings) .setMaxSizeInBytes(onHeapCacheSize * keyValueSize) .setDimensionNames(dimensionNames) @@ -1415,6 +1426,8 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setSettings(settings) .setDimensionNames(dimensionNames) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setSettings( Settings.builder() .put( @@ -1479,4 +1492,26 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t } return snapshot; } + + // Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin + static class StringSerializer implements Serializer { + private final Charset charset = StandardCharsets.UTF_8; + + @Override + public byte[] serialize(String object) { + return object.getBytes(charset); + } + + @Override + public String deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + return new String(bytes, charset); + } + + public boolean equals(String object, byte[] bytes) { + return object.equals(deserialize(bytes)); + } + } } diff --git a/plugins/cache-ehcache/build.gradle b/plugins/cache-ehcache/build.gradle index 07113849c6511..5747624e2fb69 100644 --- a/plugins/cache-ehcache/build.gradle +++ b/plugins/cache-ehcache/build.gradle @@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure { into 'config' } } - -test { - // TODO: Adding permission in plugin-security.policy doesn't seem to work. - systemProperty 'tests.security.manager', 'false' -} - -internalClusterTest { - // TODO: Remove this later once we have a way. - systemProperty 'tests.security.manager', 'false' -} diff --git a/plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt b/plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt index 2be7689435062..54512cc08d16b 100644 --- a/plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt +++ b/plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt @@ -18,4 +18,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index 9a4dce1067b61..b4c62fbf85cb8 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -42,6 +42,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.time.Duration; import java.util.Arrays; import java.util.Iterator; @@ -175,57 +177,60 @@ private EhcacheDiskCache(Builder builder) { @SuppressWarnings({ "rawtypes" }) private Cache buildCache(Duration expireAfterAccess, Builder builder) { - try { - return this.cacheManager.createCache( - this.diskCacheAlias, - CacheConfigurationBuilder.newCacheConfigurationBuilder( - ICacheKey.class, - ByteArrayWrapper.class, - ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) - ).withExpiry(new ExpiryPolicy<>() { - @Override - public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) { - return INFINITE; - } - - @Override - public Duration getExpiryForAccess(ICacheKey key, Supplier value) { - return expireAfterAccess; - } - - @Override - public Duration getExpiryForUpdate( - ICacheKey key, - Supplier oldValue, - ByteArrayWrapper newValue - ) { - return INFINITE; - } - }) - .withService(getListenerConfiguration(builder)) - .withService( - new OffHeapDiskStoreConfiguration( - this.threadPoolAlias, - (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) - .get(DISK_WRITE_CONCURRENCY_KEY) - .get(settings), - (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings) + // Creating the cache requires permissions specified in plugin-security.policy + return AccessController.doPrivileged((PrivilegedAction>) () -> { + try { + return this.cacheManager.createCache( + this.diskCacheAlias, + CacheConfigurationBuilder.newCacheConfigurationBuilder( + ICacheKey.class, + ByteArrayWrapper.class, + ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) + ).withExpiry(new ExpiryPolicy<>() { + @Override + public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) { + return INFINITE; + } + + @Override + public Duration getExpiryForAccess(ICacheKey key, Supplier value) { + return expireAfterAccess; + } + + @Override + public Duration getExpiryForUpdate( + ICacheKey key, + Supplier oldValue, + ByteArrayWrapper newValue + ) { + return INFINITE; + } + }) + .withService(getListenerConfiguration(builder)) + .withService( + new OffHeapDiskStoreConfiguration( + this.threadPoolAlias, + (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) + .get(DISK_WRITE_CONCURRENCY_KEY) + .get(settings), + (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings) + ) ) - ) - .withKeySerializer(new KeySerializerWrapper(keySerializer)) - .withValueSerializer(new ByteArrayWrapperSerializer()) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) + .withValueSerializer(new ByteArrayWrapperSerializer()) // We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its // serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization. // This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization // before V hits ehcache. - ); - } catch (IllegalArgumentException ex) { - logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage()); - throw ex; - } catch (IllegalStateException ex) { - logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage()); - throw ex; - } + ); + } catch (IllegalArgumentException ex) { + logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage()); + throw ex; + } catch (IllegalStateException ex) { + logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage()); + throw ex; + } + }); } private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder builder) { @@ -252,25 +257,28 @@ Map, CompletableFuture, V>>> getCompletableFutur @SuppressForbidden(reason = "Ehcache uses File.io") private PersistentCacheManager buildCacheManager() { // In case we use multiple ehCaches, we can define this cache manager at a global level. - return CacheManagerBuilder.newCacheManagerBuilder() - .with(CacheManagerBuilder.persistence(new File(storagePath))) - - .using( - PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() - .defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks - // like event listeners - .pool( - this.threadPoolAlias, - (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) - .get(DISK_WRITE_MIN_THREADS_KEY) - .get(settings), - (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) - .get(DISK_WRITE_MAXIMUM_THREADS_KEY) - .get(settings) - ) - .build() - ) - .build(true); + // Creating the cache manager also requires permissions specified in plugin-security.policy + return AccessController.doPrivileged((PrivilegedAction) () -> { + return CacheManagerBuilder.newCacheManagerBuilder() + .with(CacheManagerBuilder.persistence(new File(storagePath))) + + .using( + PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() + .defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks + // like event listeners + .pool( + this.threadPoolAlias, + (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) + .get(DISK_WRITE_MIN_THREADS_KEY) + .get(settings), + (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType) + .get(DISK_WRITE_MAXIMUM_THREADS_KEY) + .get(settings) + ) + .build() + ) + .build(true); + }); } @Override diff --git a/plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy b/plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy index 40007eea62dba..85c82824d5d65 100644 --- a/plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy @@ -9,5 +9,8 @@ grant { permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; permission java.lang.RuntimePermission "createClassLoader"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.lang.RuntimePermission "getClassLoader"; };