From 03c51dbbe6538c61d3fbaf3cb816680737fc20f1 Mon Sep 17 00:00:00 2001 From: sahil <61558528+buddharajusahil@users.noreply.github.com> Date: Wed, 20 Sep 2023 14:41:44 -0700 Subject: [PATCH] Search latency tracking - Coordinator node (#8386) We are trying to add search stats to coordinator level node stats. This keeps track of the total time, current requests, and total requests of each request phase. Also added support for general coordinator stats as well on the node level. Signed-off-by: sahil buddharaju Signed-off-by: sahil <61558528+buddharajusahil@users.noreply.github.com> Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Co-authored-by: sahil buddharaju Co-authored-by: Sagar Upadhyaya Co-authored-by: Sagar Upadhyaya Co-authored-by: Sagar <99425694+sgup432@users.noreply.github.com> --- CHANGELOG.md | 1 + .../search/SearchWeightedRoutingIT.java | 31 ++ .../search/stats/SearchStatsIT.java | 35 ++- .../search/AbstractSearchAsyncAction.java | 35 ++- .../search/CanMatchPreFilterSearchPhase.java | 6 +- .../SearchDfsQueryThenFetchAsyncAction.java | 8 +- .../opensearch/action/search/SearchPhase.java | 12 +- .../action/search/SearchPhaseContext.java | 2 + .../action/search/SearchPhaseName.java | 1 + .../SearchQueryThenFetchAsyncAction.java | 8 +- .../SearchRequestOperationsListener.java | 77 +++++ .../action/search/SearchRequestStats.java | 75 +++++ .../action/search/TransportSearchAction.java | 62 +++- .../common/metrics/CounterMetric.java | 1 + .../opensearch/common/metrics/MeanMetric.java | 1 + .../common/settings/ClusterSettings.java | 1 + .../index/search/stats/SearchStats.java | 136 ++++++++- .../opensearch/indices/IndicesService.java | 7 +- .../opensearch/indices/NodeIndicesStats.java | 7 +- .../main/java/org/opensearch/node/Node.java | 5 + .../cluster/node/stats/NodeStatsTests.java | 4 +- .../AbstractSearchAsyncActionTests.java | 273 +++++++++++++++++- .../CanMatchPreFilterSearchPhaseTests.java | 18 +- .../action/search/MockSearchPhaseContext.java | 5 + .../action/search/SearchAsyncActionTests.java | 16 +- .../SearchQueryThenFetchAsyncActionTests.java | 4 +- .../SearchRequestOperationsListenerTests.java | 69 +++++ .../search/SearchRequestStatsTests.java | 150 ++++++++++ .../index/search/stats/SearchStatsTests.java | 41 ++- .../indices/NodeIndicesStatsTests.java | 6 +- .../snapshots/SnapshotResiliencyTests.java | 4 +- 31 files changed, 1052 insertions(+), 49 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchRequestStats.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d5f1d75c873c4..1623b61ac5b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386)) - Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) - Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) - [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 6fafdb0912470..5207dab83f1d9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -16,6 +16,7 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -56,9 +57,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.search.aggregations.AggregationBuilders.terms; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) @@ -74,6 +77,7 @@ public void testSearchWithWRRShardRouting() throws IOException { .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") .put("cluster.routing.weighted.fail_open", false) + .put(SEARCH_REQUEST_STATS_ENABLED_KEY, true) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -180,12 +184,39 @@ public void testSearchWithWRRShardRouting() throws IOException { assertFalse(!hitNodes.contains(nodeId)); } nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + int num = 0; + int coordNumber = 0; for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (searchStats.getRequestStatsLongHolder() + .getRequestStatsHolder() + .get(SearchPhaseName.QUERY.getName()) + .getTimeInMillis() > 0) { + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(), + greaterThan(0L) + ); + coordNumber += 1; + } Assert.assertTrue(searchStats.getQueryCount() > 0L); Assert.assertTrue(searchStats.getFetchCount() > 0L); + num++; } + assertThat(coordNumber, greaterThan(0)); + assertThat(num, greaterThan(0)); } private Map> setupCluster(int nodeCountPerAZ, Settings commonSettings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 23d48b173a3db..253a8b2b14824 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -37,6 +37,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.GroupShardsIterator; @@ -63,6 +64,7 @@ import java.util.Set; import java.util.function.Function; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; @@ -78,7 +80,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@OpenSearchIntegTestCase.ClusterScope(minNumDataNodes = 2) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2) public class SearchStatsIT extends ParameterizedOpenSearchIntegTestCase { public SearchStatsIT(Settings dynamicSettings) { @@ -126,6 +128,11 @@ public void testSimpleStats() throws Exception { assertThat(numNodes, greaterThanOrEqualTo(2)); final int shardsIdx1 = randomIntBetween(1, 10); // we make sure each node gets at least a single shard... final int shardsIdx2 = Math.max(numNodes - shardsIdx1, randomIntBetween(1, 10)); + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(SEARCH_REQUEST_STATS_ENABLED_KEY, true).build()) + .get(); assertThat(numNodes, lessThanOrEqualTo(shardsIdx1 + shardsIdx2)); assertAcked( prepareCreate("test1").setSettings( @@ -188,20 +195,40 @@ public void testSimpleStats() throws Exception { Set nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2"); int num = 0; + int numOfCoordinators = 0; + for (NodeStats stat : nodeStats.getNodes()) { Stats total = stat.getIndices().getSearch().getTotal(); + if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) { + assertThat( + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), + greaterThan(0L) + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal() + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() + ); + numOfCoordinators += 1; + } if (nodeIdsWithIndex.contains(stat.getNode().getId())) { assertThat(total.getQueryCount(), greaterThan(0L)); assertThat(total.getQueryTimeInMillis(), greaterThan(0L)); num++; } else { - assertThat(total.getQueryCount(), equalTo(0L)); + assertThat(total.getQueryCount(), greaterThanOrEqualTo(0L)); assertThat(total.getQueryTimeInMillis(), equalTo(0L)); } } - + assertThat(numOfCoordinators, greaterThan(0)); assertThat(num, greaterThan(0)); - } private Set nodeIdsWithIndex(String... indices) { diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index ee8aa10577956..1c0a1280ad550 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -65,6 +65,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction exten private final AtomicInteger skippedOps = new AtomicInteger(); private final TransportSearchAction.SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; - protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; private final int expectedTotalOps; @@ -116,8 +116,12 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; + private SearchPhase currentPhase; + private final List releasables = new ArrayList<>(); + private Optional searchRequestOperationsListener; + AbstractSearchAsyncAction( String name, Logger logger, @@ -135,7 +139,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -171,6 +176,7 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; + this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener); } @Override @@ -371,6 +377,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha : OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0]; logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause); onPhaseFailure(currentPhase, "all shards failed", cause); + } else { Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; @@ -419,13 +426,24 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha clusterState.version() ); } + onPhaseEnd(); executePhase(nextPhase); } } + private void onPhaseEnd() { + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); }); + } + + private void onPhaseStart(SearchPhase phase) { + setCurrentPhase(phase); + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); }); + } + private void executePhase(SearchPhase phase) { try { - phase.run(); + onPhaseStart(phase); + phase.recordAndRun(); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e); @@ -603,6 +621,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } } + public SearchPhase getCurrentPhase() { + return currentPhase; + } + + private void setCurrentPhase(SearchPhase phase) { + currentPhase = phase; + } + @Override public final int getNumShards() { return results.getNumShards(); @@ -670,10 +696,13 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At } listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId)); } + onPhaseEnd(); + setCurrentPhase(null); } @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this)); raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index 6c3ee652de2de..ae481736ad0aa 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + searchRequestOperationsListener ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java index 50b0cd8e01c1d..1c7b3c1f1563c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java @@ -42,13 +42,23 @@ * * @opensearch.internal */ -abstract class SearchPhase implements CheckedRunnable { +public abstract class SearchPhase implements CheckedRunnable { private final String name; + private long startTimeInNanos; protected SearchPhase(String name) { this.name = Objects.requireNonNull(name, "name must not be null"); } + public long getStartTimeInNanos() { + return startTimeInNanos; + } + + public void recordAndRun() throws IOException { + this.startTimeInNanos = System.nanoTime(); + run(); + } + /** * Returns the phases name. */ diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java index 4ffd5521793f6..45d39a6f85ea2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor { */ SearchRequest getRequest(); + SearchPhase getCurrentPhase(); + /** * Builds and sends the final search response back to the user. * diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java index b6f842cf2cce1..4c0fe3ac06326 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java @@ -13,6 +13,7 @@ * @opensearch.internal */ public enum SearchPhaseName { + DFS_PRE_QUERY("dfs_pre_query"), QUERY("query"), FETCH("fetch"), DFS_QUERY("dfs_query"), diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index f75ab2554e693..ca5ad087d3089 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -81,10 +81,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listeners; + private final Logger logger; + + public CompositeListener(List listeners, Logger logger) { + this.listeners = listeners; + this.logger = logger; + } + + @Override + public void onPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseEnd(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java new file mode 100644 index 0000000000000..ad299c11b987d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.metrics.MeanMetric; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Request level search stats to track coordinator level node search latencies + * + * @opensearch.internal + */ +public final class SearchRequestStats implements SearchRequestOperationsListener { + Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); + + @Inject + public SearchRequestStats() { + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseStatsMap.put(searchPhaseName, new StatsHolder()); + } + } + + public long getPhaseCurrent(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).current.count(); + } + + public long getPhaseTotal(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).total.count(); + } + + public long getPhaseMetric(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).timing.sum(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) { + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); + phaseStats.current.dec(); + phaseStats.total.inc(); + phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos())); + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + } + + /** + * Holder of statistics values + * + * @opensearch.internal + */ + + public static final class StatsHolder { + CounterMetric current = new CounterMetric(); + CounterMetric total = new CounterMetric(); + MeanMetric timing = new MeanMetric(); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 25ec0fc57d19f..cff1005beff27 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -67,6 +67,7 @@ import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -145,6 +146,14 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting( + SEARCH_REQUEST_STATS_ENABLED_KEY, + false, + Property.Dynamic, + Property.NodeScope + ); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -157,6 +166,10 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -185,6 +199,13 @@ public TransportSearchAction( this.indexNameExpressionResolver = indexNameExpressionResolver; this.namedWriteableRegistry = namedWriteableRegistry; this.searchPipelineService = searchPipelineService; + this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); + this.searchRequestStats = searchRequestStats; + } + + private void setIsRequestStatsEnabled(boolean isRequestStatsEnabled) { + this.isRequestStatsEnabled = isRequestStatsEnabled; } private Map buildPerIndexAliasFilter( @@ -311,6 +332,13 @@ public void executeRequest( SinglePhaseSearchAction phaseSearchAction, ActionListener listener ) { + final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } executeRequest(task, searchRequest, new SearchAsyncActionProvider() { @Override public AbstractSearchAsyncAction asyncSearchAction( @@ -346,7 +374,8 @@ public AbstractSearchAsyncAction asyncSearchAction( task, new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + searchRequestOperationsListener ) { @Override protected void executePhaseOnShard( @@ -916,9 +945,7 @@ private void executeSearch( @Nullable SearchContextId searchContext, SearchAsyncActionProvider searchAsyncActionProvider ) { - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead // of just for the _search api @@ -968,11 +995,8 @@ private void executeSearch( indexRoutings = routingMap; } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); - failIfOverShardCountLimit(clusterService, shardIterators.size()); - Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - // optimize search type for cases where there is only one shard group to search on if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard @@ -1107,6 +1131,14 @@ AbstractSearchAsyncAction asyncSearchAction( ); } + private List createSearchListenerList() { + final List searchListenersList = new ArrayList<>(); + if (isRequestStatsEnabled) { + searchListenersList.add(searchRequestStats); + } + return searchListenersList; + } + private AbstractSearchAsyncAction searchAsyncAction( SearchTask task, SearchRequest searchRequest, @@ -1123,6 +1155,13 @@ private AbstractSearchAsyncAction searchAsyncAction ThreadPool threadPool, SearchResponse.Clusters clusters ) { + final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } if (preFilter) { return new CanMatchPreFilterSearchPhase( logger, @@ -1162,7 +1201,8 @@ public void run() { } }; }, - clusters + clusters, + searchRequestOperationsListener ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1192,7 +1232,8 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchRequestOperationsListener ); break; case QUERY_THEN_FETCH: @@ -1212,7 +1253,8 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchRequestOperationsListener ); break; default: diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index 5c48c1f772ff0..cb181840406a5 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -62,4 +62,5 @@ public void dec(long n) { public long count() { return counter.sum(); } + } diff --git a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java index 33f12c8cb42d3..359facdce633b 100644 --- a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java @@ -79,4 +79,5 @@ public void clear() { counter.reset(); sum.reset(); } + } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 74224d66400da..22e65b2c04668 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -373,6 +373,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, + TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 76b216304b3b7..14aaf7e58a59c 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -33,6 +33,8 @@ package org.opensearch.index.search.stats; import org.opensearch.Version; +import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -58,10 +60,79 @@ public class SearchStats implements Writeable, ToXContentFragment { /** - * Statistics for search + * Holds statistic values for a particular phase. * * @opensearch.internal */ + public static class PhaseStatsLongHolder implements Writeable { + + long current; + long total; + long timeInMillis; + + public long getCurrent() { + return current; + } + + public long getTotal() { + return total; + } + + public long getTimeInMillis() { + return timeInMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(current); + out.writeVLong(total); + out.writeVLong(timeInMillis); + } + + PhaseStatsLongHolder() { + this(0, 0, 0); + } + + PhaseStatsLongHolder(long current, long total, long timeInMillis) { + this.current = current; + this.total = total; + this.timeInMillis = timeInMillis; + } + + PhaseStatsLongHolder(StreamInput in) throws IOException { + this.current = in.readVLong(); + this.total = in.readVLong(); + this.timeInMillis = in.readVLong(); + } + + } + + /** + * Holds requests stats for different phases. + * + * @opensearch.internal + */ + public static class RequestStatsLongHolder { + + Map requestStatsHolder = new HashMap<>(); + + public Map getRequestStatsHolder() { + return requestStatsHolder; + } + + RequestStatsLongHolder() { + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + requestStatsHolder.put(searchPhaseName.getName(), new PhaseStatsLongHolder()); + } + } + } + + /** + * Holder of statistics values + * + * @opensearch.internal + */ + public static class Stats implements Writeable, ToXContentFragment { private long queryCount; @@ -89,6 +160,13 @@ public static class Stats implements Writeable, ToXContentFragment { private long pitTimeInMillis; private long pitCurrent; + @Nullable + private RequestStatsLongHolder requestStatsLongHolder; + + public RequestStatsLongHolder getRequestStatsLongHolder() { + return requestStatsLongHolder; + } + private Stats() { // for internal use, initializes all counts to 0 } @@ -114,6 +192,7 @@ public Stats( long suggestTimeInMillis, long suggestCurrent ) { + this.requestStatsLongHolder = new RequestStatsLongHolder(); this.queryCount = queryCount; this.queryTimeInMillis = queryTimeInMillis; this.queryCurrent = queryCurrent; @@ -163,6 +242,10 @@ private Stats(StreamInput in) throws IOException { pitCurrent = in.readVLong(); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.requestStatsLongHolder = new RequestStatsLongHolder(); + requestStatsLongHolder.requestStatsHolder = in.readMap(StreamInput::readString, PhaseStatsLongHolder::new); + } if (in.getVersion().onOrAfter(Version.V_2_10_0)) { concurrentQueryCount = in.readVLong(); concurrentQueryTimeInMillis = in.readVLong(); @@ -354,6 +437,17 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(pitCurrent); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (requestStatsLongHolder == null) { + requestStatsLongHolder = new RequestStatsLongHolder(); + } + out.writeMap( + requestStatsLongHolder.getRequestStatsHolder(), + StreamOutput::writeString, + (stream, stats) -> stats.writeTo(stream) + ); + } + if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeVLong(concurrentQueryCount); out.writeVLong(concurrentQueryTimeInMillis); @@ -391,6 +485,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); + if (requestStatsLongHolder != null) { + builder.startObject(Fields.REQUEST); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + PhaseStatsLongHolder statsLongHolder = requestStatsLongHolder.requestStatsHolder.get(searchPhaseName.getName()); + if (statsLongHolder == null) { + continue; + } + builder.startObject(searchPhaseName.getName()); + builder.humanReadableField(Fields.TIME_IN_MILLIS, Fields.TIME, new TimeValue(statsLongHolder.timeInMillis)); + builder.field(Fields.CURRENT, statsLongHolder.current); + builder.field(Fields.TOTAL, statsLongHolder.total); + builder.endObject(); + } + builder.endObject(); + } return builder; } } @@ -405,6 +515,24 @@ public SearchStats() { totalStats = new Stats(); } + // Set the different Request Stats fields in here + public void setSearchRequestStats(SearchRequestStats searchRequestStats) { + if (totalStats.requestStatsLongHolder == null) { + totalStats.requestStatsLongHolder = new RequestStatsLongHolder(); + } + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + totalStats.requestStatsLongHolder.requestStatsHolder.put( + searchPhaseName.getName(), + new PhaseStatsLongHolder( + searchRequestStats.getPhaseCurrent(searchPhaseName), + searchRequestStats.getPhaseTotal(searchPhaseName), + searchRequestStats.getPhaseMetric(searchPhaseName) + ) + ); + } + } + public SearchStats(Stats totalStats, long openContexts, @Nullable Map groupStats) { this.totalStats = totalStats; this.openContexts = openContexts; @@ -520,6 +648,12 @@ static final class Fields { static final String SUGGEST_TIME = "suggest_time"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; static final String SUGGEST_CURRENT = "suggest_current"; + static final String REQUEST = "request"; + static final String TIME_IN_MILLIS = "time_in_millis"; + static final String TIME = "time"; + static final String CURRENT = "current"; + static final String TOTAL = "total"; + } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 2ed3cb8d9e8ea..cf64b886ed523 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -47,6 +47,7 @@ import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; @@ -333,6 +334,8 @@ public class IndicesService extends AbstractLifecycleComponent private volatile TimeValue clusterRemoteTranslogBufferInterval; private final FileCacheCleaner fileCacheCleaner; + private final SearchRequestStats searchRequestStats; + @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically @@ -363,6 +366,7 @@ public IndicesService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, + SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this.settings = settings; @@ -453,6 +457,7 @@ protected void closeInternal() { clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory); + this.searchRequestStats = searchRequestStats; this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); @@ -576,7 +581,7 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { } } - return new NodeIndicesStats(commonStats, statsByShard(this, flags)); + return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats); } Map> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) { diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index cc3d8193dfa6b..8a7aaba2726f4 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -35,6 +35,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -71,7 +72,6 @@ * @opensearch.internal */ public class NodeIndicesStats implements Writeable, ToXContentFragment { - private CommonStats stats; private Map> statsByShard; @@ -92,7 +92,7 @@ public NodeIndicesStats(StreamInput in) throws IOException { } } - public NodeIndicesStats(CommonStats oldStats, Map> statsByShard) { + public NodeIndicesStats(CommonStats oldStats, Map> statsByShard, SearchRequestStats searchRequestStats) { // this.stats = stats; this.statsByShard = statsByShard; @@ -105,6 +105,9 @@ public NodeIndicesStats(CommonStats oldStats, Map> } } } + if (this.stats.search != null) { + this.stats.search.setSearchRequestStats(searchRequestStats); + } } @Nullable diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1f8f17f8e8d91..90fb339951d62 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; @@ -761,6 +762,8 @@ protected Node( threadPool ); + final SearchRequestStats searchRequestStats = new SearchRequestStats(); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( settings, @@ -786,6 +789,7 @@ protected Node( remoteDirectoryFactory, repositoriesServiceReference::get, fileCacheCleaner, + searchRequestStats, remoteStoreStatsTrackerFactory ); @@ -1199,6 +1203,7 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); + b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); }); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 672ebea8d01f9..e3f16463a5328 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -34,6 +34,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.cluster.node.DiscoveryNode; @@ -801,8 +802,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>()); - + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats()); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 705479ec21fc1..f628bb3201452 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -38,8 +38,12 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.query.MatchAllQueryBuilder; @@ -51,7 +55,10 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; import org.junit.After; import org.junit.Before; @@ -65,6 +72,7 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -77,18 +85,21 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; public class AbstractSearchAsyncActionTests extends OpenSearchTestCase { private final List> resolvedNodes = new ArrayList<>(); private final Set releasedContexts = new CopyOnWriteArraySet<>(); private ExecutorService executor; + ThreadPool threadPool; @Before @Override public void setUp() throws Exception { super.setUp(); executor = Executors.newFixedThreadPool(1); + threadPool = new TestThreadPool(getClass().getName()); } @After @@ -97,6 +108,7 @@ public void tearDown() throws Exception { super.tearDown(); executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); } private AbstractSearchAsyncAction createAction( @@ -126,6 +138,7 @@ private AbstractSearchAsyncAction createAction( final AtomicLong expected, final SearchShardIterator... shards ) { + final Runnable runnable; final TransportSearchAction.SearchTimeProvider timeProvider; if (controlled) { @@ -161,7 +174,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -313,6 +327,53 @@ public void testSendSearchResponseDisallowPartialFailures() { assertEquals(requestIds, releasedContexts); } + public void testOnPhaseFailureAndVerifyListeners() { + SearchRequestStats testListener = new SearchRequestStats(); + + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); + action.start(); + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( + requestOperationListeners + ); + searchDfsQueryThenFetchAsyncAction.start(); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); + } + public void testOnPhaseFailure() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); @@ -321,6 +382,7 @@ public void testOnPhaseFailure() { List> nodeLookups = new ArrayList<>(); ArraySearchPhaseResults phaseResults = phaseResults(requestIds, nodeLookups, 0); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); + action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { @@ -528,6 +590,215 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { + SearchRequestStats testListener = new SearchRequestStats(); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + + long delay = (randomIntBetween(1, 5)); + delay = delay * 10; + + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); + action.start(); + + // Verify queryPhase current metric + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + + // Verify queryPhase total, current and latency metrics + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName())); + + // Verify fetchPhase current metric + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + + ExpandSearchPhase expandPhase = createExpandSearchPhase(); + action.executeNextPhase(fetchPhase, expandPhase); + TimeUnit.MILLISECONDS.sleep(delay); + + // Verify fetchPhase total, current and latency metrics + assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + + assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); + + action.executeNextPhase(expandPhase, fetchPhase); + + action.sendSearchResponse(mock(InternalSearchResponse.class), mock(String.valueOf(QuerySearchResult.class))); + assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); + } + + public void testOnPhaseListenersWithDfsType() throws InterruptedException { + SearchRequestStats testListener = new SearchRequestStats(); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( + requestOperationListeners + ); + long delay = (randomIntBetween(1, 5)); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + searchDfsQueryThenFetchAsyncAction.start(); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + + searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); + searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); + + assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + } + + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction( + List searchRequestOperationsListeners + ) { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + + return new SearchDfsQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + listener, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY, + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ); + } + + private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( + List searchRequestOperationsListeners + ) { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + return new SearchQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + listener, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY, + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ) { + @Override + ShardSearchFailure[] buildShardFailures() { + return ShardSearchFailure.EMPTY_ARRAY; + } + + @Override + public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { + start(); + } + }; + } + + private FetchSearchPhase createFetchSearchPhase() { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + QueryPhaseResultConsumer results = controller.newSearchPhaseResults( + OpenSearchExecutors.newDirectExecutorService(), + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + SearchProgressListener.NOOP, + mockSearchPhaseContext.getRequest(), + 1, + exc -> {} + ); + return new FetchSearchPhase( + results, + controller, + null, + mockSearchPhaseContext, + (searchResponse, scrollId) -> new SearchPhase("test") { + @Override + public void run() { + mockSearchPhaseContext.sendSearchResponse(searchResponse, null); + } + } + ); + } + + private ExpandSearchPhase createExpandSearchPhase() { + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(null, null, null, null, false, null, 1); + return new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null); + } + private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 45f00a8418d5c..43029fe57d5dd 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -136,7 +136,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -227,7 +228,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -317,7 +319,8 @@ public void sendCanMatch( null, new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -344,7 +347,8 @@ protected void executePhaseOnShard( } } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -428,7 +432,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -527,7 +532,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java index e078b4a467e91..b5e1050b968ee 100644 --- a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java @@ -99,6 +99,11 @@ public SearchRequest getRequest() { return searchRequest; } + @Override + public SearchPhase getCurrentPhase() { + return null; + } + @Override public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { String scrollId = getRequest().scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, Version.CURRENT) : null; diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 830fa99f90bb9..4b94b6589c6c8 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -135,7 +135,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -253,7 +254,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -370,7 +372,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -492,7 +495,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -605,9 +609,9 @@ public void testAllowPartialResults() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { - @Override protected void executePhaseOnShard( SearchShardIterator shardIt, diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 0e2780c195cb8..6a22a7ea2b5e4 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -214,7 +214,8 @@ public void sendExecuteQuery( timeProvider, null, task, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { @@ -226,6 +227,7 @@ public void run() { }; } }; + action.start(); latch.await(); assertThat(successfulOps.get(), equalTo(numShards)); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java new file mode 100644 index 0000000000000..ef880043e863c --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchRequestOperationsListenerTests extends OpenSearchTestCase { + + public void testListenersAreExecuted() { + Map searchPhaseMap = new HashMap<>(); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + searchPhaseMap.put(searchPhaseName, new SearchRequestStats.StatsHolder()); + } + SearchRequestOperationsListener testListener = new SearchRequestOperationsListener() { + + @Override + public void onPhaseStart(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc(); + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + } + }; + + int totalListeners = randomIntBetween(1, 10); + final List requestOperationListeners = new ArrayList<>(); + for (int i = 0; i < totalListeners; i++) { + requestOperationListeners.add(testListener); + } + + SearchRequestOperationsListener compositeListener = new SearchRequestOperationsListener.CompositeListener( + requestOperationListeners, + logger + ); + + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase searchPhase = mock(SearchPhase.class); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(ctx.getCurrentPhase()).thenReturn(searchPhase); + when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + compositeListener.onPhaseStart(ctx); + assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count()); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java new file mode 100644 index 0000000000000..f24147a8194b4 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchRequestStatsTests extends OpenSearchTestCase { + public void testSearchRequestPhaseFailure() { + SearchRequestStats testRequestStats = new SearchRequestStats(); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + testRequestStats.onPhaseStart(ctx); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); + testRequestStats.onPhaseFailure(ctx); + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } + + public void testSearchRequestStats() { + SearchRequestStats testRequestStats = new SearchRequestStats(); + + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + long tookTimeInMillis = randomIntBetween(1, 10); + testRequestStats.onPhaseStart(ctx); + long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); + testRequestStats.onPhaseEnd(ctx); + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); + } + } + + public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseStart(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } + + public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + Map searchPhaseNameLongMap = new HashMap<>(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + long tookTimeInMillis = randomIntBetween(1, 10); + long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseEnd(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + searchPhaseNameLongMap.put(searchPhaseName, tookTimeInMillis); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat( + testRequestStats.getPhaseMetric(searchPhaseName), + greaterThanOrEqualTo((searchPhaseNameLongMap.get(searchPhaseName) * numTasks)) + ); + } + } + + public void testSearchRequestStatsOnPhaseFailureConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseFailure(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index f65b0e231b1dd..c27e4bf27327a 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,11 +32,20 @@ package org.opensearch.index.search.stats; +import org.opensearch.action.search.SearchPhase; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SearchStatsTests extends OpenSearchTestCase { @@ -63,6 +72,37 @@ public void testShardLevelSearchGroupStats() throws Exception { // adding again would then return wrong search stats (would return 4! instead of 3) searchStats1.add(searchStats2); assertStats(groupStats1.get("group1"), 3); + + long paramValue = randomIntBetween(2, 50); + + // Testing for request stats + SearchRequestStats testRequestStats = new SearchRequestStats(); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue)); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int iterator = 0; iterator < paramValue; iterator++) { + testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseEnd(ctx); + } + } + searchStats1.setSearchRequestStats(testRequestStats); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals( + 0, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).current + ); + assertEquals( + paramValue, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).total + ); + assertThat( + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).timeInMillis, + greaterThanOrEqualTo(paramValue) + ); + } } private static void assertStats(Stats stats, long equalTo) { @@ -87,5 +127,4 @@ private static void assertStats(Stats stats, long equalTo) { // avg_concurrency is not summed up across stats assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } - } diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 9be45d4e77940..6f36d22b7e17b 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -32,6 +32,8 @@ package org.opensearch.indices; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -43,7 +45,9 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { - final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap()); + CommonStats oldStats = new CommonStats(); + SearchRequestStats requestStats = new SearchRequestStats(); + final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 3c31c979ce856..2b432906ee128 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2069,6 +2069,7 @@ public void onFailure(final Exception e) { new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, + null, new RemoteStoreStatsTrackerFactory(clusterService, settings) ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); @@ -2297,7 +2298,8 @@ public void onFailure(final Exception e) { namedWriteableRegistry, List.of(), client - ) + ), + null ) ); actions.put(