Skip to content

Commit

Permalink
Add additional shards routing info in ShardSearchRequest
Browse files Browse the repository at this point in the history
This commit adds two new methods to ShardSearchRequest:
 * #numberOfShardsIndex() that returns the number of shards of this index
   that participates in the request.
 * #remapShardId() that returns the remapped shard id of this shard for this request.
   The remapped shard id is the id of the requested shard among all shards
   of this index that are part of the request. Note that the remapped shard id
   is equal to the original shard id if all shards of this index are part of the request.

These informations are useful when the _search is executed with a preference (or a routing) that
restricts the number of shards requested for an index.
This change allows to fix a bug in sliced scrolls executed with a preference (or a routing).
Instead of computing the slice query from the total number of shards in the index, this change allows to
compute this number from the number of shards per index that participates in the request.

Fixes elastic#27550
  • Loading branch information
jimczi committed Apr 16, 2018
1 parent 62e33ee commit b45f64e
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -128,17 +129,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
Expand Down Expand Up @@ -271,14 +272,14 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
Expand Down Expand Up @@ -318,8 +319,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
int[] indexShards = getIndexShards(shardIt.shardId().getIndex());
int remapShardId = Arrays.binarySearch(indexShards, shardIt.shardId().getId());
assert remapShardId >= 0;
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), remapShardId,
indexShards.length, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -131,7 +134,7 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
Expand All @@ -140,7 +143,7 @@ public final void run() throws IOException {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
Expand Down Expand Up @@ -377,4 +380,18 @@ protected void skipShard(SearchShardIterator iterator) {
successfulShardExecution(iterator);
}

/**
* Returns the list of shard ids in the request that match the provided {@link Index}.
*/
protected int[] getIndexShards(Index index) {
List<Integer> shards = new ArrayList<>();
for (ShardIterator it : shardsIts) {
if (index.equals(it.shardId().getIndex())) {
shards.add(it.shardId().getId());
}
}
Collections.sort(shards);
return shards.stream().mapToInt((i) -> i).toArray();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -120,6 +121,7 @@ final class DefaultSearchContext extends SearchContext {
// filter for sliced scroll
private SliceBuilder sliceBuilder;
private SearchTask task;
private final Version minNodeVersion;


/**
Expand Down Expand Up @@ -154,7 +156,7 @@ final class DefaultSearchContext extends SearchContext {

DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher,
IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter,
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias) {
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand All @@ -171,6 +173,7 @@ final class DefaultSearchContext extends SearchContext {
this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
this.timeEstimateCounter = timeEstimateCounter;
this.timeout = timeout;
this.minNodeVersion = minNodeVersion;
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
clusterAlias);
queryShardContext.setTypes(request.types());
Expand Down Expand Up @@ -278,8 +281,7 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query)
}

if (sliceBuilder != null) {
filters.add(sliceBuilder.toFilter(queryShardContext, shardTarget().getShardId().getId(),
queryShardContext.getIndexSettings().getNumberOfShards()));
filters.add(sliceBuilder.toFilter(queryShardContext, remapShardId(), numberOfIndexShards(), minNodeVersion));
}

if (filters.isEmpty()) {
Expand Down Expand Up @@ -335,6 +337,14 @@ public int numberOfShards() {
return request.numberOfShards();
}

public int numberOfIndexShards() {
return request.numberOfIndexShards();
}

public int remapShardId() {
return request.remapShardId();
}

@Override
public float queryBoost() {
return queryBoost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase,
request.getClusterAlias());
request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
boolean success = false;
try {
// we clone the query shard context here just for rewriting otherwise we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {

private String clusterAlias;
private ShardId shardId;
private int remapShardId;
private int numberOfIndexShards;
private int numberOfShards;
private SearchType searchType;
private Scroll scroll;
Expand All @@ -80,10 +82,10 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest() {
}

ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this(shardId, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
this(shardId, remapShardId, numberOfIndexShards, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
Expand All @@ -101,9 +103,12 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis
indexBoost = 1.0f;
}

public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
public ShardSearchLocalRequest(ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
this.shardId = shardId;
this.remapShardId = remapShardId;
this.numberOfIndexShards = numberOfIndexShards;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
this.source = source;
Expand All @@ -114,7 +119,6 @@ public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType s
this.allowPartialSearchResults = allowPartialSearchResults;
}


@Override
public ShardId shardId() {
return shardId;
Expand Down Expand Up @@ -150,6 +154,16 @@ public int numberOfShards() {
return numberOfShards;
}

@Override
public int numberOfIndexShards() {
return numberOfIndexShards;
}

@Override
public int remapShardId() {
return remapShardId;
}

@Override
public SearchType searchType() {
return searchType;
Expand All @@ -169,12 +183,12 @@ public long nowInMillis() {
public Boolean requestCache() {
return requestCache;
}

@Override
public Boolean allowPartialSearchResults() {
return allowPartialSearchResults;
}


@Override
public Scroll scroll() {
Expand All @@ -199,6 +213,14 @@ protected void innerReadFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
searchType = SearchType.fromId(in.readByte());
numberOfShards = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
remapShardId = in.readVInt();
numberOfIndexShards = in.readVInt();
assert remapShardId != -1 && numberOfIndexShards != -1;
} else {
remapShardId = -1;
numberOfIndexShards = -1;
}
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
Expand Down Expand Up @@ -232,6 +254,10 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
out.writeByte(searchType.id());
if (!asKey) {
out.writeVInt(numberOfShards);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVInt(remapShardId);
out.writeVInt(numberOfIndexShards);
}
}
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
Expand All @@ -250,7 +276,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public interface ShardSearchRequest {

ShardId shardId();

/**
* Returns the remapped shard id of the requested shard for this request
* or -1 if this information is not available.
* The remapped shard id is the id of the requested shard among all shards
* of this index that are part of the request. Note that the remapped shard id
* is equal to the original shard id if all shards of this index are part of the request.
*/
int remapShardId();

String[] types();

SearchSourceBuilder source();
Expand All @@ -59,6 +68,15 @@ public interface ShardSearchRequest {

void source(SearchSourceBuilder source);

/**
* Returns the number of shards of this index ({@link ShardId#getIndex()}) that participates in the request
* or -1 if this information is not available.
*/
int numberOfIndexShards();

/**
* Returns the number of shards that participates in the request.
*/
int numberOfShards();

SearchType searchType();
Expand All @@ -68,7 +86,7 @@ public interface ShardSearchRequest {
long nowInMillis();

Boolean requestCache();

Boolean allowPartialSearchResults();

Scroll scroll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public ShardSearchTransportRequest(){
}

public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int remapShardId,
int numberOfIndexShards, int numberOfShards, AliasFilter aliasFilter, float indexBoost,
long nowInMillis, String clusterAlias) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, remapShardId,
numberOfIndexShards, numberOfShards, aliasFilter, indexBoost,
nowInMillis, clusterAlias);
this.originalIndices = originalIndices;
}
Expand Down Expand Up @@ -102,6 +104,11 @@ public ShardId shardId() {
return shardSearchLocalRequest.shardId();
}

@Override
public int remapShardId() {
return shardSearchLocalRequest.remapShardId();
}

@Override
public String[] types() {
return shardSearchLocalRequest.types();
Expand Down Expand Up @@ -132,6 +139,11 @@ public int numberOfShards() {
return shardSearchLocalRequest.numberOfShards();
}

@Override
public int numberOfIndexShards() {
return shardSearchLocalRequest.numberOfIndexShards();
}

@Override
public SearchType searchType() {
return shardSearchLocalRequest.searchType();
Expand All @@ -151,11 +163,11 @@ public long nowInMillis() {
public Boolean requestCache() {
return shardSearchLocalRequest.requestCache();
}

@Override
public Boolean allowPartialSearchResults() {
return shardSearchLocalRequest.allowPartialSearchResults();
}
}

@Override
public Scroll scroll() {
Expand Down
Loading

0 comments on commit b45f64e

Please sign in to comment.