From 26df5a71b687743f61fae40b863296a34cb7a844 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 17 Jan 2019 12:57:05 +0100 Subject: [PATCH 01/13] Add support for merging multiple search responses into one This will be used in cross-cluster search when reduction will be performed locally on each cluster. The CCS coordinating node will send one search request per remote cluster involved and will get one search response back from each one of them. Such responses contain all the info to be able to perform an additional reduction and return results back to the user. Relates to #32125 --- .../search/AbstractSearchAsyncAction.java | 9 +- .../action/search/SearchPhaseController.java | 2 +- .../action/search/SearchResponseMerger.java | 288 +++++++++++++ .../action/search/TransportSearchAction.java | 11 +- .../org/elasticsearch/search/SearchHit.java | 1 + .../search/SearchResponseMergerTests.java | 406 ++++++++++++++++++ .../action/search/SearchResponseTests.java | 2 +- 7 files changed, 705 insertions(+), 14 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java create mode 100644 server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index f031dfa581064..d6abbf73e8864 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.Nullable; @@ -43,7 +44,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction exten private final Object shardFailuresMutex = new Object(); private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); - private final TransportSearchAction.SearchTimeProvider timeProvider; + private final SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, @@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction exten Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, - TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, + SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters) { super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor); @@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction exten * Builds how long it took to execute the search. */ long buildTookInMillis() { - return TimeUnit.NANOSECONDS.toMillis( - timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos()); + return timeProvider.buildTookInMillis(); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 418d95b2077a9..b22aa9669fbbc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -766,7 +766,7 @@ TotalHits getTotalHits() { if (totalHits < trackTotalHitsUpTo) { return new TotalHits(totalHits, totalHitsRelation); } else { - /** + /* * The user requested to count the total hits up to trackTotalHitsUpTo * so we return this lower bound when the total hits is greater than this value. * This can happen when multiple shards are merged since the limit to track total hits diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java new file mode 100644 index 0000000000000..b0806fef03634 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -0,0 +1,288 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.search.grouping.CollapseTopFieldDocs; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.ProfileShardResult; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.function.Function; + +import static org.elasticsearch.action.search.SearchResponse.Clusters; + +/** + * Merges multiple search responses into one. Used in cross-cluster search when reduction is performed locally on each cluster. + * The CCS coordinating node sends one search request per remote cluster involved and gets one search response back from each one of them. + * Such responses contain all the info to be able to perform an additional reduction and return results back to the user. + * Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally + * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster. + */ +final class SearchResponseMerger { + private final int from; + private final int size; + private final SearchTimeProvider searchTimeProvider; + private final Clusters clusters; + private final Function reduceContextFunction; + private final List searchResponses = new ArrayList<>(); + + SearchResponseMerger(int from, int size, SearchTimeProvider searchTimeProvider, Clusters clusters, + Function reduceContextFunction) { + this.from = from; + this.size = size; + this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider); + this.clusters = Objects.requireNonNull(clusters); + this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction); + } + + //TODO we could merge incrementally, tookInMillis computation would need to be done in the final merge. + //Incremental merges would then perform non final reduction and keep around from+size hits. + void add(SearchResponse searchResponse) { + searchResponses.add(searchResponse); + } + + SearchResponse getMergedResponse() { + return merge(); + } + + private SearchResponse merge() { + assert searchResponses.size() > 1; + int totalShards = 0; + int skippedShards = 0; + int successfulShards = 0; + boolean timedOut = false; + Boolean terminatedEarly = null; + //the current reduce phase counts as one + int numReducePhases = 1; + float maxScore = Float.NEGATIVE_INFINITY; + List failures = new ArrayList<>(); + Map profileResults = new HashMap<>(); + List aggs = new ArrayList<>(); + Map> shardResults = new TreeMap<>(); + List topDocsList = new ArrayList<>(searchResponses.size()); + Map> groupedSuggestions = new HashMap<>(); + Boolean trackTotalHits = null; + + for (SearchResponse searchResponse : searchResponses) { + totalShards += searchResponse.getTotalShards(); + skippedShards += searchResponse.getSkippedShards(); + successfulShards += searchResponse.getSuccessfulShards(); + timedOut = timedOut || searchResponse.isTimedOut(); + if (searchResponse.isTerminatedEarly() != null && searchResponse.isTerminatedEarly()) { + terminatedEarly = true; + } + numReducePhases += searchResponse.getNumReducePhases(); + + Collections.addAll(failures, searchResponse.getShardFailures()); + + profileResults.putAll(searchResponse.getProfileResults()); + + if (searchResponse.getAggregations() != null) { + InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations(); + aggs.add(internalAggs); + } + + Suggest suggest = searchResponse.getSuggest(); + if (suggest != null) { + for (Suggest.Suggestion> entries : suggest) { + List suggestionList = groupedSuggestions.computeIfAbsent(entries.getName(), s -> new ArrayList<>()); + suggestionList.add(entries); + } + } + + SearchHits searchHits = searchResponse.getHits(); + if (Float.isNaN(searchHits.getMaxScore()) == false) { + maxScore = Math.max(maxScore, searchHits.getMaxScore()); + } + + final TotalHits totalHits; + if (searchHits.getTotalHits() == null) { + //in case we did't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs + totalHits = new TotalHits(0, TotalHits.Relation.EQUAL_TO); + assert trackTotalHits == null || trackTotalHits == false; + trackTotalHits = false; + + } else { + totalHits = searchHits.getTotalHits(); + assert trackTotalHits == null || trackTotalHits; + trackTotalHits = true; + } + + topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shardResults)); + } + + //now that we've gone through all the hits and we collected all the shards they come from, we can assign shardIndex to each shard + setShardIndex(shardResults.values()); + TopDocs topDocs = SearchPhaseController.mergeTopDocs(topDocsList, size, from); + SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, Float.isInfinite(maxScore) ? Float.NaN : maxScore, trackTotalHits); + + Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); + + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); + + ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); + //make failures ordering consistent with ordinary search and CCS + Arrays.sort(shardFailures, FAILURES_COMPARATOR); + + InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, + new SearchProfileShardResults(profileResults), timedOut, terminatedEarly, numReducePhases); + + long tookInMillis = searchTimeProvider.buildTookInMillis(); + return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); + } + + private static final Comparator FAILURES_COMPARATOR = new Comparator() { + @Override + public int compare(ShardSearchFailure o1, ShardSearchFailure o2) { + ShardId shardId1 = extractShardId(o1); + ShardId shardId2 = extractShardId(o2); + if (shardId1 == null && shardId2 == null) { + return 0; + } + if (shardId1 == null) { + return -1; + } + if (shardId2 == null) { + return 1; + } + return shardId1.compareTo(shardId2); + } + + private ShardId extractShardId(ShardSearchFailure failure) { + SearchShardTarget shard = failure.shard(); + if (shard != null) { + return shard.getShardId(); + } + Throwable cause = failure.getCause(); + if (cause instanceof ElasticsearchException) { + ElasticsearchException e = (ElasticsearchException) cause; + return e.getShardId(); + } + return null; + } + }; + + private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map> shardResults) { + SearchHit[] hits = searchHits.getHits(); + ScoreDoc[] scoreDocs = new ScoreDoc[hits.length]; + final TopDocs topDocs; + if (searchHits.getSortFields() != null) { + if (searchHits.getCollapseField() != null) { + assert searchHits.getCollapseValues() != null; + topDocs = new CollapseTopFieldDocs(searchHits.getCollapseField(), totalHits, scoreDocs, + searchHits.getSortFields(), searchHits.getCollapseValues()); + } else { + topDocs = new TopFieldDocs(totalHits, scoreDocs, searchHits.getSortFields()); + } + } else { + topDocs = new TopDocs(totalHits, scoreDocs); + } + + for (int i = 0; i < hits.length; i++) { + SearchHit hit = hits[i]; + List shardHits = shardResults.computeIfAbsent(hit.getShard().getShardId(), shardId -> new ArrayList<>()); + final SortField[] sortFields = searchHits.getSortFields(); + final Object[] sortValues; + if (sortFields == null) { + sortValues = null; + } else { + if (sortFields.length == 1 && sortFields[0].getType() == SortField.Type.SCORE) { + sortValues = new Object[]{hit.getScore()}; + } else { + sortValues = hit.getRawSortValues(); + } + } + FieldDocAndSearchHit scoreDoc = new FieldDocAndSearchHit(hit.docId(), hit.getScore(), sortValues, hit); + scoreDocs[i] = scoreDoc; + shardHits.add(scoreDoc); + } + return topDocs; + } + + private static SearchHits topDocsToSearchHits(TopDocs topDocs, float maxScore, boolean trackTotalHits) { + SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length]; + for (int i = 0; i < topDocs.scoreDocs.length; i++) { + FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i]; + searchHits[i] = scoreDoc.searchHit; + } + + SortField[] sortFields = null; + String collapseField = null; + Object[] collapseValues = null; + if (topDocs instanceof TopFieldDocs) { + sortFields = ((TopFieldDocs)topDocs).fields; + if (topDocs instanceof CollapseTopFieldDocs) { + CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs)topDocs; + collapseField = collapseTopFieldDocs.field; + collapseValues = collapseTopFieldDocs.collapseValues; + } + } + //in case we didn't track total hits, we got null from each cluster, and we need to set null to the final response + final TotalHits totalHits = trackTotalHits ? topDocs.totalHits : null; + return new SearchHits(searchHits, totalHits, maxScore, sortFields, collapseField, collapseValues); + } + + private static void setShardIndex(Collection> shardResults) { + //every group of hits comes from a different shard. When hits come from the same index on multiple clusters and same + //shard identifier, we rely on such indices to have a different uuid across multiple clusters. + int i = 0; + for (List shardHits : shardResults) { + for (FieldDoc shardHit : shardHits) { + shardHit.shardIndex = i; + } + i++; + } + } + + private static final class FieldDocAndSearchHit extends FieldDoc { + private final SearchHit searchHit; + + //to simplify things, we use a FieldDoc all the time, even when only a ScoreDoc is needed, in which case fields are null. + FieldDocAndSearchHit(int doc, float score, Object[] fields, SearchHit searchHit) { + super(doc, score, fields); + this.searchHit = searchHit; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 88e2764982cb4..3f03c521df52a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -61,6 +61,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -140,7 +141,7 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust * to moving backwards due to NTP and other such complexities, etc.). There are also issues with * using a relative clock for reporting real time. Thus, we simply separate these two uses. */ - static class SearchTimeProvider { + static final class SearchTimeProvider { private final long absoluteStartMillis; private final long relativeStartNanos; @@ -170,12 +171,8 @@ long getAbsoluteStartMillis() { return absoluteStartMillis; } - long getRelativeStartNanos() { - return relativeStartNanos; - } - - long getRelativeCurrentNanos() { - return relativeCurrentNanosProvider.getAsLong(); + long buildTookInMillis() { + return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos); } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchHit.java b/server/src/main/java/org/elasticsearch/search/SearchHit.java index 7fd68852ce284..cb34804981f1b 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchHit.java +++ b/server/src/main/java/org/elasticsearch/search/SearchHit.java @@ -121,6 +121,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable currentRelativeTime); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponseMerger merger = new SearchResponseMerger(randomIntBetween(0, 1000), randomIntBetween(0, 10000), + timeProvider, clusters, flag -> null); + int numResponses = randomIntBetween(2, 10); + for (int i = 0; i < numResponses; i++) { + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); + merger.add(searchResponse); + } + SearchResponse searchResponse = merger.getMergedResponse(); + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); + } + + public void testMergeShardFailures() { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + int numResponses = randomIntBetween(2, 10); + int numIndices = numResponses * randomIntBetween(1, 3); + Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); + for (int i = 0; i < numResponses; i++) { + Map.Entry entry = indicesPerCluster.next(); + String clusterAlias = entry.getKey(); + Index[] indices = entry.getValue(); + int numFailures = randomIntBetween(1, 10); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + ShardId shardId = new ShardId(randomFrom(indices), j); + ShardSearchFailure failure; + if (randomBoolean()) { + SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); + failure = new ShardSearchFailure(new IllegalArgumentException("broke"), searchShardTarget); + } else { + ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException("broke")); + elasticsearchException.setShard(shardId); + failure = new ShardSearchFailure(elasticsearchException); + } + shardSearchFailures[j] = failure; + priorityQueue.add(Tuple.tuple(shardId, failure)); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + + ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); + assertEquals(priorityQueue.size(), shardFailures.length); + for (ShardSearchFailure shardFailure : shardFailures) { + ShardSearchFailure expected = priorityQueue.poll().v2(); + assertSame(expected, shardFailure); + } + } + + public void testMergeShardFailuresNullShardId() { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + List expectedFailures = new ArrayList<>(); + int numResponses = randomIntBetween(2, 10); + for (int i = 0; i < numResponses; i++) { + int numFailures = randomIntBetween(1, 50); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + ShardSearchFailure shardSearchFailure = new ShardSearchFailure(new ElasticsearchException(new IllegalArgumentException())); + shardSearchFailures[j] = shardSearchFailure; + expectedFailures.add(shardSearchFailure); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); + assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); + } + + public void testMergeProfileResults() { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + Map expectedProfile = new HashMap<>(); + int numResponses = randomIntBetween(2, 10); + for (int i = 0; i < numResponses; i++) { + SearchProfileShardResults profile = SearchProfileShardResultsTests.createTestItem(); + expectedProfile.putAll(profile.getShardResults()); + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, null, null, profile, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, 100L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + merger.add(searchResponse); + } + SearchResponse searchResponse = merger.getMergedResponse(); + assertEquals(expectedProfile, searchResponse.getProfileResults()); + } + + //TODO add tests for suggestions and aggs reduction? + + public void testMergeSearchHits() { + final long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + final int size = randomIntBetween(0, 100); + final int from = size > 0 ? randomIntBetween(0, 100) : 0; + final int requestedSize = from + size; + final int numResponses = randomIntBetween(2, 10); + final SortField[] sortFields; + final String collapseField; + boolean scoreSort = false; + if (randomBoolean()) { + int numFields = randomIntBetween(1, 3); + sortFields = new SortField[numFields]; + for (int i = 0; i < numFields; i++) { + final SortField sortField; + if (randomBoolean()) { + sortField = new SortField("field-" + i, SortField.Type.INT, randomBoolean()); + } else { + scoreSort = true; + sortField = SortField.FIELD_SCORE; + } + sortFields[i] = sortField; + } + collapseField = randomBoolean() ? "collapse" : null; + } else { + collapseField = null; + sortFields = null; + scoreSort = true; + } + TotalHits.Relation totalHitsRelation = frequently() ? randomFrom(TotalHits.Relation.values()) : null; + + PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); + SearchResponseMerger merger = new SearchResponseMerger(from, size, timeProvider, clusters, flag -> null); + TotalHits expectedTotalHits = null; + int expectedTotal = 0; + int expectedSuccessful = 0; + int expectedSkipped = 0; + int expectedReducePhases = 1; + boolean expectedTimedOut = false; + Boolean expectedTerminatedEarly = null; + float expectedMaxScore = Float.NEGATIVE_INFINITY; + int numIndices = requestedSize == 0 ? 0 : randomIntBetween(1, requestedSize); + Iterator> indicesIterator = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); + for (int i = 0; i < numResponses; i++) { + Map.Entry entry = indicesIterator.next(); + String clusterAlias = entry.getKey().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? null : entry.getKey(); + Index[] indices = entry.getValue(); + int total = randomIntBetween(1, 1000); + expectedTotal += total; + int successful = randomIntBetween(1, total); + expectedSuccessful += successful; + int skipped = randomIntBetween(1, total); + expectedSkipped += skipped; + + TotalHits totalHits = null; + if (totalHitsRelation != null) { + //TODO totalHits may overflow if each cluster reports a very high number? + totalHits = new TotalHits(randomLongBetween(0, 1000), totalHitsRelation); + long previousValue = expectedTotalHits == null ? 0 : expectedTotalHits.value; + expectedTotalHits = new TotalHits(previousValue + totalHits.value, totalHitsRelation); + } + + final int numDocs = totalHits == null || totalHits.value >= requestedSize ? requestedSize : (int) totalHits.value; + int scoreFactor = randomIntBetween(1, numResponses); + float maxScore = scoreSort ? numDocs * scoreFactor : Float.NaN; + SearchHit[] hits = randomSearchHitArray(numDocs, numResponses, clusterAlias, indices, maxScore, scoreFactor, + sortFields, priorityQueue); + expectedMaxScore = Math.max(expectedMaxScore, maxScore); + + Object[] collapseValues = null; + if (collapseField != null) { + collapseValues = new Object[numDocs]; + for (int j = 0; j < numDocs; j++) { + //TODO test is green but randomizing this does not seem like a great idea? + collapseValues[j] = randomInt(); + } + } + + SearchHits searchHits = new SearchHits(hits, totalHits, maxScore == Float.NEGATIVE_INFINITY ? Float.NaN : maxScore, + sortFields, collapseField, collapseValues); + + int numReducePhases = randomIntBetween(1, 5); + expectedReducePhases += numReducePhases; + boolean timedOut = rarely(); + expectedTimedOut = expectedTimedOut || timedOut; + Boolean terminatedEarly = frequently() ? null : true; + expectedTerminatedEarly = expectedTerminatedEarly == null ? terminatedEarly : expectedTerminatedEarly; + + InternalSearchResponse internalSearchResponse = new InternalSearchResponse( + searchHits, null, null, null, timedOut, terminatedEarly, numReducePhases); + + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, total, successful, skipped, + randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); + merger.add(searchResponse); + } + + SearchResponse searchResponse = merger.getMergedResponse(); + + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); + assertEquals(expectedTotal, searchResponse.getTotalShards()); + assertEquals(expectedSuccessful, searchResponse.getSuccessfulShards()); + assertEquals(expectedSkipped, searchResponse.getSkippedShards()); + assertEquals(expectedReducePhases, searchResponse.getNumReducePhases()); + assertEquals(expectedTimedOut, searchResponse.isTimedOut()); + assertEquals(expectedTerminatedEarly, searchResponse.isTerminatedEarly()); + + assertSame(clusters, searchResponse.getClusters()); + assertNull(searchResponse.getScrollId()); + + SearchHits searchHits = searchResponse.getHits(); + assertArrayEquals(sortFields, searchHits.getSortFields()); + assertEquals(collapseField, searchHits.getCollapseField()); + if (expectedTotalHits == null) { + assertNull(searchHits.getTotalHits()); + } else { + assertEquals(expectedTotalHits.value, searchHits.getTotalHits().value); + assertSame(expectedTotalHits.relation, searchHits.getTotalHits().relation); + } + if (expectedMaxScore == Float.NEGATIVE_INFINITY) { + assertTrue(Float.isNaN(searchHits.getMaxScore())); + } else { + assertEquals(expectedMaxScore, searchHits.getMaxScore(), 0f); + } + + for (int i = 0; i < from; i++) { + priorityQueue.poll(); + } + SearchHit[] hits = searchHits.getHits(); + if (collapseField != null) { + assertEquals(hits.length, searchHits.getCollapseValues().length); + } else { + assertNull(searchHits.getCollapseValues()); + } + assertThat(hits.length, lessThanOrEqualTo(size)); + for (SearchHit hit : hits) { + SearchHit expected = priorityQueue.poll(); + assertSame(expected, hit); + } + } + + private static SearchHit[] randomSearchHitArray(int numDocs, int numResponses, String clusterAlias, Index[] indices, float maxScore, + int scoreFactor, SortField[] sortFields, PriorityQueue priorityQueue) { + SearchHit[] hits = new SearchHit[numDocs]; + + int[] sortFieldFactors = new int[sortFields == null ? 0 : sortFields.length]; + for (int j = 0; j < sortFieldFactors.length; j++) { + sortFieldFactors[j] = randomIntBetween(1, numResponses); + } + + for (int j = 0; j < numDocs; j++) { + ShardId shardId = new ShardId(randomFrom(indices), randomIntBetween(0, 10)); + SearchShardTarget shardTarget = new SearchShardTarget(randomAlphaOfLengthBetween(3, 8), shardId, + clusterAlias, OriginalIndices.NONE); + SearchHit hit = new SearchHit(randomIntBetween(0, Integer.MAX_VALUE)); + + float score = Float.NaN; + if (Float.isNaN(maxScore) == false) { + score = (maxScore - j) * scoreFactor; + hit.score(score); + } + + hit.shard(shardTarget); + if (sortFields != null) { + Object[] rawSortValues = new Object[sortFields.length]; + DocValueFormat[] docValueFormats = new DocValueFormat[sortFields.length]; + for (int k = 0; k < sortFields.length; k++) { + SortField sortField = sortFields[k]; + if (sortField == SortField.FIELD_SCORE) { + hit.score(score); + rawSortValues[k] = score; + } else { + rawSortValues[k] = sortField.getReverse() ? numDocs * sortFieldFactors[k] - j : j; + } + docValueFormats[k] = DocValueFormat.RAW; + } + hit.sortValues(rawSortValues, docValueFormats); + } + hits[j] = hit; + priorityQueue.add(hit); + } + return hits; + } + + private static Map randomRealisticIndices(int numIndices, int numClusters) { + String[] indicesNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indicesNames[i] = randomAlphaOfLengthBetween(5, 10); + } + Map indicesPerCluster = new TreeMap<>(); + for (int i = 0; i < numClusters; i++) { + Index[] indices = new Index[indicesNames.length]; + for (int j = 0; j < indices.length; j++) { + //Realistically clusters have the same indices with same names, but different uuid + indices[j] = new Index(indicesNames[j], randomAlphaOfLength(10)); + } + String clusterAlias; + if (frequently() || indicesPerCluster.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + clusterAlias = randomAlphaOfLengthBetween(5, 10); + } else { + clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + } + indicesPerCluster.put(clusterAlias, indices); + } + return indicesPerCluster; + } + + private static final class SearchHitComparator implements Comparator { + + private final SortField[] sortFields; + + SearchHitComparator(SortField[] sortFields) { + this.sortFields = sortFields; + } + + @Override + public int compare(SearchHit a, SearchHit b) { + if (sortFields == null) { + int scoreCompare = Float.compare(b.getScore(), a.getScore()); + if (scoreCompare != 0) { + return scoreCompare; + } + } else { + for (int i = 0; i < sortFields.length; i++) { + SortField sortField = sortFields[i]; + if (sortField == SortField.FIELD_SCORE) { + int scoreCompare = Float.compare(b.getScore(), a.getScore()); + if (scoreCompare != 0) { + return scoreCompare; + } + } else { + Integer aSortValue = (Integer)a.getRawSortValues()[i]; + Integer bSortValue = (Integer)b.getRawSortValues()[i]; + final int compare; + if (sortField.getReverse()) { + compare = Integer.compare(bSortValue, aSortValue); + } else { + compare = Integer.compare(aSortValue, bSortValue); + } + if (compare != 0) { + return compare; + } + } + } + } + int shardIdCompareTo = a.getShard().getShardId().compareTo(b.getShard().getShardId()); + if (shardIdCompareTo != 0) { + return shardIdCompareTo; + } + return Integer.compare(a.docId(), b.docId()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index ecd2dc44a702e..f07be38765f66 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -128,7 +128,7 @@ private SearchResponse createTestItem(boolean minimal, ShardSearchFailure... sha shardSearchFailures, randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY); } - private static SearchResponse.Clusters randomClusters() { + static SearchResponse.Clusters randomClusters() { int totalClusters = randomIntBetween(0, 10); int successfulClusters = randomIntBetween(0, totalClusters); int skippedClusters = totalClusters - successfulClusters; From 8cc93ed975105ba6e386589b5e920d62f0f7fc7a Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 17 Jan 2019 16:12:44 +0100 Subject: [PATCH 02/13] Move from ArrayList to CopyOnWriteArrayList and test concurrency also add TODOs about possible improvements --- .../action/search/SearchResponseMerger.java | 20 +++-- .../search/SearchResponseMergerTests.java | 73 +++++++++++++------ 2 files changed, 64 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index b0806fef03634..19a29fad2c061 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import static org.elasticsearch.action.search.SearchResponse.Clusters; @@ -60,13 +61,15 @@ * Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster. */ +//TODO it may make sense to investigate reusing existing merge code in SearchPhaseController#reducedQueryPhase, the logic is similar +//yet there are substantial differences in terms of the objects exchanged and logic in the sortDocs method. final class SearchResponseMerger { private final int from; private final int size; private final SearchTimeProvider searchTimeProvider; private final Clusters clusters; private final Function reduceContextFunction; - private final List searchResponses = new ArrayList<>(); + private final List searchResponses = new CopyOnWriteArrayList<>(); SearchResponseMerger(int from, int size, SearchTimeProvider searchTimeProvider, Clusters clusters, Function reduceContextFunction) { @@ -77,17 +80,20 @@ final class SearchResponseMerger { this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction); } - //TODO we could merge incrementally, tookInMillis computation would need to be done in the final merge. - //Incremental merges would then perform non final reduction and keep around from+size hits. + /** + * Add a search response to the list of responses to be merged together into one. + * Merges currently happen at once when all responses are available and {@link #getMergedResponse()} is called. That may change + * in the future as it's possible to introduce incremental merges as responses come in if necessary. + */ void add(SearchResponse searchResponse) { searchResponses.add(searchResponse); } + /** + * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} + * so that all responses are merged into a single one. + */ SearchResponse getMergedResponse() { - return merge(); - } - - private SearchResponse merge() { assert searchResponses.size() > 1; int totalShards = 0; int skippedShards = 0; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index 8a0a7868e6cdb..19af374d0be30 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.profile.SearchProfileShardResultsTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteClusterAware; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -47,6 +48,8 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -54,27 +57,48 @@ public class SearchResponseMergerTests extends ESTestCase { - public void testMergeTookInMillis() { + private int numResponses; + private ExecutorService executorService; + + @Before + public void init() { + numResponses = randomIntBetween(2, 10); + executorService = Executors.newFixedThreadPool(numResponses); + } + + private void addResponse(SearchResponseMerger searchResponseMerger, SearchResponse searchResponse) { + if (randomBoolean()) { + executorService.submit(() -> searchResponseMerger.add(searchResponse)); + } else { + searchResponseMerger.add(searchResponse); + } + } + + private void awaitResponsesAdded() throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + + public void testMergeTookInMillis() throws InterruptedException { long currentRelativeTime = randomLong(); SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponseMerger merger = new SearchResponseMerger(randomIntBetween(0, 1000), randomIntBetween(0, 10000), timeProvider, clusters, flag -> null); - int numResponses = randomIntBetween(2, 10); for (int i = 0; i < numResponses; i++) { SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); - merger.add(searchResponse); + addResponse(merger, searchResponse); } + awaitResponsesAdded(); SearchResponse searchResponse = merger.getMergedResponse(); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); } - public void testMergeShardFailures() { + public void testMergeShardFailures() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); - int numResponses = randomIntBetween(2, 10); int numIndices = numResponses * randomIntBetween(1, 3); Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); for (int i = 0; i < numResponses; i++) { @@ -88,9 +112,9 @@ public void testMergeShardFailures() { ShardSearchFailure failure; if (randomBoolean()) { SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); - failure = new ShardSearchFailure(new IllegalArgumentException("broke"), searchShardTarget); + failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); } else { - ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException("broke")); + ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); elasticsearchException.setShard(shardId); failure = new ShardSearchFailure(elasticsearchException); } @@ -99,9 +123,9 @@ public void testMergeShardFailures() { } SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); - merger.add(searchResponse); + addResponse(merger, searchResponse); } - + awaitResponsesAdded(); ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); assertEquals(priorityQueue.size(), shardFailures.length); for (ShardSearchFailure shardFailure : shardFailures) { @@ -110,11 +134,10 @@ public void testMergeShardFailures() { } } - public void testMergeShardFailuresNullShardId() { + public void testMergeShardFailuresNullShardId() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); List expectedFailures = new ArrayList<>(); - int numResponses = randomIntBetween(2, 10); for (int i = 0; i < numResponses; i++) { int numFailures = randomIntBetween(1, 50); ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; @@ -125,17 +148,17 @@ public void testMergeShardFailuresNullShardId() { } SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); - merger.add(searchResponse); + addResponse(merger, searchResponse); } + awaitResponsesAdded(); ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } - public void testMergeProfileResults() { + public void testMergeProfileResults() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); Map expectedProfile = new HashMap<>(); - int numResponses = randomIntBetween(2, 10); for (int i = 0; i < numResponses; i++) { SearchProfileShardResults profile = SearchProfileShardResultsTests.createTestItem(); expectedProfile.putAll(profile.getShardResults()); @@ -143,22 +166,24 @@ public void testMergeProfileResults() { InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, null, null, profile, false, null, 1); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, 100L, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - merger.add(searchResponse); + addResponse(merger, searchResponse); } + awaitResponsesAdded(); SearchResponse searchResponse = merger.getMergedResponse(); assertEquals(expectedProfile, searchResponse.getProfileResults()); } //TODO add tests for suggestions and aggs reduction? - public void testMergeSearchHits() { + //TODO do we want to have a specific test for field collapsing to check that the grouping actually works? + + public void testMergeSearchHits() throws InterruptedException { final long currentRelativeTime = randomLong(); final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); final int size = randomIntBetween(0, 100); final int from = size > 0 ? randomIntBetween(0, 100) : 0; final int requestedSize = from + size; - final int numResponses = randomIntBetween(2, 10); final SortField[] sortFields; final String collapseField; boolean scoreSort = false; @@ -184,7 +209,7 @@ public void testMergeSearchHits() { TotalHits.Relation totalHitsRelation = frequently() ? randomFrom(TotalHits.Relation.values()) : null; PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); - SearchResponseMerger merger = new SearchResponseMerger(from, size, timeProvider, clusters, flag -> null); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, timeProvider, clusters, flag -> null); TotalHits expectedTotalHits = null; int expectedTotal = 0; int expectedSuccessful = 0; @@ -225,8 +250,8 @@ public void testMergeSearchHits() { if (collapseField != null) { collapseValues = new Object[numDocs]; for (int j = 0; j < numDocs; j++) { - //TODO test is green but randomizing this does not seem like a great idea? - collapseValues[j] = randomInt(); + //set different collapse values for each cluster for simplicity + collapseValues[j] = j + 1000 * i; } } @@ -245,10 +270,13 @@ public void testMergeSearchHits() { SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, total, successful, skipped, randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); - merger.add(searchResponse); + + addResponse(searchResponseMerger, searchResponse); } - SearchResponse searchResponse = merger.getMergedResponse(); + awaitResponsesAdded(); + + SearchResponse searchResponse = searchResponseMerger.getMergedResponse(); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); assertEquals(expectedTotal, searchResponse.getTotalShards()); @@ -267,6 +295,7 @@ public void testMergeSearchHits() { if (expectedTotalHits == null) { assertNull(searchHits.getTotalHits()); } else { + assertNotNull(searchHits.getTotalHits()); assertEquals(expectedTotalHits.value, searchHits.getTotalHits().value); assertSame(expectedTotalHits.relation, searchHits.getTotalHits().relation); } From 412c5698c1ca4ca266e3319cf638a883490b1a2c Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 17 Jan 2019 16:15:11 +0100 Subject: [PATCH 03/13] remove some empty lines --- .../elasticsearch/action/search/SearchResponseMerger.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 19a29fad2c061..c3e88660aeb2b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -142,20 +142,17 @@ SearchResponse getMergedResponse() { if (Float.isNaN(searchHits.getMaxScore()) == false) { maxScore = Math.max(maxScore, searchHits.getMaxScore()); } - final TotalHits totalHits; if (searchHits.getTotalHits() == null) { //in case we did't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs totalHits = new TotalHits(0, TotalHits.Relation.EQUAL_TO); assert trackTotalHits == null || trackTotalHits == false; trackTotalHits = false; - } else { totalHits = searchHits.getTotalHits(); assert trackTotalHits == null || trackTotalHits; trackTotalHits = true; } - topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shardResults)); } @@ -163,18 +160,13 @@ SearchResponse getMergedResponse() { setShardIndex(shardResults.values()); TopDocs topDocs = SearchPhaseController.mergeTopDocs(topDocsList, size, from); SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, Float.isInfinite(maxScore) ? Float.NaN : maxScore, trackTotalHits); - Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); - ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); //make failures ordering consistent with ordinary search and CCS Arrays.sort(shardFailures, FAILURES_COMPARATOR); - InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, new SearchProfileShardResults(profileResults), timedOut, terminatedEarly, numReducePhases); - long tookInMillis = searchTimeProvider.buildTookInMillis(); return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); } From 205f0aa382b049fa1f9c497fb2751030da2bd998 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 17 Jan 2019 16:50:03 +0100 Subject: [PATCH 04/13] replace TreeMap> with TreeMap --- .../action/search/SearchResponseMerger.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index c3e88660aeb2b..f5b1f6c7f8402 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -106,7 +106,7 @@ SearchResponse getMergedResponse() { List failures = new ArrayList<>(); Map profileResults = new HashMap<>(); List aggs = new ArrayList<>(); - Map> shardResults = new TreeMap<>(); + Map shards = new TreeMap<>(); List topDocsList = new ArrayList<>(searchResponses.size()); Map> groupedSuggestions = new HashMap<>(); Boolean trackTotalHits = null; @@ -153,11 +153,23 @@ SearchResponse getMergedResponse() { assert trackTotalHits == null || trackTotalHits; trackTotalHits = true; } - topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shardResults)); + topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shards)); } //now that we've gone through all the hits and we collected all the shards they come from, we can assign shardIndex to each shard - setShardIndex(shardResults.values()); + int shardIndex = 0; + for (Map.Entry shard : shards.entrySet()) { + shard.setValue(shardIndex++); + } + //and go through all the scoreDocs from each cluster and set their corresponding shardIndex + for (TopDocs topDocs : topDocsList) { + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; + ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); + fieldDocAndSearchHit.shardIndex = shards.get(shardId); + } + } + TopDocs topDocs = SearchPhaseController.mergeTopDocs(topDocsList, size, from); SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, Float.isInfinite(maxScore) ? Float.NaN : maxScore, trackTotalHits); Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); @@ -202,7 +214,7 @@ private ShardId extractShardId(ShardSearchFailure failure) { } }; - private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map> shardResults) { + private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { SearchHit[] hits = searchHits.getHits(); ScoreDoc[] scoreDocs = new ScoreDoc[hits.length]; final TopDocs topDocs; @@ -220,7 +232,8 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota for (int i = 0; i < hits.length; i++) { SearchHit hit = hits[i]; - List shardHits = shardResults.computeIfAbsent(hit.getShard().getShardId(), shardId -> new ArrayList<>()); + ShardId shardId = hit.getShard().getShardId(); + shards.putIfAbsent(shardId, null); final SortField[] sortFields = searchHits.getSortFields(); final Object[] sortValues; if (sortFields == null) { @@ -232,9 +245,7 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota sortValues = hit.getRawSortValues(); } } - FieldDocAndSearchHit scoreDoc = new FieldDocAndSearchHit(hit.docId(), hit.getScore(), sortValues, hit); - scoreDocs[i] = scoreDoc; - shardHits.add(scoreDoc); + scoreDocs[i] = new FieldDocAndSearchHit(hit.docId(), hit.getScore(), sortValues, hit); } return topDocs; } From a3251291806a1d46a3c9e4f7d95779b5460b80c6 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 10:25:33 +0100 Subject: [PATCH 05/13] Handle trackTotalHitsUpTo and disabling local hits tracking Adapt TopDocsStats so it can be reused. --- .../action/search/SearchPhaseController.java | 16 ++++---- .../action/search/SearchResponseMerger.java | 33 +++++++++------- .../search/SearchPhaseControllerTests.java | 2 +- .../search/SearchResponseMergerTests.java | 38 ++++++++++++++----- 4 files changed, 58 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index b22aa9669fbbc..b587c11aba690 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -443,7 +443,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection reduceContextFunction; private final List searchResponses = new CopyOnWriteArrayList<>(); - SearchResponseMerger(int from, int size, SearchTimeProvider searchTimeProvider, Clusters clusters, + SearchResponseMerger(int from, int size, int trackTotalHitsUpTo, SearchTimeProvider searchTimeProvider, Clusters clusters, Function reduceContextFunction) { this.from = from; this.size = size; + this.trackTotalHitsUpTo = trackTotalHitsUpTo; this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider); this.clusters = Objects.requireNonNull(clusters); this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction); @@ -102,7 +107,6 @@ SearchResponse getMergedResponse() { Boolean terminatedEarly = null; //the current reduce phase counts as one int numReducePhases = 1; - float maxScore = Float.NEGATIVE_INFINITY; List failures = new ArrayList<>(); Map profileResults = new HashMap<>(); List aggs = new ArrayList<>(); @@ -111,6 +115,8 @@ SearchResponse getMergedResponse() { Map> groupedSuggestions = new HashMap<>(); Boolean trackTotalHits = null; + TopDocsStats topDocsStats = new TopDocsStats(trackTotalHitsUpTo); + for (SearchResponse searchResponse : searchResponses) { totalShards += searchResponse.getTotalShards(); skippedShards += searchResponse.getSkippedShards(); @@ -139,12 +145,10 @@ SearchResponse getMergedResponse() { } SearchHits searchHits = searchResponse.getHits(); - if (Float.isNaN(searchHits.getMaxScore()) == false) { - maxScore = Math.max(maxScore, searchHits.getMaxScore()); - } + final TotalHits totalHits; if (searchHits.getTotalHits() == null) { - //in case we did't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs + //in case we didn't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs totalHits = new TotalHits(0, TotalHits.Relation.EQUAL_TO); assert trackTotalHits == null || trackTotalHits == false; trackTotalHits = false; @@ -153,7 +157,9 @@ SearchResponse getMergedResponse() { assert trackTotalHits == null || trackTotalHits; trackTotalHits = true; } - topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shards)); + TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards); + topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore())); + topDocsList.add(topDocs); } //now that we've gone through all the hits and we collected all the shards they come from, we can assign shardIndex to each shard @@ -165,13 +171,15 @@ SearchResponse getMergedResponse() { for (TopDocs topDocs : topDocsList) { for (ScoreDoc scoreDoc : topDocs.scoreDocs) { FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; + //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices + //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); fieldDocAndSearchHit.shardIndex = shards.get(shardId); } } - TopDocs topDocs = SearchPhaseController.mergeTopDocs(topDocsList, size, from); - SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, Float.isInfinite(maxScore) ? Float.NaN : maxScore, trackTotalHits); + TopDocs topDocs = mergeTopDocs(topDocsList, size, from); + SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats); Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); @@ -250,7 +258,7 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota return topDocs; } - private static SearchHits topDocsToSearchHits(TopDocs topDocs, float maxScore, boolean trackTotalHits) { + private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) { SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length]; for (int i = 0; i < topDocs.scoreDocs.length; i++) { FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i]; @@ -268,9 +276,8 @@ private static SearchHits topDocsToSearchHits(TopDocs topDocs, float maxScore, b collapseValues = collapseTopFieldDocs.collapseValues; } } - //in case we didn't track total hits, we got null from each cluster, and we need to set null to the final response - final TotalHits totalHits = trackTotalHits ? topDocs.totalHits : null; - return new SearchHits(searchHits, totalHits, maxScore, sortFields, collapseField, collapseValues); + return new SearchHits(searchHits, topDocsStats.getTotalHits(), topDocsStats.getMaxScore(), + sortFields, collapseField, collapseValues); } private static void setShardIndex(Collection> shardResults) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index a5ab81d83fbcd..23e5626e11aa9 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -139,7 +139,7 @@ public void testSortIsIdempotent() throws Exception { assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); } - assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f); + assertEquals(topDocsStats.getMaxScore(), topDocsStats2.getMaxScore(), 0.0f); assertEquals(topDocsStats.getTotalHits().value, topDocsStats2.getTotalHits().value); assertEquals(topDocsStats.getTotalHits().relation, topDocsStats2.getTotalHits().relation); assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index 19af374d0be30..f2e49a109810d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.profile.SearchProfileShardResultsTests; @@ -84,7 +85,7 @@ public void testMergeTookInMillis() throws InterruptedException { SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponseMerger merger = new SearchResponseMerger(randomIntBetween(0, 1000), randomIntBetween(0, 10000), - timeProvider, clusters, flag -> null); + SearchContext.TRACK_TOTAL_HITS_ACCURATE, timeProvider, clusters, flag -> null); for (int i = 0; i < numResponses; i++) { SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); @@ -97,7 +98,8 @@ public void testMergeTookInMillis() throws InterruptedException { public void testMergeShardFailures() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); - SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); int numIndices = numResponses * randomIntBetween(1, 3); Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); @@ -136,7 +138,8 @@ public void testMergeShardFailures() throws InterruptedException { public void testMergeShardFailuresNullShardId() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); - SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); List expectedFailures = new ArrayList<>(); for (int i = 0; i < numResponses; i++) { int numFailures = randomIntBetween(1, 50); @@ -157,7 +160,8 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { public void testMergeProfileResults() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); - SearchResponseMerger merger = new SearchResponseMerger(0, 0, searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); Map expectedProfile = new HashMap<>(); for (int i = 0; i < numResponses; i++) { SearchProfileShardResults profile = SearchProfileShardResultsTests.createTestItem(); @@ -206,10 +210,14 @@ public void testMergeSearchHits() throws InterruptedException { sortFields = null; scoreSort = true; } - TotalHits.Relation totalHitsRelation = frequently() ? randomFrom(TotalHits.Relation.values()) : null; + Tuple randomTrackTotalHits = randomTrackTotalHits(); + int trackTotalHitsUpTo = randomTrackTotalHits.v1(); + TotalHits.Relation totalHitsRelation = randomTrackTotalHits.v2(); PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); - SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, timeProvider, clusters, flag -> null); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, trackTotalHitsUpTo, + timeProvider, clusters, flag -> null); + TotalHits expectedTotalHits = null; int expectedTotal = 0; int expectedSuccessful = 0; @@ -232,11 +240,10 @@ public void testMergeSearchHits() throws InterruptedException { expectedSkipped += skipped; TotalHits totalHits = null; - if (totalHitsRelation != null) { - //TODO totalHits may overflow if each cluster reports a very high number? + if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) { totalHits = new TotalHits(randomLongBetween(0, 1000), totalHitsRelation); long previousValue = expectedTotalHits == null ? 0 : expectedTotalHits.value; - expectedTotalHits = new TotalHits(previousValue + totalHits.value, totalHitsRelation); + expectedTotalHits = new TotalHits(Math.min(previousValue + totalHits.value, trackTotalHitsUpTo), totalHitsRelation); } final int numDocs = totalHits == null || totalHits.value >= requestedSize ? requestedSize : (int) totalHits.value; @@ -321,6 +328,19 @@ public void testMergeSearchHits() throws InterruptedException { } } + private static Tuple randomTrackTotalHits() { + switch(randomIntBetween(0, 2)) { + case 0: + return Tuple.tuple(SearchContext.TRACK_TOTAL_HITS_DISABLED, null); + case 1: + return Tuple.tuple(randomIntBetween(10, 1000), TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); + case 2: + return Tuple.tuple(SearchContext.TRACK_TOTAL_HITS_ACCURATE, TotalHits.Relation.EQUAL_TO); + default: + throw new UnsupportedOperationException(); + } + } + private static SearchHit[] randomSearchHitArray(int numDocs, int numResponses, String clusterAlias, Index[] indices, float maxScore, int scoreFactor, SortField[] sortFields, PriorityQueue priorityQueue) { SearchHit[] hits = new SearchHit[numDocs]; From 3537afccf05af86874a4f2f222f66ff80931a41d Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 10:27:44 +0100 Subject: [PATCH 06/13] Remove TopDocsStats constructor used only in tests --- .../action/search/SearchPhaseController.java | 4 ---- .../action/search/SearchPhaseControllerTests.java | 9 ++++----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index b587c11aba690..2380b5b6f32be 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -742,10 +742,6 @@ static final class TopDocsStats { long fetchHits; private float maxScore = Float.NEGATIVE_INFINITY; - TopDocsStats() { - this(SearchContext.TRACK_TOTAL_HITS_ACCURATE); - } - TopDocsStats(int trackTotalHitsUpTo) { this.trackTotalHitsUpTo = trackTotalHitsUpTo; this.totalHits = 0; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 23e5626e11aa9..e262147ef85d0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -102,9 +102,8 @@ public void testSort() { size = first.get().queryResult().size(); } int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); - ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(), - from, size) - .scoreDocs; + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, + new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE), from, size).scoreDocs; for (Suggest.Suggestion suggestion : reducedSuggest(results)) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); accumulatedLength += suggestionSize; @@ -126,12 +125,12 @@ public void testSortIsIdempotent() throws Exception { from = first.get().queryResult().from(); size = first.get().queryResult().size(); } - SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(); + SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); - SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(); + SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; assertEquals(sortedDocs.length, sortedDocs2.length); for (int i = 0; i < sortedDocs.length; i++) { From 8f1b063e0a3516e358caf377c19371ac3e8cd8ff Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 10:44:59 +0100 Subject: [PATCH 07/13] expand TopDocsStats to include timedOut and earlyTerminated This allows to share more code between SearchResponseMerger and SearchPhaseController --- .../action/search/SearchPhaseController.java | 34 +++++++++---------- .../action/search/SearchResponseMerger.java | 11 ++---- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 2380b5b6f32be..027d9d5f10c25 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -170,7 +170,7 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection 0) { setShardIndex(td.topDocs, queryResult.getShardIndex()); @@ -439,12 +439,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase - boolean timedOut = false; - Boolean terminatedEarly = null; if (queryResults.isEmpty()) { // early terminate we have nothing to reduce final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), - timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true); + false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true); } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; @@ -476,16 +474,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection> suggestion : result.suggest()) { @@ -509,7 +497,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection failures = new ArrayList<>(); @@ -121,10 +119,6 @@ SearchResponse getMergedResponse() { totalShards += searchResponse.getTotalShards(); skippedShards += searchResponse.getSkippedShards(); successfulShards += searchResponse.getSuccessfulShards(); - timedOut = timedOut || searchResponse.isTimedOut(); - if (searchResponse.isTerminatedEarly() != null && searchResponse.isTerminatedEarly()) { - terminatedEarly = true; - } numReducePhases += searchResponse.getNumReducePhases(); Collections.addAll(failures, searchResponse.getShardFailures()); @@ -158,7 +152,8 @@ SearchResponse getMergedResponse() { trackTotalHits = true; } TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards); - topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore())); + topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()), + searchResponse.isTimedOut(), searchResponse.isTerminatedEarly()); topDocsList.add(topDocs); } @@ -186,7 +181,7 @@ SearchResponse getMergedResponse() { //make failures ordering consistent with ordinary search and CCS Arrays.sort(shardFailures, FAILURES_COMPARATOR); InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, - new SearchProfileShardResults(profileResults), timedOut, terminatedEarly, numReducePhases); + new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); long tookInMillis = searchTimeProvider.buildTookInMillis(); return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); } From f03feae27c09b63edf14b7e822544f23b7ba94fe Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 10:51:47 +0100 Subject: [PATCH 08/13] improve TODO comment and make field private final --- .../elasticsearch/action/search/SearchResponseMerger.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 62070e899272d..5c96578fb7edd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -64,12 +64,13 @@ * Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster. */ -//TODO it may make sense to investigate reusing existing merge code in SearchPhaseController#reducedQueryPhase, the logic is similar -//yet there are substantial differences in terms of the objects exchanged and logic in the sortDocs method. +//TODO it may make sense to integrate the remote clusters responses as a shard response in the initial search phase and ignore hits coming +//from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote +//cluster response would have the fetch results. final class SearchResponseMerger { private final int from; private final int size; - int trackTotalHitsUpTo; + private final int trackTotalHitsUpTo; private final SearchTimeProvider searchTimeProvider; private final Clusters clusters; private final Function reduceContextFunction; From a25274593770bba93ef72fdbe8380f64ca99f9eb Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 10:54:59 +0100 Subject: [PATCH 09/13] remove unused method, factor setShardIndex as a new static method and clarify comment --- .../action/search/SearchResponseMerger.java | 48 ++++++++----------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 5c96578fb7edd..2bbaa6e7ca8c4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -42,7 +42,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -158,22 +157,8 @@ SearchResponse getMergedResponse() { topDocsList.add(topDocs); } - //now that we've gone through all the hits and we collected all the shards they come from, we can assign shardIndex to each shard - int shardIndex = 0; - for (Map.Entry shard : shards.entrySet()) { - shard.setValue(shardIndex++); - } - //and go through all the scoreDocs from each cluster and set their corresponding shardIndex - for (TopDocs topDocs : topDocsList) { - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { - FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; - //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices - //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. - ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); - fieldDocAndSearchHit.shardIndex = shards.get(shardId); - } - } - + //after going through all the hits and collecting all their distinct shards, we can assign shardIndex and set it to the ScoreDocs + setShardIndex(shards, topDocsList); TopDocs topDocs = mergeTopDocs(topDocsList, size, from); SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats); Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); @@ -254,6 +239,23 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota return topDocs; } + private static void setShardIndex(Map shards, List topDocsList) { + int shardIndex = 0; + for (Map.Entry shard : shards.entrySet()) { + shard.setValue(shardIndex++); + } + //and go through all the scoreDocs from each cluster and set their corresponding shardIndex + for (TopDocs topDocs : topDocsList) { + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; + //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices + //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. + ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); + fieldDocAndSearchHit.shardIndex = shards.get(shardId); + } + } + } + private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) { SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length]; for (int i = 0; i < topDocs.scoreDocs.length; i++) { @@ -276,18 +278,6 @@ private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topD sortFields, collapseField, collapseValues); } - private static void setShardIndex(Collection> shardResults) { - //every group of hits comes from a different shard. When hits come from the same index on multiple clusters and same - //shard identifier, we rely on such indices to have a different uuid across multiple clusters. - int i = 0; - for (List shardHits : shardResults) { - for (FieldDoc shardHit : shardHits) { - shardHit.shardIndex = i; - } - i++; - } - } - private static final class FieldDocAndSearchHit extends FieldDoc { private final SearchHit searchHit; From b0d9f00adb02d6a1305dea3b943a33b2f125a7bf Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 14:59:12 +0100 Subject: [PATCH 10/13] expand test to also include some simple aggs reduction --- .../search/SearchResponseMergerTests.java | 75 +++++++++++++++++-- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index f2e49a109810d..d771d6d9d66e0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -31,6 +31,12 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange; +import org.elasticsearch.search.aggregations.bucket.range.Range; +import org.elasticsearch.search.aggregations.metrics.InternalMax; +import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.ProfileShardResult; @@ -42,6 +48,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -124,11 +131,16 @@ public void testMergeShardFailures() throws InterruptedException { priorityQueue.add(Tuple.tuple(shardId, failure)); } SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, - 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); addResponse(merger, searchResponse); } awaitResponsesAdded(); - ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); + SearchResponse mergedResponse = merger.getMergedResponse(); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(priorityQueue.size(), mergedResponse.getFailedShards()); + ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures(); assertEquals(priorityQueue.size(), shardFailures.length); for (ShardSearchFailure shardFailure : shardFailures) { ShardSearchFailure expected = priorityQueue.poll().v2(); @@ -150,7 +162,7 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { expectedFailures.add(shardSearchFailure); } SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, - 1, 0, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); addResponse(merger, searchResponse); } awaitResponsesAdded(); @@ -173,14 +185,64 @@ public void testMergeProfileResults() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse searchResponse = merger.getMergedResponse(); - assertEquals(expectedProfile, searchResponse.getProfileResults()); + SearchResponse mergedResponse = merger.getMergedResponse(); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + assertEquals(expectedProfile, mergedResponse.getProfileResults()); } - //TODO add tests for suggestions and aggs reduction? + //TODO add tests for suggestions reduction? //TODO do we want to have a specific test for field collapsing to check that the grouping actually works? + public void testMergeAggs() throws InterruptedException { + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), + SearchResponse.Clusters.EMPTY, flag -> new InternalAggregation.ReduceContext(null, null, flag)); + String maxAggName = randomAlphaOfLengthBetween(5, 8); + String rangeAggName = randomAlphaOfLengthBetween(5, 8); + int totalCount = 0; + double maxValue = Double.MIN_VALUE; + for (int i = 0; i < numResponses; i++) { + double value = randomDouble(); + maxValue = Math.max(value, maxValue); + InternalMax max = new InternalMax(maxAggName, value, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()); + InternalDateRange.Factory factory = new InternalDateRange.Factory(); + int count = randomIntBetween(1, 1000); + totalCount += count; + InternalDateRange.Bucket bucket = factory.createBucket("bucket", 0, 10000, count, InternalAggregations.EMPTY, + false, DocValueFormat.RAW); + InternalDateRange range = factory.create(rangeAggName, Collections.singletonList(bucket), DocValueFormat.RAW, false, + Collections.emptyList(), Collections.emptyMap()); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(range, max)); + SearchHits searchHits = new SearchHits(new SearchHit[0], null, Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, aggs, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + addResponse(searchResponseMerger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(); + + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + assertEquals(0, mergedResponse.getHits().getHits().length); + assertEquals(2, mergedResponse.getAggregations().asList().size()); + Max max = mergedResponse.getAggregations().get(maxAggName); + assertEquals(maxValue, max.getValue(), 0d); + Range range = mergedResponse.getAggregations().get(rangeAggName); + assertEquals(1, range.getBuckets().size()); + Range.Bucket bucket = range.getBuckets().get(0); + assertEquals("0.0", bucket.getFromAsString()); + assertEquals("10000.0", bucket.getToAsString()); + assertEquals(totalCount, bucket.getDocCount()); + } + public void testMergeSearchHits() throws InterruptedException { final long currentRelativeTime = randomLong(); final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); @@ -289,6 +351,7 @@ public void testMergeSearchHits() throws InterruptedException { assertEquals(expectedTotal, searchResponse.getTotalShards()); assertEquals(expectedSuccessful, searchResponse.getSuccessfulShards()); assertEquals(expectedSkipped, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getShardFailures().length); assertEquals(expectedReducePhases, searchResponse.getNumReducePhases()); assertEquals(expectedTimedOut, searchResponse.isTimedOut()); assertEquals(expectedTerminatedEarly, searchResponse.isTerminatedEarly()); From aecd028412f0e10c15766f1d58e902bbf3603952 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 15:34:35 +0100 Subject: [PATCH 11/13] add basic test for suggestions --- .../search/SearchResponseMergerTests.java | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index d771d6d9d66e0..54ce70792f884 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.text.Text; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; @@ -42,6 +43,8 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.profile.SearchProfileShardResultsTests; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteClusterAware; import org.junit.Before; @@ -194,9 +197,45 @@ public void testMergeProfileResults() throws InterruptedException { assertEquals(expectedProfile, mergedResponse.getProfileResults()); } - //TODO add tests for suggestions reduction? - - //TODO do we want to have a specific test for field collapsing to check that the grouping actually works? + public void testMergeSuggestions() throws InterruptedException { + String suggestionName = randomAlphaOfLengthBetween(4, 8); + boolean skipDuplicates = randomBoolean(); + int size = randomIntBetween(1, 100); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), + SearchResponse.Clusters.EMPTY, flag -> null); + for (int i = 0; i < numResponses; i++) { + List>> suggestions = + new ArrayList<>(); + CompletionSuggestion completionSuggestion = new CompletionSuggestion(suggestionName, size, skipDuplicates); + CompletionSuggestion.Entry options = new CompletionSuggestion.Entry(new Text("suggest"), 0, 10); + options.addOption(new CompletionSuggestion.Entry.Option(randomInt(), new Text("suggestion"), i, Collections.emptyMap())); + completionSuggestion.addTerm(options); + suggestions.add(completionSuggestion); + Suggest suggest = new Suggest(suggestions); + SearchHits searchHits = new SearchHits(new SearchHit[0], null, Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, null, suggest, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + addResponse(searchResponseMerger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + Suggest.Suggestion> suggestion = + mergedResponse.getSuggest().getSuggestion(suggestionName); + assertEquals(1, suggestion.getEntries().size()); + Suggest.Suggestion.Entry options = suggestion.getEntries().get(0); + assertEquals(skipDuplicates ? 1 : Math.min(numResponses, size), options.getOptions().size()); + int i = numResponses; + for (Suggest.Suggestion.Entry.Option option : options) { + assertEquals("suggestion", option.getText().string()); + assertEquals(--i, option.getScore(), 0f); + } + } public void testMergeAggs() throws InterruptedException { SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), From a8514dc434f0be51c6ee3654404659198e7e27f5 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 15:53:45 +0100 Subject: [PATCH 12/13] provide clusters when merging the obtained responses, it will only be known once all responses have been obtained from the remote clusters --- .../action/search/SearchResponseMerger.java | 10 ++--- .../search/SearchResponseMergerTests.java | 42 ++++++++++--------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 2bbaa6e7ca8c4..84f093f77f052 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -71,24 +71,22 @@ final class SearchResponseMerger { private final int size; private final int trackTotalHitsUpTo; private final SearchTimeProvider searchTimeProvider; - private final Clusters clusters; private final Function reduceContextFunction; private final List searchResponses = new CopyOnWriteArrayList<>(); - SearchResponseMerger(int from, int size, int trackTotalHitsUpTo, SearchTimeProvider searchTimeProvider, Clusters clusters, + SearchResponseMerger(int from, int size, int trackTotalHitsUpTo, SearchTimeProvider searchTimeProvider, Function reduceContextFunction) { this.from = from; this.size = size; this.trackTotalHitsUpTo = trackTotalHitsUpTo; this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider); - this.clusters = Objects.requireNonNull(clusters); this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction); } /** * Add a search response to the list of responses to be merged together into one. - * Merges currently happen at once when all responses are available and {@link #getMergedResponse()} is called. That may change - * in the future as it's possible to introduce incremental merges as responses come in if necessary. + * Merges currently happen at once when all responses are available and {@link #getMergedResponse(Clusters)} )} is called. + * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. */ void add(SearchResponse searchResponse) { searchResponses.add(searchResponse); @@ -98,7 +96,7 @@ void add(SearchResponse searchResponse) { * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} * so that all responses are merged into a single one. */ - SearchResponse getMergedResponse() { + SearchResponse getMergedResponse(Clusters clusters) { assert searchResponses.size() > 1; int totalShards = 0; int skippedShards = 0; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index 54ce70792f884..d02b712eaaef3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -93,23 +93,22 @@ private void awaitResponsesAdded() throws InterruptedException { public void testMergeTookInMillis() throws InterruptedException { long currentRelativeTime = randomLong(); SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); - SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponseMerger merger = new SearchResponseMerger(randomIntBetween(0, 1000), randomIntBetween(0, 10000), - SearchContext.TRACK_TOTAL_HITS_ACCURATE, timeProvider, clusters, flag -> null); + SearchContext.TRACK_TOTAL_HITS_ACCURATE, timeProvider, flag -> null); for (int i = 0; i < numResponses; i++) { SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse searchResponse = merger.getMergedResponse(); + SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); } public void testMergeShardFailures() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, - searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + searchTimeProvider, flag -> null); PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); int numIndices = numResponses * randomIntBetween(1, 3); Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); @@ -138,7 +137,9 @@ public void testMergeShardFailures() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse mergedResponse = merger.getMergedResponse(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); assertEquals(0, mergedResponse.getSkippedShards()); @@ -154,7 +155,7 @@ public void testMergeShardFailures() throws InterruptedException { public void testMergeShardFailuresNullShardId() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, - searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + searchTimeProvider, flag -> null); List expectedFailures = new ArrayList<>(); for (int i = 0; i < numResponses; i++) { int numFailures = randomIntBetween(1, 50); @@ -169,14 +170,14 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - ShardSearchFailure[] shardFailures = merger.getMergedResponse().getShardFailures(); + ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } public void testMergeProfileResults() throws InterruptedException { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, - searchTimeProvider, SearchResponse.Clusters.EMPTY, flag -> null); + searchTimeProvider, flag -> null); Map expectedProfile = new HashMap<>(); for (int i = 0; i < numResponses; i++) { SearchProfileShardResults profile = SearchProfileShardResultsTests.createTestItem(); @@ -188,7 +189,9 @@ public void testMergeProfileResults() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse mergedResponse = merger.getMergedResponse(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); assertEquals(0, mergedResponse.getSkippedShards()); @@ -201,8 +204,7 @@ public void testMergeSuggestions() throws InterruptedException { String suggestionName = randomAlphaOfLengthBetween(4, 8); boolean skipDuplicates = randomBoolean(); int size = randomIntBetween(1, 100); - SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), - SearchResponse.Clusters.EMPTY, flag -> null); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), flag -> null); for (int i = 0; i < numResponses; i++) { List>> suggestions = new ArrayList<>(); @@ -219,7 +221,9 @@ public void testMergeSuggestions() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); assertEquals(0, mergedResponse.getSkippedShards()); @@ -239,7 +243,7 @@ public void testMergeSuggestions() throws InterruptedException { public void testMergeAggs() throws InterruptedException { SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), - SearchResponse.Clusters.EMPTY, flag -> new InternalAggregation.ReduceContext(null, null, flag)); + flag -> new InternalAggregation.ReduceContext(null, null, flag)); String maxAggName = randomAlphaOfLengthBetween(5, 8); String rangeAggName = randomAlphaOfLengthBetween(5, 8); int totalCount = 0; @@ -263,8 +267,9 @@ public void testMergeAggs() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(); - + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); assertEquals(0, mergedResponse.getSkippedShards()); @@ -285,7 +290,6 @@ public void testMergeAggs() throws InterruptedException { public void testMergeSearchHits() throws InterruptedException { final long currentRelativeTime = randomLong(); final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); - final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); final int size = randomIntBetween(0, 100); final int from = size > 0 ? randomIntBetween(0, 100) : 0; final int requestedSize = from + size; @@ -316,8 +320,7 @@ public void testMergeSearchHits() throws InterruptedException { TotalHits.Relation totalHitsRelation = randomTrackTotalHits.v2(); PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); - SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, trackTotalHitsUpTo, - timeProvider, clusters, flag -> null); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, flag -> null); TotalHits expectedTotalHits = null; int expectedTotal = 0; @@ -384,7 +387,8 @@ public void testMergeSearchHits() throws InterruptedException { awaitResponsesAdded(); - SearchResponse searchResponse = searchResponseMerger.getMergedResponse(); + final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); assertEquals(expectedTotal, searchResponse.getTotalShards()); From 16cba8a9da1863243e41b12344702f0847c9129f Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 18 Jan 2019 16:10:32 +0100 Subject: [PATCH 13/13] expand javadocs and add limitations --- .../action/search/SearchResponseMerger.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 84f093f77f052..b146d42c0d2e6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -61,7 +61,16 @@ * The CCS coordinating node sends one search request per remote cluster involved and gets one search response back from each one of them. * Such responses contain all the info to be able to perform an additional reduction and return results back to the user. * Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally - * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster. + * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster and such results + * have all already been fetched downstream. + * This approach consists of a different trade-off compared to ordinary cross-cluster search where we fan out to all the shards, no matter + * whether they belong to the local or the remote cluster. Assuming that there commonly is network latency when communicating with remote + * clusters, limiting the number of requests to one per cluster is beneficial, and outweighs the downside of fetching many more hits than + * needed downstream and returning bigger responses to the coordinating node. + * Known limitations: + * - scroll requests are not supported + * - field collapsing is supported, but whenever inner_hits are requested, they will be retrieved by each cluster locally after the fetch + * phase, through the {@link ExpandSearchPhase}. Such inner_hits are not merged together as part of hits reduction. */ //TODO it may make sense to integrate the remote clusters responses as a shard response in the initial search phase and ignore hits coming //from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote