Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Search latency tracking - Coordinator node #10279

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -56,9 +57,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
Expand All @@ -74,6 +77,7 @@ public void testSearchWithWRRShardRouting() throws IOException {
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.put("cluster.routing.weighted.fail_open", false)
.put(SEARCH_REQUEST_STATS_ENABLED_KEY, true)
.build();

logger.info("--> starting 6 nodes on different zones");
Expand Down Expand Up @@ -180,12 +184,39 @@ public void testSearchWithWRRShardRouting() throws IOException {
assertFalse(!hitNodes.contains(nodeId));
}
nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
int num = 0;
int coordNumber = 0;

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder()
.getRequestStatsHolder()
.get(SearchPhaseName.QUERY.getName())
.getTimeInMillis() > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(),
greaterThan(0L)
);
coordNumber += 1;
}
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
num++;
}
assertThat(coordNumber, greaterThan(0));
assertThat(num, greaterThan(0));
}

private Map<String, List<String>> setupCluster(int nodeCountPerAZ, Settings commonSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
Expand All @@ -63,6 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand All @@ -78,7 +80,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@OpenSearchIntegTestCase.ClusterScope(minNumDataNodes = 2)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchStatsIT extends ParameterizedOpenSearchIntegTestCase {

public SearchStatsIT(Settings dynamicSettings) {
Expand Down Expand Up @@ -126,6 +128,11 @@ public void testSimpleStats() throws Exception {
assertThat(numNodes, greaterThanOrEqualTo(2));
final int shardsIdx1 = randomIntBetween(1, 10); // we make sure each node gets at least a single shard...
final int shardsIdx2 = Math.max(numNodes - shardsIdx1, randomIntBetween(1, 10));
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_REQUEST_STATS_ENABLED_KEY, true).build())
.get();
assertThat(numNodes, lessThanOrEqualTo(shardsIdx1 + shardsIdx2));
assertAcked(
prepareCreate("test1").setSettings(
Expand Down Expand Up @@ -188,20 +195,40 @@ public void testSimpleStats() throws Exception {

Set<String> nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2");
int num = 0;
int numOfCoordinators = 0;

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) {
assertThat(
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
numOfCoordinators += 1;
}
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
assertThat(total.getQueryCount(), greaterThan(0L));
assertThat(total.getQueryTimeInMillis(), greaterThan(0L));
num++;
} else {
assertThat(total.getQueryCount(), equalTo(0L));
assertThat(total.getQueryCount(), greaterThanOrEqualTo(0L));
assertThat(total.getQueryTimeInMillis(), equalTo(0L));
}
}

assertThat(numOfCoordinators, greaterThan(0));
assertThat(num, greaterThan(0));

}

private Set<String> nodeIdsWithIndex(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
Expand All @@ -116,8 +116,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

private SearchPhase currentPhase;

private final List<Releasable> releasables = new ArrayList<>();

private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -135,7 +139,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -171,6 +176,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
}

@Override
Expand Down Expand Up @@ -371,6 +377,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
: OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
onPhaseFailure(currentPhase, "all shards failed", cause);

} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
Expand Down Expand Up @@ -419,13 +426,24 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd();
executePhase(nextPhase);
}
}

private void onPhaseEnd() {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

private void executePhase(SearchPhase phase) {
try {
phase.run();
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -603,6 +621,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
}
}

public SearchPhase getCurrentPhase() {
return currentPhase;
}

private void setCurrentPhase(SearchPhase phase) {
currentPhase = phase;
}

@Override
public final int getNumShards() {
return results.getNumShards();
Expand Down Expand Up @@ -670,10 +696,13 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
}
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd();
setCurrentPhase(null);
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
ClusterState clusterState,
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
task,
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters
clusters,
searchRequestOperationsListener
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final TransportSearchAction.SearchTimeProvider timeProvider,
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"dfs",
SearchPhaseName.DFS_PRE_QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -95,7 +96,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@
*
* @opensearch.internal
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
public abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTimeInNanos;

protected SearchPhase(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

public long getStartTimeInNanos() {
return startTimeInNanos;
}

public void recordAndRun() throws IOException {
this.startTimeInNanos = System.nanoTime();
run();
}

/**
* Returns the phases name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor {
*/
SearchRequest getRequest();

SearchPhase getCurrentPhase();

/**
* Builds and sends the final search response back to the user.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* @opensearch.internal
*/
public enum SearchPhaseName {
DFS_PRE_QUERY("dfs_pre_query"),
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
final TransportSearchAction.SearchTimeProvider timeProvider,
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"query",
SearchPhaseName.QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -100,7 +101,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Loading
Loading