Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sort and collapse info to SearchHits transport serialization #36555

Merged
merged 9 commits into from
Dec 14, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ private void innerRun() throws IOException {
// query AND fetch optimization
finishPhase.run();
} else {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(SearchPhaseResult::queryResult)
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,23 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
}
}
final boolean isSortedByField;
final SortField[] sortFields;
boolean isSortedByField = false;
SortField[] sortFields = null;
String collapseField = null;
Object[] collapseValues = null;
if (mergedTopDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
sortFields = fieldDocs.fields;
} else {
isSortedByField = false;
sortFields = null;
if (fieldDocs instanceof CollapseTopFieldDocs) {
isSortedByField = (fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs) fieldDocs;
collapseField = collapseTopFieldDocs.field;
collapseValues = collapseTopFieldDocs.collapseValues;
} else {
isSortedByField = true;
}
}
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields, collapseField, collapseValues);
} else {
// no relevant docs
return SortedTopDocs.EMPTY;
Expand Down Expand Up @@ -266,7 +271,7 @@ private static void setShardIndex(TopDocs topDocs, int shardIndex) {
public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
if (reducedQueryPhase.isEmptyResult == false) {
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
// from is always zero as when we use scroll, we ignore from
long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
// with collapsing we can have more hits than sorted docs
Expand Down Expand Up @@ -307,7 +312,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce
if (reducedQueryPhase.isEmptyResult) {
return InternalSearchResponse.empty();
}
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
ScoreDoc[] sortedDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) {
Expand Down Expand Up @@ -345,12 +350,12 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce

private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
final boolean sorted = reducedQueryPhase.isSortedByField;
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
SortedTopDocs sortedTopDocs = reducedQueryPhase.sortedTopDocs;
int sortScoreIndex = -1;
if (sorted) {
for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
if (sortedTopDocs.isSortedByField) {
SortField[] sortFields = sortedTopDocs.sortFields;
for (int i = 0; i < sortFields.length; i++) {
if (sortFields[i].getType() == SortField.Type.SCORE) {
sortScoreIndex = i;
}
}
Expand All @@ -362,12 +367,12 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
int from = ignoreFrom ? 0 : reducedQueryPhase.from;
int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
// with collapsing we can have more fetch hits than sorted docs
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
numSearchHits = Math.min(sortedTopDocs.scoreDocs.length, numSearchHits);
// merge hits
List<SearchHit> hits = new ArrayList<>();
if (!fetchResults.isEmpty()) {
for (int i = 0; i < numSearchHits; i++) {
ScoreDoc shardDoc = sortedDocs[i];
ScoreDoc shardDoc = sortedTopDocs.scoreDocs[i];
SearchPhaseResult fetchResultProvider = resultsLookup.apply(shardDoc.shardIndex);
if (fetchResultProvider == null) {
// this can happen if we are hitting a shard failure during the fetch phase
Expand All @@ -381,21 +386,21 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
assert index < fetchResult.hits().getHits().length : "not enough hits fetched. index [" + index + "] length: "
+ fetchResult.hits().getHits().length;
SearchHit searchHit = fetchResult.hits().getHits()[index];
if (sorted == false) {
searchHit.score(shardDoc.score);
}
searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) {
if (sortedTopDocs.isSortedByField) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (sortScoreIndex != -1) {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
} else {
searchHit.score(shardDoc.score);
}
hits.add(searchHit);
}
}
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits,
reducedQueryPhase.maxScore, sortedTopDocs.sortFields, sortedTopDocs.collapseField, sortedTopDocs.collapseValues);
}

/**
Expand Down Expand Up @@ -436,8 +441,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null,
null, numReducePhases, false, 0, 0, true);
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
Expand Down Expand Up @@ -499,11 +503,11 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, firstResult == null);
}

/**
Expand Down Expand Up @@ -551,12 +555,8 @@ public static final class ReducedQueryPhase {
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
// the searches merged top docs
final ScoreDoc[] scoreDocs;
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
final SortField[] sortField;
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
final boolean isSortedByField;
//encloses info about the merged top docs, the sort fields used to sort the score docs etc.
final SortedTopDocs sortedTopDocs;
// the size of the top hits to return
final int size;
// <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
Expand All @@ -567,9 +567,8 @@ public static final class ReducedQueryPhase {
final DocValueFormat[] sortValueFormats;

ReducedQueryPhase(TotalHits totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
int from, boolean isEmptyResult) {
InternalAggregations aggregations, SearchProfileShardResults shardResults, SortedTopDocs sortedTopDocs,
DocValueFormat[] sortValueFormats, int numReducePhases, int size, int from, boolean isEmptyResult) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
Expand All @@ -586,9 +585,7 @@ public static final class ReducedQueryPhase {
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
this.scoreDocs = scoreDocs;
this.sortField = sortFields;
this.isSortedByField = isSortedByField;
this.sortedTopDocs = sortedTopDocs;
this.size = size;
this.from = from;
this.isEmptyResult = isEmptyResult;
Expand Down Expand Up @@ -728,7 +725,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
}
};
Expand Down Expand Up @@ -770,15 +767,23 @@ void add(TopDocsAndMaxScore topDocs) {
}

static final class SortedTopDocs {
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null, null, null);
// the searches merged top docs
final ScoreDoc[] scoreDocs;
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
final boolean isSortedByField;
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
final SortField[] sortFields;
final String collapseField;
final Object[] collapseValues;

SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields,
String collapseField, Object[] collapseValues) {
this.scoreDocs = scoreDocs;
this.isSortedByField = isSortedByField;
this.sortFields = sortFields;
this.collapseField = collapseField;
this.collapseValues = collapseValues;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.function.BiFunction;

final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncAction<ScrollQuerySearchResult> {
Expand Down Expand Up @@ -68,16 +67,16 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
return new SearchPhase("fetch") {
@Override
public void run() throws IOException {
public void run() {
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
queryResults.asList());
if (reducedQueryPhase.scoreDocs.length == 0) {
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
if (scoreDocs.length == 0) {
sendResponse(reducedQueryPhase, fetchResults);
return;
}

final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(),
reducedQueryPhase.scoreDocs);
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), scoreDocs);
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase,
queryResults.length());
final CountDown counter = new CountDown(docIdsToLoad.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,22 +785,36 @@ public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOExc
}
}

public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {
value.writeTo(this);
}
}

public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
/**
* Same as {@link #writeArray(Writer, Object[])} but the provided array may be null. An additional boolean value is
* serialized to indicate whether the array was null or not.
*/
public <T> void writeOptionalArray(final Writer<T> writer, final @Nullable T[] array) throws IOException {
if (array == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeArray(array);
writeArray(writer, array);
}
}

/**
* Writes the specified array of {@link Writeable}s. This method can be seen as
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length
* integer is first written to the stream, and then the elements of the array are written to the stream.
*/
public <T extends Writeable> void writeArray(T[] array) throws IOException {
javanna marked this conversation as resolved.
Show resolved Hide resolved
writeArray((out, value) -> value.writeTo(out), array);
}

/**
* Same as {@link #writeArray(Writeable[])} but the provided array may be null. An additional boolean value is
* serialized to indicate whether the array was null or not.
*/
public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
writeOptionalArray((out, value) -> value.writeTo(out), array);
}

/**
* Serializes a potential null value.
*/
Expand Down
Loading