Skip to content

Commit

Permalink
Added tests around expiration time and invalidation
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Apr 12, 2024
1 parent d76f405 commit 87a49a4
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 19 deletions.
5 changes: 5 additions & 0 deletions plugins/cache-ehcache/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,8 @@ test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

internalClusterTest {
// TODO: Remove this later once we have a way.
systemProperty 'tests.security.manager', 'false'
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.cache.store.disk.EhcacheDiskCache;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.cache.request.RequestCacheStats;
Expand All @@ -34,24 +36,33 @@
import org.opensearch.plugins.PluginInfo;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.junit.Assert;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cache.EhcacheDiskCacheSettings.DEFAULT_CACHE_SIZE_IN_BYTES;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
@ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class })
public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase {

Expand All @@ -65,16 +76,17 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
private Settings defaultSettings(long sizeInBytes, TimeValue expirationTime) {
if (expirationTime == null) {
expirationTime = TimeValue.MAX_VALUE;
}
try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache"
env.nodePaths()[0].indicesPath.toString() + "/" + UUID.randomUUID() + "/request_cache/"
)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
Expand All @@ -86,13 +98,26 @@ protected Settings nodeSettings(int nodeOrdinal) {
.getKey(),
true
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_MAX_SIZE_IN_BYTES_KEY)
.getKey(),
sizeInBytes
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY)
.getKey(),
expirationTime
)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public void testPluginsAreInstalled() {
internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build());
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Expand All @@ -108,13 +133,20 @@ public void testPluginsAreInstalled() {
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build())
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.build()
)
.get()
);
indexRandom(
Expand Down Expand Up @@ -146,7 +178,13 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}

public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception {
public void testInvalidationWithIndicesRequestCache() throws Exception {
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Client client = client();
assertAcked(
client.admin()
Expand All @@ -158,49 +196,193 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
int numberOfIndexedItems = randomIntBetween(2, 10);
int numberOfIndexedItems = randomIntBetween(5, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());
assertTrue(requestCacheStats.getMemorySizeInBytes() > 0);
assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache();
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
assertEquals(0, requestCacheStats.getEvictions());
// Explicit refresh would invalidate cache entries.
refreshAndWaitForReplication();
assertBusy(() -> {
// Explicit refresh should clear up cache entries
assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
}, 1, TimeUnit.SECONDS);
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
// Hits and misses stats shouldn't get cleared up.
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
}

public void testExplicitCacheClearWithIndicesRequestCache() throws Exception {
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
int numberOfIndexedItems = randomIntBetween(5, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);

long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());
assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());

// Explicit clear the cache.
ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index");
ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get();
requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache();
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
assertNoFailures(response);

assertBusy(() -> {
// All entries should get cleared up.
assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
}, 1, TimeUnit.SECONDS);
}

public void testEvictionsFlowWithExpirationTime() throws Exception {
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, new TimeValue(0))) // Immediately evict items after
// access
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
int numberOfIndexedItems = 2;// randomIntBetween(5, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);

long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
assertEquals(0, requestCacheStats.getEvictions());

for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = getRequestCacheStats(client, "index");
// Now that we have access the entries, they should expire after 1ms. So lets wait and verify that cache gets
// cleared up.
assertBusy(() -> {
// Explicit refresh should clear up cache entries
assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
}, 10, TimeUnit.MILLISECONDS);
// Validate hit and miss count.
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
}

private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
*/
public class EhcacheDiskCacheSettings {

/**
* Default cache size in bytes ie 1gb.
*/
public static final long DEFAULT_CACHE_SIZE_IN_BYTES = 1073741824L;

/**
* Ehcache disk write minimum threads for its pool
*
Expand Down Expand Up @@ -99,7 +104,7 @@ public class EhcacheDiskCacheSettings {
*/
public static final Setting.AffixSetting<Long> DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes",
(key) -> Setting.longSetting(key, 1073741824L, NodeScope)
(key) -> Setting.longSetting(key, DEFAULT_CACHE_SIZE_IN_BYTES, NodeScope)
);

/**
Expand Down

0 comments on commit 87a49a4

Please sign in to comment.