Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search latency tracking - Coordinator node #8386

Merged
merged 61 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
cecdd96
CoordinatorStats
buddharajusahil Jun 30, 2023
80ab4c2
Changed version
Jun 30, 2023
e0fc5cb
Applied formatting
Jun 30, 2023
e21f2b6
Fixed NodeIndicesStatsTests bug
Jul 5, 2023
4a12c18
Update Debug_OpenSearch.xml
buddharajusahil Jun 30, 2023
6efcae5
Fixed wildcard imports
Jul 5, 2023
62e608f
Added negative test UT's to AbstractSearchAsyncAction
Jul 6, 2023
261bfa3
Formatting
Jul 18, 2023
bc0670d
Added more revisions
Jul 18, 2023
b81236d
Changed coordinator to request, refactored logic for onPhase end, sta…
Jul 24, 2023
ffa573f
Changed Coordinator to Request in TransportSearchAction
Jul 24, 2023
d015f9b
Removed test statements in TransportSearchAction
Jul 24, 2023
33b9fac
Removed configuration files from pull request
Jul 28, 2023
40660da
Addressed PR Comments related to testing
Jul 31, 2023
b4bb78c
Formatting
Jul 31, 2023
599e080
Fixed bug with instance start map
Aug 2, 2023
8fed670
Minor logic changes
Aug 4, 2023
509d923
Changed constructor logic in AbstractSearchAsyncAction
Aug 5, 2023
f8ae34c
Changed Logic, Minor changes to TransportSearch and AbstractAsync
Aug 8, 2023
298a00f
set flag value to false by default
Aug 8, 2023
b1ad9d5
fixed tracer bug
Aug 8, 2023
bdd1b5e
reverted all Tracerfactory Changes in Node.java
Aug 8, 2023
3e0de15
Removed forbidden api from ASAA Tests
Aug 8, 2023
bb52c34
Formatting
Aug 8, 2023
34b6064
Remove math.random
Aug 9, 2023
0499e44
Fixed Test cases
Aug 9, 2023
1cf98be
Changed IT's to account for flag
Aug 9, 2023
58a0fe9
More integration testing changes
Aug 9, 2023
0be1de7
rerun tests
Aug 9, 2023
a378a4e
Fixed scope bug in SearchStatsIT
Aug 9, 2023
bbf5af9
rerun tests
Aug 9, 2023
b8778b1
java docs
Aug 9, 2023
30563a5
flaky tests
Aug 9, 2023
0637e85
flaky tests
Aug 9, 2023
84ab313
Empty-Commit
Aug 10, 2023
18aeaa8
Merge branch 'main' into CoordinatorStats
Aug 10, 2023
22e2437
Merge branch 'main' into CoordinatorStats
buddharajusahil Aug 10, 2023
18ecdcb
fixed conflict
Aug 10, 2023
fbb604a
Merge branch 'main' into CoordinatorStats
Aug 29, 2023
8751cbd
Addressing comments to use CollectionUtils.isEmpty
sgup432 Aug 29, 2023
afc4a35
Merge pull request #5 from sgup432/CoordinatorStats
sgup432 Aug 29, 2023
9e7baaa
Addressing comments and adding CHANGELOG
sgup432 Sep 11, 2023
3857542
Merge branch 'main' into CoordinatorStats
sgup432 Sep 11, 2023
0e44b49
Merge pull request #6 from sgup432/CoordinatorStats
sgup432 Sep 11, 2023
e7cb97b
Fixing gradle check failure
sgup432 Sep 11, 2023
f44fb3c
Merge pull request #7 from sgup432/CoordinatorStats
sgup432 Sep 11, 2023
0d87cfb
Merge branch 'main' into CoordinatorStats
sgup432 Sep 12, 2023
110ca71
Changing interface template for SearchRequestOperationsListener
sgup432 Sep 18, 2023
b680055
Merge branch 'main' into CoordinatorStats
sgup432 Sep 18, 2023
39f7596
Merge branch 'CoordinatorStats' into CoordinatorStats
sgup432 Sep 18, 2023
4d66914
Merge pull request #8 from sgup432/CoordinatorStats
sgup432 Sep 18, 2023
706d4f5
Using generic enumMap for holding phase stats
sgup432 Sep 19, 2023
b0a2ad3
Merge pull request #9 from sgup432/CoordinatorStats
sgup432 Sep 19, 2023
b678f61
Removing unused variable from SearchRequestStats
sgup432 Sep 19, 2023
e0b2769
Merge pull request #10 from sgup432/CoordinatorStats
sgup432 Sep 19, 2023
8f5b7e5
Fixing broken UT
sgup432 Sep 20, 2023
a4ca6b0
Merge pull request #11 from sgup432/CoordinatorStats
sgup432 Sep 20, 2023
dfb85bd
Fixing javadoc issue
sgup432 Sep 20, 2023
77e6fcf
Merge pull request #12 from sgup432/CoordinatorStats
sgup432 Sep 20, 2023
6ef0cb5
Removing RequestStats file as not needed
sgup432 Sep 20, 2023
a7acab2
Merge pull request #13 from sgup432/CoordinatorStats
sgup432 Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -56,9 +57,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
Expand All @@ -74,6 +77,7 @@ public void testSearchWithWRRShardRouting() throws IOException {
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.put("cluster.routing.weighted.fail_open", false)
.put(SEARCH_REQUEST_STATS_ENABLED_KEY, true)
.build();

logger.info("--> starting 6 nodes on different zones");
Expand Down Expand Up @@ -180,12 +184,39 @@ public void testSearchWithWRRShardRouting() throws IOException {
assertFalse(!hitNodes.contains(nodeId));
}
nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
int num = 0;
int coordNumber = 0;

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder()
.getRequestStatsHolder()
.get(SearchPhaseName.QUERY.getName())
.getTimeInMillis() > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(),
greaterThan(0L)
);
coordNumber += 1;
}
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
num++;
}
assertThat(coordNumber, greaterThan(0));
assertThat(num, greaterThan(0));
}

private Map<String, List<String>> setupCluster(int nodeCountPerAZ, Settings commonSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
Expand All @@ -63,6 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand All @@ -78,7 +80,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@OpenSearchIntegTestCase.ClusterScope(minNumDataNodes = 2)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchStatsIT extends ParameterizedOpenSearchIntegTestCase {

public SearchStatsIT(Settings dynamicSettings) {
Expand Down Expand Up @@ -126,6 +128,11 @@ public void testSimpleStats() throws Exception {
assertThat(numNodes, greaterThanOrEqualTo(2));
final int shardsIdx1 = randomIntBetween(1, 10); // we make sure each node gets at least a single shard...
final int shardsIdx2 = Math.max(numNodes - shardsIdx1, randomIntBetween(1, 10));
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_REQUEST_STATS_ENABLED_KEY, true).build())
.get();
assertThat(numNodes, lessThanOrEqualTo(shardsIdx1 + shardsIdx2));
assertAcked(
prepareCreate("test1").setSettings(
Expand Down Expand Up @@ -188,20 +195,40 @@ public void testSimpleStats() throws Exception {

Set<String> nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2");
int num = 0;
int numOfCoordinators = 0;

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) {
assertThat(
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
numOfCoordinators += 1;
}
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
assertThat(total.getQueryCount(), greaterThan(0L));
assertThat(total.getQueryTimeInMillis(), greaterThan(0L));
num++;
} else {
assertThat(total.getQueryCount(), equalTo(0L));
assertThat(total.getQueryCount(), greaterThanOrEqualTo(0L));
assertThat(total.getQueryTimeInMillis(), equalTo(0L));
}
}

assertThat(numOfCoordinators, greaterThan(0));
assertThat(num, greaterThan(0));

}

private Set<String> nodeIdsWithIndex(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
Expand All @@ -116,8 +116,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

private SearchPhase currentPhase;

private final List<Releasable> releasables = new ArrayList<>();

private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -135,7 +139,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -171,6 +176,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
}

@Override
Expand Down Expand Up @@ -371,6 +377,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
: OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
onPhaseFailure(currentPhase, "all shards failed", cause);

buddharajusahil marked this conversation as resolved.
Show resolved Hide resolved
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
Expand Down Expand Up @@ -419,13 +426,24 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd();
executePhase(nextPhase);
}
}

private void onPhaseEnd() {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

private void executePhase(SearchPhase phase) {
try {
phase.run();
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -603,6 +621,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
}
}

public SearchPhase getCurrentPhase() {
return currentPhase;
}

private void setCurrentPhase(SearchPhase phase) {
currentPhase = phase;
}

@Override
public final int getNumShards() {
return results.getNumShards();
Expand Down Expand Up @@ -670,10 +696,13 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
}
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd();
setCurrentPhase(null);
buddharajusahil marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
ClusterState clusterState,
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
task,
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters
clusters,
searchRequestOperationsListener
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final TransportSearchAction.SearchTimeProvider timeProvider,
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"dfs",
SearchPhaseName.DFS_PRE_QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -95,7 +96,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@
*
* @opensearch.internal
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
public abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTimeInNanos;

protected SearchPhase(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

public long getStartTimeInNanos() {
return startTimeInNanos;
}

public void recordAndRun() throws IOException {
this.startTimeInNanos = System.nanoTime();
run();
}

/**
* Returns the phases name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor {
*/
SearchRequest getRequest();

SearchPhase getCurrentPhase();

/**
* Builds and sends the final search response back to the user.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* @opensearch.internal
*/
public enum SearchPhaseName {
DFS_PRE_QUERY("dfs_pre_query"),
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
final TransportSearchAction.SearchTimeProvider timeProvider,
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"query",
SearchPhaseName.QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -100,7 +101,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Loading
Loading