Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for merging multiple search responses into one #37566

Merged
merged 14 commits into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
Expand All @@ -43,7 +44,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Expand All @@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
Expand All @@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* Builds how long it took to execute the search.
*/
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
return timeProvider.buildTookInMillis();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
final TopDocsAndMaxScore td = queryResult.consumeTopDocs();
assert td != null;
topDocsStats.add(td);
topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly());
// make sure we set the shard index before we add it - the consumer didn't do that yet
if (td.topDocs.scoreDocs.length > 0) {
setShardIndex(td.topDocs, queryResult.getShardIndex());
Expand Down Expand Up @@ -439,12 +439,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
boolean performFinalReduce) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
Expand Down Expand Up @@ -476,16 +474,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
if (result.terminatedEarly() != null) {
if (terminatedEarly == null) {
terminatedEarly = result.terminatedEarly();
} else if (result.terminatedEarly()) {
terminatedEarly = true;
}
}
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand All @@ -508,8 +496,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

Expand Down Expand Up @@ -577,11 +565,7 @@ public static final class ReducedQueryPhase {
}
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
this.maxScore = Float.NaN;
} else {
this.maxScore = maxScore;
}
this.maxScore = maxScore;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.suggest = suggest;
Expand Down Expand Up @@ -682,7 +666,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
Expand Down Expand Up @@ -744,18 +728,20 @@ static final class TopDocsStats {
private long totalHits;
private TotalHits.Relation totalHitsRelation;
long fetchHits;
float maxScore = Float.NEGATIVE_INFINITY;

TopDocsStats() {
this(SearchContext.TRACK_TOTAL_HITS_ACCURATE);
}
private float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut;
Boolean terminatedEarly;

TopDocsStats(int trackTotalHitsUpTo) {
this.trackTotalHitsUpTo = trackTotalHitsUpTo;
this.totalHits = 0;
this.totalHitsRelation = Relation.EQUAL_TO;
}

float getMaxScore() {
return Float.isInfinite(maxScore) ? Float.NaN : maxScore;
}

TotalHits getTotalHits() {
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
return null;
Expand All @@ -766,7 +752,7 @@ TotalHits getTotalHits() {
if (totalHits < trackTotalHitsUpTo) {
return new TotalHits(totalHits, totalHitsRelation);
} else {
/**
/*
* The user requested to count the total hits up to <code>trackTotalHitsUpTo</code>
* so we return this lower bound when the total hits is greater than this value.
* This can happen when multiple shards are merged since the limit to track total hits
Expand All @@ -777,7 +763,7 @@ TotalHits getTotalHits() {
}
}

void add(TopDocsAndMaxScore topDocs) {
void add(TopDocsAndMaxScore topDocs, boolean timedOut, Boolean terminatedEarly) {
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
totalHits += topDocs.topDocs.totalHits.value;
if (topDocs.topDocs.totalHits.relation == Relation.GREATER_THAN_OR_EQUAL_TO) {
Expand All @@ -788,6 +774,16 @@ void add(TopDocsAndMaxScore topDocs) {
if (!Float.isNaN(topDocs.maxScore)) {
maxScore = Math.max(maxScore, topDocs.maxScore);
}
if (timedOut) {
this.timedOut = true;
}
if (terminatedEarly != null) {
if (this.terminatedEarly == null) {
this.terminatedEarly = terminatedEarly;
} else if (terminatedEarly) {
this.terminatedEarly = true;
}
}
}
}

Expand Down
Loading