Skip to content

Commit

Permalink
expand TopDocsStats to include timedOut and earlyTerminated
Browse files Browse the repository at this point in the history
This allows to share more code between SearchResponseMerger and SearchPhaseController
  • Loading branch information
javanna committed Jan 18, 2019
1 parent 3537afc commit 8f1b063
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
final TopDocsAndMaxScore td = queryResult.consumeTopDocs();
assert td != null;
topDocsStats.add(td);
topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly());
// make sure we set the shard index before we add it - the consumer didn't do that yet
if (td.topDocs.scoreDocs.length > 0) {
setShardIndex(td.topDocs, queryResult.getShardIndex());
Expand Down Expand Up @@ -439,12 +439,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
boolean performFinalReduce) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
Expand Down Expand Up @@ -476,16 +474,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
if (result.terminatedEarly() != null) {
if (terminatedEarly == null) {
terminatedEarly = result.terminatedEarly();
} else if (result.terminatedEarly()) {
terminatedEarly = true;
}
}
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand All @@ -509,7 +497,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

Expand Down Expand Up @@ -678,7 +666,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
Expand Down Expand Up @@ -741,6 +729,8 @@ static final class TopDocsStats {
private TotalHits.Relation totalHitsRelation;
long fetchHits;
private float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut;
Boolean terminatedEarly;

TopDocsStats(int trackTotalHitsUpTo) {
this.trackTotalHitsUpTo = trackTotalHitsUpTo;
Expand Down Expand Up @@ -773,7 +763,7 @@ TotalHits getTotalHits() {
}
}

void add(TopDocsAndMaxScore topDocs) {
void add(TopDocsAndMaxScore topDocs, boolean timedOut, Boolean terminatedEarly) {
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
totalHits += topDocs.topDocs.totalHits.value;
if (topDocs.topDocs.totalHits.relation == Relation.GREATER_THAN_OR_EQUAL_TO) {
Expand All @@ -784,6 +774,16 @@ void add(TopDocsAndMaxScore topDocs) {
if (!Float.isNaN(topDocs.maxScore)) {
maxScore = Math.max(maxScore, topDocs.maxScore);
}
if (timedOut) {
this.timedOut = true;
}
if (terminatedEarly != null) {
if (this.terminatedEarly == null) {
this.terminatedEarly = terminatedEarly;
} else if (terminatedEarly) {
this.terminatedEarly = true;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ SearchResponse getMergedResponse() {
int totalShards = 0;
int skippedShards = 0;
int successfulShards = 0;
boolean timedOut = false;
Boolean terminatedEarly = null;
//the current reduce phase counts as one
int numReducePhases = 1;
List<ShardSearchFailure> failures = new ArrayList<>();
Expand All @@ -121,10 +119,6 @@ SearchResponse getMergedResponse() {
totalShards += searchResponse.getTotalShards();
skippedShards += searchResponse.getSkippedShards();
successfulShards += searchResponse.getSuccessfulShards();
timedOut = timedOut || searchResponse.isTimedOut();
if (searchResponse.isTerminatedEarly() != null && searchResponse.isTerminatedEarly()) {
terminatedEarly = true;
}
numReducePhases += searchResponse.getNumReducePhases();

Collections.addAll(failures, searchResponse.getShardFailures());
Expand Down Expand Up @@ -158,7 +152,8 @@ SearchResponse getMergedResponse() {
trackTotalHits = true;
}
TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards);
topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()));
topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()),
searchResponse.isTimedOut(), searchResponse.isTerminatedEarly());
topDocsList.add(topDocs);
}

Expand Down Expand Up @@ -186,7 +181,7 @@ SearchResponse getMergedResponse() {
//make failures ordering consistent with ordinary search and CCS
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest,
new SearchProfileShardResults(profileResults), timedOut, terminatedEarly, numReducePhases);
new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases);
long tookInMillis = searchTimeProvider.buildTookInMillis();
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters);
}
Expand Down

0 comments on commit 8f1b063

Please sign in to comment.