From 43c0ef4457eb567f053e95d120dd86b0d6cb47c8 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 20 Aug 2024 15:00:29 -0700 Subject: [PATCH 1/9] Fix IRC timeout bug Signed-off-by: Peter Alfonsi --- .../indices/IndicesRequestCacheIT.java | 65 +++++++++++++++++++ .../indices/IndicesRequestCache.java | 11 ++-- .../opensearch/indices/IndicesService.java | 4 ++ 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 09d5c208a8756..a9060c1cb24e3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,6 +34,12 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -56,7 +62,10 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.search.aggregations.bucket.histogram.Histogram; @@ -65,6 +74,7 @@ import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; @@ -768,6 +778,61 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { assertTrue(stats.getMemorySizeInBytes() == 0); } + public void testTimedOutQuery() throws Exception { + // A timed out query should be cached and then invalidated + Client client = client(); + String index = "index"; + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + ) + .get() + ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); + + QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") { + @Override + protected Query doToQuery(QueryShardContext context) { + return new TermQuery(new Term("k", "hello")) { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + // Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will + // sometimes throw an exception on timeout, rather than timing out gracefully. + Weight result = super.createWeight(searcher, scoreMode, boost); + try { + // Pick 500 ms as it's the same duration used in SearchTimeoutIT.testSimpleTimeout() to ensure a timeout + // (We can't directly reuse their ScriptQuery-based logic as it isn't cacheable) + Thread.sleep(500); + } catch (InterruptedException ignored) {} + return result; + } + }; + } + }; + + SearchResponse resp = client.prepareSearch(index) + .setRequestCache(true) + .setQuery(timeoutQueryBuilder) + .setTimeout(TimeValue.ZERO) + .get(); + assertTrue(resp.isTimedOut()); + RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); + // The cache should be empty as the timed-out query was invalidated + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + } + private Path[] shardDirectory(String server, Index index, int shard) { NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); final Path[] paths = env.availableShardPaths(new ShardId(index, shard)); diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 93946fa11de13..71f8cf5a78ec5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -310,12 +310,11 @@ BytesReference getOrCompute( * @param cacheKey the cache key to invalidate */ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { - assert reader.getReaderCacheHelper() != null; - String readerCacheKeyId = null; - if (reader instanceof OpenSearchDirectoryReader) { - IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); - } + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity(); cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard)))); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 902ca95643625..f1efc2351e78a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -68,6 +68,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -1754,6 +1755,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (context.getQueryShardContext().isCacheable() == false) { return false; } + if (!(context.searcher().getIndexReader().getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper)) { + return false; + } return true; } From 192d78ed9e751137bd8a82d5ae0af24726bca7bb Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 20 Aug 2024 16:40:44 -0700 Subject: [PATCH 2/9] addressed Sagar's comments Signed-off-by: Peter Alfonsi --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 2 -- server/src/main/java/org/opensearch/indices/IndicesService.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index a9060c1cb24e3..108ef14f0fcb4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -812,8 +812,6 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo // sometimes throw an exception on timeout, rather than timing out gracefully. Weight result = super.createWeight(searcher, scoreMode, boost); try { - // Pick 500 ms as it's the same duration used in SearchTimeoutIT.testSimpleTimeout() to ensure a timeout - // (We can't directly reuse their ScriptQuery-based logic as it isn't cacheable) Thread.sleep(500); } catch (InterruptedException ignored) {} return result; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f1efc2351e78a..05f8d8ca66f69 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1755,7 +1755,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (context.getQueryShardContext().isCacheable() == false) { return false; } - if (!(context.searcher().getIndexReader().getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper)) { + if (!(context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper)) { return false; } return true; From a48c75c32f23e35af53c5bd9d9a6aa5ac22694d9 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 21 Aug 2024 10:41:57 -0700 Subject: [PATCH 3/9] addressed Ankit's comments Signed-off-by: Peter Alfonsi --- .../main/java/org/opensearch/indices/IndicesService.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 05f8d8ca66f69..a78328e24c588 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -68,7 +68,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -1755,11 +1755,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (context.getQueryShardContext().isCacheable() == false) { return false; } - if (!(context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper)) { - return false; - } - return true; - + return context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof DelegatingCacheHelper; } /** From 884961a755ef9669d5ed25b84e1b0c3b7eb5f98d Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 21 Aug 2024 11:32:06 -0700 Subject: [PATCH 4/9] Add UT for test coverage Signed-off-by: Peter Alfonsi --- .../indices/IndicesServiceTests.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index 6757dbc184961..d7ded741b6cb8 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -31,12 +31,15 @@ package org.opensearch.indices; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.Version; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; +import org.opensearch.action.search.SearchType; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexGraveyard; @@ -44,6 +47,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -76,8 +80,11 @@ import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.test.TestSearchContext; import org.opensearch.test.hamcrest.RegexMatcher; import java.io.IOException; @@ -627,4 +634,32 @@ public void testClusterRemoteTranslogBufferIntervalDefault() { indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval() ); } + + public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws IOException { + IndicesService indicesService = getIndicesService(); + final IndexService indexService = createIndex("test"); + ShardSearchRequest request = mock(ShardSearchRequest.class); + when(request.requestCache()).thenReturn(true); + + TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) { + @Override + public SearchType searchType() { + return SearchType.QUERY_THEN_FETCH; + } + }; + + ContextIndexSearcher searcher = mock(ContextIndexSearcher.class); + context.setSearcher(searcher); + DirectoryReader reader = mock(DirectoryReader.class); + when(searcher.getDirectoryReader()).thenReturn(reader); + when(searcher.getIndexReader()).thenReturn(reader); + IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class); + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = mock(OpenSearchDirectoryReader.DelegatingCacheHelper.class); + + for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) { + IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper; + when(reader.getReaderCacheHelper()).thenReturn(cacheHelper); + assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context)); + } + } } From c9f4d16ba40153440def56f06e8df52cee40f794 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 21 Aug 2024 12:55:31 -0700 Subject: [PATCH 5/9] rerun gradle Signed-off-by: Peter Alfonsi From 3b419ccf07c1a88ae6f5299a114840bfdca05711 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 21 Aug 2024 15:12:49 -0700 Subject: [PATCH 6/9] tweak imports in new UT Signed-off-by: Peter Alfonsi --- .../test/java/org/opensearch/indices/IndicesServiceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index d7ded741b6cb8..b5350a39e8599 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -47,7 +47,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; -import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -654,7 +654,7 @@ public SearchType searchType() { when(searcher.getDirectoryReader()).thenReturn(reader); when(searcher.getIndexReader()).thenReturn(reader); IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class); - OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = mock(OpenSearchDirectoryReader.DelegatingCacheHelper.class); + DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class); for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) { IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper; From 7cdaffe930d39aa1cf4d293b5df3c16723384231 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 22 Aug 2024 12:04:46 -0700 Subject: [PATCH 7/9] rerun gradle Signed-off-by: Peter Alfonsi From cf03a50ce002fffe6d794de843aac4c29d342d4e Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 22 Aug 2024 13:18:04 -0700 Subject: [PATCH 8/9] rerun gradle Signed-off-by: Peter Alfonsi From 8f9f5f846f013df7b499d4f80936eab3b4fbbfb5 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 22 Aug 2024 18:21:29 -0700 Subject: [PATCH 9/9] rerun gradle Signed-off-by: Peter Alfonsi