From 891b2a7b5e9208c3613a9b0b5a324a9c837f4f63 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 5 Apr 2017 15:43:39 +0200 Subject: [PATCH 1/3] Introduce incremental reduction of TopDocs This commit adds support for incremental top N reduction if the number of expected shards in the search request is high enough. The changes here also clean up more code in SearchPhaseController to make the separation between values that are the same on each search result and values that are per response. The reduced search phase result doesn't hold an arbitrary result to obtain values like `from`, `size` or sort values which is now cleanly encapsulated. --- .../action/search/FetchSearchPhase.java | 15 +- .../action/search/SearchPhaseController.java | 300 ++++++++++++------ .../SearchScrollQueryAndFetchAsyncAction.java | 5 +- ...SearchScrollQueryThenFetchAsyncAction.java | 18 +- .../common/util/concurrent/AtomicArray.java | 9 - .../elasticsearch/search/SearchService.java | 4 +- .../search/fetch/FetchPhase.java | 2 +- .../search/query/QueryPhase.java | 59 ++-- .../search/query/QuerySearchResult.java | 60 +++- .../search/SearchPhaseControllerTests.java | 227 ++++++++++--- .../search/SearchServiceTests.java | 19 +- 11 files changed, 491 insertions(+), 227 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 920dd1b0009ea..a0e313f1d73f7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -98,27 +98,26 @@ private void innerRun() throws IOException { final int numShards = context.getNumShards(); final boolean isScrollSearch = context.getRequest().scroll() != null; List phaseResults = queryResults.asList(); - ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults); String scrollId = isScrollSearch ? 
TransportSearchHelper.buildScrollId(queryResults) : null; final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); final boolean queryAndFetchOptimization = queryResults.length() == 1; final Runnable finishPhase = () - -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ? + -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ? queryResults : fetchResults); if (queryAndFetchOptimization) { assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null; // query AND fetch optimization finishPhase.run(); } else { - final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs); - if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs); + if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return phaseResults.stream() .map(e -> e.queryResult()) .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources finishPhase.run(); } else { final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ? 
- searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards) + searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards) : null; final CountedCollector counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r), docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not @@ -188,7 +187,7 @@ public void onFailure(Exception e) { private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { // we only release search context that we did not fetch from if we are not scrolling // and if it has at lease one hit that didn't make it to the global topDocs - if (context.getRequest().scroll() == null && queryResult.hasHits()) { + if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) { try { Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId()); context.sendReleaseSearchContext(queryResult.getRequestId(), connection); @@ -198,11 +197,11 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { } } - private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, + private void moveToNextPhase(SearchPhaseController searchPhaseController, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, AtomicArray fetchResultsArr) { final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null, - sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get); + reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get); context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId))); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 9972b6e1c2af7..d539b4455afb9 
100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; @@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -147,42 +147,47 @@ private static long optionalSum(long left, long right) { * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * @param results the search phase results to obtain the sort docs from + * @param bufferedTopDocs the pre-consumed buffered top docs + * @param topDocsStats the top docs stats to fill + * @param from the offset into the search results top docs + * @param size the number of hits to return from the merged top docs */ - public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection results) throws IOException { + public SortedTopDocs sortDocs(boolean ignoreFrom, Collection results, + final Collection bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) { if (results.isEmpty()) { - return EMPTY_DOCS; + return SortedTopDocs.EMPTY; } - final Collection topDocs = new ArrayList<>(); + final Collection topDocs = bufferedTopDocs == null ? 
new ArrayList<>() : bufferedTopDocs; final Map>> groupedCompletionSuggestions = new HashMap<>(); - int from = -1; - int size = -1; - for (SearchPhaseResult sortedResult : results) { + for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once /* We loop over all results once, group together the completion suggestions if there are any and collect relevant * top docs results. Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road * this allowed to remove a single shared optimization code here since now we don't materialized a dense array of * top docs anymore but instead only pass relevant results / top docs to the merge method*/ QuerySearchResult queryResult = sortedResult.queryResult(); - if (queryResult.hasHits()) { - from = queryResult.from(); - size = queryResult.size(); - TopDocs td = queryResult.topDocs(); - if (td != null && td.scoreDocs.length > 0) { + if (queryResult.hasConsumedTopDocs() == false) { // already consumed? 
+ final TopDocs td = queryResult.consumeTopDocs(); + assert td != null; + topDocsStats.add(td); + if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet setShardIndex(td, queryResult.getShardIndex()); topDocs.add(td); } + } + if (queryResult.hasSuggestHits()) { Suggest shardSuggest = queryResult.suggest(); - if (shardSuggest != null) { - for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { - suggestion.setShardIndex(sortedResult.getShardIndex()); - List> suggestions = - groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); - suggestions.add(suggestion); - } + for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { + suggestion.setShardIndex(sortedResult.getShardIndex()); + List> suggestions = + groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestions.add(suggestion); } } } - if (size != -1) { - final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from); + final boolean hasNoHits = groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty(); + if (hasNoHits == false) { + final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from); + final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? 
EMPTY_DOCS : mergedTopDocs.scoreDocs; ScoreDoc[] scoreDocs = mergedScoreDocs; if (groupedCompletionSuggestions.isEmpty() == false) { int numSuggestDocs = 0; @@ -204,23 +209,35 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection results, int topN, int from) { + TopDocs mergeTopDocs(Collection results, int topN, int from) { if (results.isEmpty()) { - return EMPTY_DOCS; + return null; } + assert results.isEmpty() == false; final boolean setShardIndex = false; final TopDocs topDocs = results.stream().findFirst().get(); final TopDocs mergedTopDocs; final int numShards = results.size(); if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them. - return topDocs.scoreDocs; + return topDocs; } else if (topDocs instanceof CollapseTopFieldDocs) { CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs; final Sort sort = new Sort(firstTopDocs.fields); @@ -235,7 +252,7 @@ private ScoreDoc[] mergeTopDocs(Collection results, int topN, int from) final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]); mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex); } - return mergedTopDocs.scoreDocs; + return mergedTopDocs; } private static void setShardIndex(TopDocs topDocs, int shardIndex) { @@ -249,12 +266,12 @@ private static void setShardIndex(TopDocs topDocs, int shardIndex) { } } - public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, - ScoreDoc[] sortedScoreDocs, int numShards) { - ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; - if (reducedQueryPhase.isEmpty() == false) { + public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) { + final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; + if (reducedQueryPhase.isEmptyResult == false) { + final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs; // from is always zero as when we use scroll, we ignore from - long size = 
Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size()); + long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size); // with collapsing we can have more hits than sorted docs size = Math.min(sortedScoreDocs.length, size); for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) { @@ -288,13 +305,13 @@ public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) { * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named * completion suggestion ordered by suggestion name */ - public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, - ReducedQueryPhase reducedQueryPhase, + public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase, Collection fetchResults, IntFunction resultsLookup) { - if (reducedQueryPhase.isEmpty()) { + if (reducedQueryPhase.isEmptyResult) { return InternalSearchResponse.empty(); } - SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup); + ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs; + SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup); if (reducedQueryPhase.suggest != null) { if (!fetchResults.isEmpty()) { int currentOffset = hits.getHits().length; @@ -325,21 +342,15 @@ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, return reducedQueryPhase.buildResponse(hits); } - private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs, + private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, Collection fetchResults, IntFunction resultsLookup) { - boolean sorted = false; + final boolean sorted = reducedQueryPhase.isSorted; + ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs; int sortScoreIndex = -1; - if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) { - TopFieldDocs 
fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs(); - if (fieldDocs instanceof CollapseTopFieldDocs && - fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) { - sorted = false; - } else { - sorted = true; - for (int i = 0; i < fieldDocs.fields.length; i++) { - if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) { - sortScoreIndex = i; - } + if (sorted) { + for (int i = 0; i < reducedQueryPhase.sortField.length; i++) { + if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) { + sortScoreIndex = i; } } } @@ -347,8 +358,8 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr for (SearchPhaseResult entry : fetchResults) { entry.fetchResult().initCounter(); } - int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from(); - int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size()); + int from = ignoreFrom ? 0 : reducedQueryPhase.from; + int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size); // with collapsing we can have more fetch hits than sorted docs numSearchHits = Math.min(sortedDocs.length, numSearchHits); // merge hits @@ -368,7 +379,7 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr searchHit.shard(fetchResult.getSearchShardTarget()); if (sorted) { FieldDoc fieldDoc = (FieldDoc) shardDoc; - searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats()); + searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats); if (sortScoreIndex != -1) { searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue()); } @@ -385,42 +396,43 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr * Reduces the given query results and consumes all aggregations and profile results. 
* @param queryResults a list of non-null query shard results */ - public ReducedQueryPhase reducedQueryPhase(List queryResults) { - return reducedQueryPhase(queryResults, null, 0); + public ReducedQueryPhase reducedQueryPhase(Collection queryResults, boolean isScrollRequest) { + return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest); } /** * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results - * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed + * @param bufferedAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed + * from all non-null query results. + * @param bufferedTopDocs a list of pre-collected / buffered top docs. if this list is non-null all top docs have been consumed * from all non-null query results. * @param numReducePhases the number of non-final reduce phases applied to the query results. 
* @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeProfileResult() */ private ReducedQueryPhase reducedQueryPhase(Collection queryResults, - List bufferdAggs, int numReducePhases) { + List bufferedAggs, + List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, + boolean isScrollRequest) { assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase - long totalHits = 0; - long fetchHits = 0; - float maxScore = Float.NEGATIVE_INFINITY; boolean timedOut = false; Boolean terminatedEarly = null; if (queryResults.isEmpty()) { // early terminate we have nothing to reduce - return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null, - numReducePhases); + return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore, + timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true); } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean consumeAggs; final List aggregationsList; - if (bufferdAggs != null) { + if (bufferedAggs != null) { consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?"; - aggregationsList = bufferdAggs; + aggregationsList = bufferedAggs; } else if (firstResult.hasAggs()) { // the number of shards was less than the buffer size so we reduce agg results directly aggregationsList = new ArrayList<>(queryResults.size()); @@ -435,8 +447,12 @@ private ReducedQueryPhase reducedQueryPhase(Collection> groupedSuggestions = hasSuggest ? 
new HashMap<>() : Collections.emptyMap(); final Map profileResults = hasProfileResults ? new HashMap<>(queryResults.size()) : Collections.emptyMap(); + int from = 0; + int size = 0; for (SearchPhaseResult entry : queryResults) { QuerySearchResult result = entry.queryResult(); + from = result.from(); + size = result.size(); if (result.searchTimedOut()) { timedOut = true; } @@ -447,11 +463,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection> suggestion : result.suggest()) { @@ -472,8 +483,11 @@ private ReducedQueryPhase reducedQueryPhase(Collectionnull if the results are not sorted + final SortField[] sortField; + // true iff the result score docs is sorted + final boolean isSorted; + // the size of the top hits to return + final int size; + // true iff the query phase had no results. Otherwise false + final boolean isEmptyResult; + // the offset into the merged top hits + final int from; + // sort value formats used to sort / format the result + final DocValueFormat[] sortValueFormats; + + ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest, + InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs, + SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSorted, int size, + int from, boolean isEmptyResult) { if (numReducePhases <= 0) { throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases); } @@ -540,27 +567,26 @@ public static final class ReducedQueryPhase { } this.timedOut = timedOut; this.terminatedEarly = terminatedEarly; - this.oneResult = oneResult; this.suggest = suggest; this.aggregations = aggregations; this.shardResults = shardResults; this.numReducePhases = numReducePhases; + this.scoreDocs = scoreDocs; + this.sortField = sortFields; + this.isSorted = isSorted; + this.size = size; + this.from = from; + this.isEmptyResult = isEmptyResult; + 
this.sortValueFormats = sortValueFormats; } /** * Creates a new search response from the given merged hits. - * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction) + * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction) */ public InternalSearchResponse buildResponse(SearchHits hits) { return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases); } - - /** - * Returns true iff the query phase had no results. Otherwise false - */ - public boolean isEmpty() { - return oneResult == null; - } } /** @@ -569,12 +595,16 @@ public boolean isEmpty() { * This implementation can be configured to batch up a certain amount of results and only reduce them * iff the buffer is exhausted. */ - static final class QueryPhaseResultConsumer - extends InitialSearchPhase.SearchPhaseResults { - private final InternalAggregations[] buffer; + static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults { + private final InternalAggregations[] aggsBuffer; + private final TopDocs[] topDocsBuffer; + private final boolean hasAggs; + private final boolean hasTopDocs; + private final int bufferSize; private int index; private final SearchPhaseController controller; private int numReducePhases = 0; + private final TopDocsStats topDocsStats = new TopDocsStats(); /** * Creates a new {@link QueryPhaseResultConsumer} @@ -583,7 +613,8 @@ static final class QueryPhaseResultConsumer * @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results * the buffer is used to incrementally reduce aggregation results before all shards responded. 
*/ - private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) { + private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize, + boolean hasTopDocs, boolean hasAggs) { super(expectedResultSize); if (expectedResultSize != 1 && bufferSize < 2) { throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result"); @@ -591,39 +622,68 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR if (expectedResultSize <= bufferSize) { throw new IllegalArgumentException("buffer size must be less than the expected result size"); } + if (hasAggs == false && hasTopDocs == false) { + throw new IllegalArgumentException("either aggs or top docs must be present"); + } this.controller = controller; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. - this.buffer = new InternalAggregations[bufferSize]; + this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; + this.topDocsBuffer = new TopDocs[hasTopDocs ? 
bufferSize : 0]; + this.hasTopDocs = hasTopDocs; + this.hasAggs = hasAggs; + this.bufferSize = bufferSize; + } @Override public void consumeResult(SearchPhaseResult result) { super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); - assert queryResult.hasAggs() : "this collector should only be used if aggs are requested"; consumeInternal(queryResult); } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { - InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs(); - if (index == buffer.length) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer)); - Arrays.fill(buffer, null); + if (index == bufferSize) { + if (hasAggs) { + InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + Arrays.fill(aggsBuffer, null); + aggsBuffer[0] = reducedAggs; + } + if (hasTopDocs) { + TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer), + querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard + , 0); + Arrays.fill(topDocsBuffer, null); + topDocsBuffer[0] = reducedTopDocs; + } numReducePhases++; - buffer[0] = reducedAggs; index = 1; } final int i = index++; - buffer[i] = aggregations; + if (hasAggs) { + aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); + } + if (hasTopDocs) { + final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null + topDocsStats.add(topDocs); + SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex()); + topDocsBuffer[i] = topDocs; + } + } + + private synchronized List getRemainingAggs() { + return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null; } - private synchronized List getRemaining() { - return Arrays.asList(buffer).subList(0, index); + private synchronized List getRemainingTopDocs() { + return hasTopDocs ? 
Arrays.asList(topDocsBuffer).subList(0, index) : null; } + @Override public ReducedQueryPhase reduce() { - return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases); + return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, + numReducePhases, false); } /** @@ -641,17 +701,49 @@ int getNumBuffered() { */ InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { SearchSourceBuilder source = request.source(); - if (source != null && source.aggregations() != null) { + boolean isScrollRequest = request.scroll() != null; + final boolean hasAggs = source != null && source.aggregations() != null; + final boolean hasTopDocs = source == null || source.size() != 0; + + if (isScrollRequest == false && (hasAggs || hasTopDocs)) { + // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize()); + return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs); } } return new InitialSearchPhase.SearchPhaseResults(numShards) { @Override public ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList()); + return reducedQueryPhase(results.asList(), isScrollRequest); } }; } + + static final class TopDocsStats { + long totalHits; + long fetchHits; + float maxScore = Float.NEGATIVE_INFINITY; + + void add(TopDocs topDocs) { + totalHits += topDocs.totalHits; + fetchHits += topDocs.scoreDocs.length; + if (!Float.isNaN(topDocs.getMaxScore())) { + maxScore = Math.max(maxScore, topDocs.getMaxScore()); + } + } + } + + static class SortedTopDocs { + static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null); + final ScoreDoc[] scoreDocs; + 
final boolean sorted; + final SortField[] sortFields; + + SortedTopDocs(ScoreDoc[] scoreDocs, boolean sorted, SortField[] sortFields) { + this.scoreDocs = scoreDocs; + this.sorted = sorted; + this.sortFields = sortFields; + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index c39a9fe6f25a2..b3ebaed3cb61c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -173,9 +173,8 @@ private void finishHim() { private void innerFinishHim() throws Exception { List queryFetchSearchResults = queryFetchResults.asList(); - ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList()); - final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, - searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get); + final InternalSearchResponse internalResponse = searchPhaseController.merge(true, + searchPhaseController.reducedQueryPhase(queryFetchSearchResults, true), queryFetchSearchResults, queryFetchResults::get); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 37071485a03cf..709738dcafb69 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -55,7 +55,6 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private volatile 
AtomicArray shardFailures; final AtomicArray queryResults; final AtomicArray fetchResults; - private volatile ScoreDoc[] sortedShardDocs; private final AtomicInteger successfulOps; SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService, @@ -171,16 +170,15 @@ void onQueryPhaseFailure(final int shardIndex, final CountDown counter, final lo } private void executeFetchPhase() throws Exception { - sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList()); - if (sortedShardDocs.length == 0) { - finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList())); + final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), + true); + if (reducedQueryPhase.scoreDocs.length == 0) { + finishHim(reducedQueryPhase); return; } - final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); - SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList()); - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, - queryResults.length()); + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), reducedQueryPhase.scoreDocs); + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, queryResults.length()); final CountDown counter = new CountDown(docIdsToLoad.length); for (int i = 0; i < docIdsToLoad.length; i++) { final int index = i; @@ -222,8 +220,8 @@ public void onFailure(Exception t) { private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) { try { - final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase, - fetchResults.asList(), fetchResults::get); + final 
InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(), + fetchResults::get); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java index 2bf5e50a1c2e4..fa82aa0ac634a 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java @@ -31,14 +31,6 @@ * to get the concrete values as a list using {@link #asList()}. */ public class AtomicArray { - - private static final AtomicArray EMPTY = new AtomicArray(0); - - @SuppressWarnings("unchecked") - public static E empty() { - return (E) EMPTY; - } - private final AtomicReferenceArray array; private volatile List nonNullList; @@ -53,7 +45,6 @@ public int length() { return array.length(); } - /** * Sets the element at position {@code i} to the given value. 
* diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index a035228195235..e601cec0fea55 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -259,7 +259,7 @@ public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTas loadOrExecuteQueryPhase(request, context); - if (context.queryResult().hasHits() == false && context.scrollContext() == null) { + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { freeContext(context.id()); } else { contextProcessedSuccessfully(context); @@ -341,7 +341,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTas operationListener.onPreQueryPhase(context); long time = System.nanoTime(); queryPhase.execute(context); - if (context.queryResult().hasHits() == false && context.scrollContext() == null) { + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase freeContext(context.id()); } else { diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 83af0b9abd4cc..97f2681252b76 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -166,7 +166,7 @@ public void execute(SearchContext context) { fetchSubPhase.hitsExecute(context, hits); } - context.fetchResult().hits(new SearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore())); + context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore())); } private int findRootDocumentIfNested(SearchContext context, 
LeafReaderContext subReaderContext, int subDocId) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 13f32f74d0da7..272c57fe98024 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -142,7 +142,6 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher queryResult.searchTimedOut(false); final boolean doProfile = searchContext.getProfilers() != null; - final SearchType searchType = searchContext.searchType(); boolean rescore = false; try { queryResult.from(searchContext.from()); @@ -165,12 +164,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher if (searchContext.getProfilers() != null) { collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList()); } - topDocsCallable = new Callable() { - @Override - public TopDocs call() throws Exception { - return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0); - } - }; + topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0); } else { // Perhaps have a dedicated scroll phase? 
final ScrollContext scrollContext = searchContext.scrollContext(); @@ -238,38 +232,35 @@ public TopDocs call() throws Exception { if (doProfile) { collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList()); } - topDocsCallable = new Callable() { - @Override - public TopDocs call() throws Exception { - final TopDocs topDocs; - if (topDocsCollector instanceof TopDocsCollector) { - topDocs = ((TopDocsCollector) topDocsCollector).topDocs(); - } else if (topDocsCollector instanceof CollapsingTopDocsCollector) { - topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs(); + topDocsCallable = () -> { + final TopDocs topDocs; + if (topDocsCollector instanceof TopDocsCollector) { + topDocs = ((TopDocsCollector) topDocsCollector).topDocs(); + } else if (topDocsCollector instanceof CollapsingTopDocsCollector) { + topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs(); + } else { + throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName()); + } + if (scrollContext != null) { + if (scrollContext.totalHits == -1) { + // first round + scrollContext.totalHits = topDocs.totalHits; + scrollContext.maxScore = topDocs.getMaxScore(); } else { - throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName()); + // subsequent round: the total number of hits and + // the maximum score were computed on the first round + topDocs.totalHits = scrollContext.totalHits; + topDocs.setMaxScore(scrollContext.maxScore); } - if (scrollContext != null) { - if (scrollContext.totalHits == -1) { - // first round - scrollContext.totalHits = topDocs.totalHits; - scrollContext.maxScore = topDocs.getMaxScore(); - } else { - // subsequent round: the total number of hits and - // the maximum score were computed on the first round - topDocs.totalHits = scrollContext.totalHits; - topDocs.setMaxScore(scrollContext.maxScore); - } - if 
(searchContext.request().numberOfShards() == 1) { - // if we fetch the document in the same roundtrip, we already know the last emitted doc - if (topDocs.scoreDocs.length > 0) { - // set the last emitted doc - scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1]; - } + if (searchContext.request().numberOfShards() == 1) { + // if we fetch the document in the same roundtrip, we already know the last emitted doc + if (topDocs.scoreDocs.length > 0) { + // set the last emitted doc + scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1]; } } - return topDocs; } + return topDocs; }; } diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 15403f9967720..f071c62f12c16 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -55,6 +55,9 @@ public final class QuerySearchResult extends SearchPhaseResult { private Boolean terminatedEarly = null; private ProfileShardResult profileShardResults; private boolean hasProfileResults; + private boolean hasScoreDocs; + private int totalHits; + private float maxScore; public QuerySearchResult() { } @@ -87,11 +90,34 @@ public Boolean terminatedEarly() { } public TopDocs topDocs() { + if (topDocs == null) { + throw new IllegalStateException("topDocs already consumed"); + } + return topDocs; + } + + /** + * Returns true iff the top docs have already been consumed. + */ + public boolean hasConsumedTopDocs() { + return topDocs == null; + } + + /** + * Returns and nulls out the top docs for this search results. This allows to free up memory once the top docs are consumed. + * @throws IllegalStateException if the top docs have already been consumed. 
+ */ + public TopDocs consumeTopDocs() { + TopDocs topDocs = this.topDocs; + if (topDocs == null) { + throw new IllegalStateException("topDocs already consumed"); + } + this.topDocs = null; return topDocs; } public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) { - this.topDocs = topDocs; + setTopDocs(topDocs); if (topDocs.scoreDocs.length > 0 && topDocs.scoreDocs[0] instanceof FieldDoc) { int numFields = ((FieldDoc) topDocs.scoreDocs[0]).fields.length; if (numFields != sortValueFormats.length) { @@ -102,12 +128,19 @@ public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) { this.sortValueFormats = sortValueFormats; } + private void setTopDocs(TopDocs topDocs) { + this.topDocs = topDocs; + hasScoreDocs = topDocs.scoreDocs.length > 0; + this.totalHits = topDocs.totalHits; + this.maxScore = topDocs.getMaxScore(); + } + public DocValueFormat[] sortValueFormats() { return sortValueFormats; } /** - * Retruns true if this query result has unconsumed aggregations + * Returns true if this query result has unconsumed aggregations */ public boolean hasAggs() { return hasAggs; @@ -195,10 +228,15 @@ public QuerySearchResult size(int size) { return this; } - /** Returns true iff the result has hits */ - public boolean hasHits() { - return (topDocs != null && topDocs.scoreDocs.length > 0) || - (suggest != null && suggest.hasScoreDocs()); + /** + * Returns true if this result has any suggest score docs + */ + public boolean hasSuggestHits() { + return (suggest != null && suggest.hasScoreDocs()); + } + + public boolean hasSearchContext() { + return hasScoreDocs || hasSuggestHits(); } public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException { @@ -227,7 +265,7 @@ public void readFromWithId(long id, StreamInput in) throws IOException { sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class); } } - topDocs = readTopDocs(in); + setTopDocs(readTopDocs(in)); if (hasAggs = in.readBoolean()) { aggregations = 
InternalAggregations.readAggregations(in); } @@ -278,4 +316,12 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeOptionalBoolean(terminatedEarly); out.writeOptionalWriteable(profileShardResults); } + + public int getTotalHits() { + return totalHits; + } + + public float getMaxScore() { + return maxScore; + } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 207183bae4e71..c92caef628a9b 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import com.carrotsearch.randomizedtesting.RandomizedContext; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.lucene.Lucene; @@ -42,6 +43,7 @@ import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCluster; import org.junit.Before; import java.io.IOException; @@ -51,12 +53,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class SearchPhaseControllerTests extends ESTestCase { @@ -75,8 +81,16 @@ public void testSort() throws Exception { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); AtomicArray results = generateQueryResults(nShards, suggestions, queryResultSize, false); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList()); + Optional first = results.asList().stream().findFirst(); + int from = 0, size = 0; + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(), + from, size) + .scoreDocs; for (Suggest.Suggestion suggestion : reducedSuggest(results)) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); accumulatedLength += suggestionSize; @@ -84,48 +98,71 @@ public void testSort() throws Exception { assertThat(sortedDocs.length, equalTo(accumulatedLength)); } - public void testSortIsIdempotent() throws IOException { + public void testSortIsIdempotent() throws Exception { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); - AtomicArray results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize, - randomBoolean() || true); + long randomSeed = randomLong(); + boolean useConstantScore = randomBoolean(); + AtomicArray results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, + useConstantScore); boolean ignoreFrom = randomBoolean(); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList()); + Optional first = results.asList().stream().findFirst(); + int from = 0, size = 0; + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } + SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; + + results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, + useConstantScore); + SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(); + ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; + assertEquals(sortedDocs.length, sortedDocs2.length); + for (int i = 0; i < sortedDocs.length; i++) { + assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc); + assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); + assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); + } + assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f); + assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits); + assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits); + } - ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList()); - assertArrayEquals(sortedDocs, sortedDocs2); + private AtomicArray generateSeededQueryResults(long seed, int nShards, + List 
suggestions, + int searchHitsSize, boolean useConstantScore) throws Exception { + return RandomizedContext.current().runWithPrivateRandomness(seed, + () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore)); } public void testMerge() throws IOException { List suggestions = new ArrayList<>(); + int maxSuggestSize = 0; for (int i = 0; i < randomIntBetween(1, 5); i++) { - suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20))); + int size = randomIntBetween(1, 20); + maxSuggestSize += size; + suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size)); } int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); AtomicArray queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); - - // calculate offsets and score doc array - List mergedScoreDocs = new ArrayList<>(); - ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults); - mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs)); - Suggest mergedSuggest = reducedSuggest(queryResults); - for (Suggest.Suggestion suggestion : mergedSuggest) { - if (suggestion instanceof CompletionSuggestion) { - CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion); - mergedScoreDocs.addAll(completionSuggestion.getOptions().stream() - .map(CompletionSuggestion.Entry.Option::getDoc) - .collect(Collectors.toList())); - } - } - ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]); - AtomicArray searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest); - InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, - searchPhaseController.reducedQueryPhase(queryResults.asList()), + SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false); + 
AtomicArray searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs, + reducedQueryPhase.suggest); + InternalSearchResponse mergedResponse = searchPhaseController.merge(false, + reducedQueryPhase, searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get); - assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length)); + int suggestSize = 0; + for (Suggest.Suggestion s : reducedQueryPhase.suggest) { + Stream stream = s.getEntries().stream(); + suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size())); + } + assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize)); + assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize)); Suggest suggestResult = mergedResponse.suggest(); - for (Suggest.Suggestion suggestion : mergedSuggest) { + for (Suggest.Suggestion suggestion : reducedQueryPhase.suggest) { assertThat(suggestion, instanceOf(CompletionSuggestion.class)); if (suggestion.getEntries().get(0).getOptions().size() > 0) { CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName()); @@ -209,16 +246,6 @@ private Suggest reducedSuggest(AtomicArray results) { .collect(Collectors.toList())); } - private ScoreDoc[] getTopShardDocs(AtomicArray results) throws IOException { - List resultList = results.asList(); - TopDocs[] shardTopDocs = new TopDocs[resultList.size()]; - for (int i = 0; i < resultList.size(); i++) { - shardTopDocs[i] = resultList.get(i).queryResult().topDocs(); - } - int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results)); - return TopDocs.merge(topN, shardTopDocs).scoreDocs; - } - private AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { AtomicArray fetchResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { @@ -309,30 +336,96 @@ public void 
testConsumerConcurrently() throws InterruptedException { InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); AtomicInteger max = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(expectedNumResults); + Thread[] threads = new Thread[expectedNumResults]; for (int i = 0; i < expectedNumResults; i++) { int id = i; - Thread t = new Thread(() -> { + threads[i] = new Thread(() -> { int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); - result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); result.setShardIndex(id); + result.size(1); consumer.consumeResult(result); - latch.countDown(); }); - t.start(); + threads[i].start(); + } + for (int i = 0; i < expectedNumResults; i++) { + threads[i].join(); } - latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(1, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f); } + public void testConsumerOnlyAggs() throws InterruptedException { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + SearchRequest request = new SearchRequest(); + request.source(new 
SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); + request.setBatchedReduceSize(bufferSize); + InitialSearchPhase.SearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); + result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, + DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(max.get(), internalMax.getValue(), 0.0D); + assertEquals(0, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + } + + + public void testConsumerOnlyHits() throws InterruptedException { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + SearchRequest request = new SearchRequest(); + if (randomBoolean()) { + request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10))); + } + request.setBatchedReduceSize(bufferSize); + InitialSearchPhase.SearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + 
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); + result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertEquals(1, reduce.scoreDocs.length); + assertEquals(max.get(), reduce.maxScore, 0.0f); + assertEquals(expectedNumResults, reduce.totalHits); + assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f); + } + + public void testNewSearchPhaseResults() { for (int i = 0; i < 10; i++) { int expectedNumResults = randomIntBetween(1, 10); @@ -342,10 +435,22 @@ public void testNewSearchPhaseResults() { if ((hasAggs = randomBoolean())) { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); } + final boolean hasTopDocs; + if ((hasTopDocs = randomBoolean())) { + if (request.source() != null) { + request.source().size(randomIntBetween(1, 100)); + } // no source means size = 10 + } else { + if (request.source() == null) { + request.source(new SearchSourceBuilder().size(0)); + } else { + request.source().size(0); + } + } request.setBatchedReduceSize(bufferSize); InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); - if (hasAggs && expectedNumResults > bufferSize) { + if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) { assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); } else { @@ -354,4 +459,36 @@ public void testNewSearchPhaseResults() { } } } + + public void testReduceTopNWithFromOffset() { + SearchRequest request = new SearchRequest(); + request.source(new SearchSourceBuilder().size(5).from(5)); + request.setBatchedReduceSize(randomIntBetween(2, 4)); + 
InitialSearchPhase.SearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(request, 4); + int score = 100; + for (int i = 0; i < 4; i++) { + QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i)); + ScoreDoc[] docs = new ScoreDoc[3]; + for (int j = 0; j < docs.length; j++) { + docs[j] = new ScoreDoc(0, score--); + } + result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]); + result.setShardIndex(i); + result.size(5); + result.from(5); + consumer.consumeResult(result); + } + // 4*3 results = 12 we get result 5 to 10 here with from=5 and size=5 + + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertEquals(5, reduce.scoreDocs.length); + assertEquals(100.f, reduce.maxScore, 0.0f); + assertEquals(12, reduce.totalHits); + assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f); + assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f); + assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f); + assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f); + assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f); + } } diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java index f3ff6be1cc12c..6fc795a88255f 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -223,8 +223,13 @@ public void testTimeout() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), null); - // the search context should inherit the default timeout - assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); + try { + // the search context should inherit the default timeout + assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); + } finally { + contextWithDefaultTimeout.decRef(); + 
service.freeContext(contextWithDefaultTimeout.id()); + } final long seconds = randomIntBetween(6, 10); final SearchContext context = service.createContext( @@ -238,8 +243,14 @@ public void testTimeout() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), null); - // the search context should inherit the query timeout - assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); + try { + // the search context should inherit the query timeout + assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); + } finally { + context.decRef(); + service.freeContext(context.id()); + } + } public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin { From eff8f1117a1c1f558852147e9330b98ed0356aaf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 7 Apr 2017 10:16:04 +0200 Subject: [PATCH 2/3] apply feedback --- .../action/search/SearchPhaseController.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index d539b4455afb9..5dbc72b7f0d55 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -344,7 +344,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, Collection fetchResults, IntFunction resultsLookup) { - final boolean sorted = reducedQueryPhase.isSorted; + final boolean sorted = reducedQueryPhase.isSortedByField; ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs; int sortScoreIndex = -1; if (sorted) { @@ -405,7 +405,7 @@ public ReducedQueryPhase reducedQueryPhase(Collectionnull if the results are not sorted final SortField[] sortField; - // true iff the result 
score docs is sorted - final boolean isSorted; + // true iff the result score docs is sorted by a field (not score), this implies that sortField is set. + final boolean isSortedByField; // the size of the top hits to return final int size; // true iff the query phase had no results. Otherwise false @@ -553,7 +553,7 @@ public static final class ReducedQueryPhase { ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest, InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs, - SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSorted, int size, + SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size, int from, boolean isEmptyResult) { if (numReducePhases <= 0) { throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases); @@ -573,7 +573,7 @@ public static final class ReducedQueryPhase { this.numReducePhases = numReducePhases; this.scoreDocs = scoreDocs; this.sortField = sortFields; - this.isSorted = isSorted; + this.isSortedByField = isSortedByField; this.size = size; this.from = from; this.isEmptyResult = isEmptyResult; From fe072ac9c1360e18fdc7d4e2f1c529311ace09e6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 7 Apr 2017 18:11:05 +0200 Subject: [PATCH 3/3] apply feedback from @jpountz --- .../action/search/SearchPhaseController.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 5dbc72b7f0d55..6836cd5bc7afb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -184,8 
+184,8 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection queryResults, - List bufferedAggs, - List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, - boolean isScrollRequest) { + List bufferedAggs, List bufferedTopDocs, + TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) { assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase boolean timedOut = false; @@ -487,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection