Disable inter-segment concurrency when sorting by field
The way multiple slices execute when sorting by field is not optimized and can lead to a slowdown. We will dig deeper and potentially re-enable it once we have addressed that.

When sorting by _doc or _shard_doc, and potentially by _timestamp, there is no gain from inter-segment concurrency.

Relates to elastic#101230
javanna committed Oct 30, 2023
1 parent d5c9c7c commit ba201ec
Showing 6 changed files with 63 additions and 18 deletions.
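For orientation, here is a minimal sketch of the behavioral effect of this change, assuming the SearchSourceBuilder and SortBuilders APIs touched in the diff below. The class name ParallelCollectionSketch and the field name "timestamp" are hypothetical, for illustration only; the expected values mirror the assertions added to the tests further down.

import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;

public class ParallelCollectionSketch {
    public static void main(String[] args) {
        // Sorting by _score (the implicit default) still supports parallel collection.
        SearchSourceBuilder scoreSorted = new SearchSourceBuilder().sort(SortBuilders.scoreSort());
        System.out.println(scoreSorted.supportsParallelCollection()); // expected: true

        // An explicit field sort now disables inter-segment concurrency,
        // so the query phase falls back to a single slice.
        SearchSourceBuilder fieldSorted = new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp"));
        System.out.println(fieldSorted.supportsParallelCollection()); // expected: false
    }
}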
@@ -2107,6 +2107,7 @@ public boolean supportsParallelCollection() {
if (profile) return false;

if (sorts != null) {
+ // the implicit sorting is by _score, which supports parallel collection
for (SortBuilder<?> sortBuilder : sorts) {
if (sortBuilder.supportsParallelCollection() == false) return false;
}
@@ -170,4 +170,9 @@ public TransportVersion getMinimalSupportedVersion() {
public ScoreSortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
return this;
}

+ @Override
+ public boolean supportsParallelCollection() {
+ return true;
+ }
}
@@ -502,9 +502,4 @@ public ScriptSortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
}
return new ScriptSortBuilder(this).setNestedSort(rewrite);
}

- @Override
- public boolean supportsParallelCollection() {
- return false;
- }
}
@@ -280,6 +280,6 @@ public String toString() {
}

public boolean supportsParallelCollection() {
- return true;
+ return false;
}
}
@@ -103,6 +103,8 @@
import org.elasticsearch.search.query.NonCountingTermQuery;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
+ import org.elasticsearch.search.sort.FieldSortBuilder;
+ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.tasks.TaskCancelHelper;
import org.elasticsearch.tasks.TaskCancelledException;
@@ -1960,7 +1962,7 @@ public void testEnableSearchWorkerThreads() throws IOException {
public void testDetermineMaximumNumberOfSlices() {
IndexService indexService = createIndex("index", Settings.EMPTY);
IndexShard indexShard = indexService.getShard(0);
- ShardSearchRequest request = new ShardSearchRequest(
+ ShardSearchRequest parallelReq = new ShardSearchRequest(
OriginalIndices.NONE,
new SearchRequest().allowPartialSearchResults(randomBoolean()),
indexShard.shardId(),
@@ -1971,6 +1973,18 @@ public void testDetermineMaximumNumberOfSlices() {
System.currentTimeMillis(),
null
);
+ ShardSearchRequest singleSliceReq = new ShardSearchRequest(
+ OriginalIndices.NONE,
+ new SearchRequest().allowPartialSearchResults(randomBoolean())
+ .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
+ indexShard.shardId(),
+ 0,
+ indexService.numberOfShards(),
+ AliasFilter.EMPTY,
+ 1f,
+ System.currentTimeMillis(),
+ null
+ );
int executorPoolSize = randomIntBetween(1, 100);
ExecutorService threadPoolExecutor = EsExecutors.newFixed(
"test",
@@ -1984,10 +1998,12 @@ public void testDetermineMaximumNumberOfSlices() {

SearchService service = getInstanceFromNode(SearchService.class);
{
- assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
- assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.DFS));
- assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
- assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, request, ResultsType.DFS));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.DFS));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, singleSliceReq, ResultsType.DFS));
+ assertEquals(1, service.determineMaximumNumberOfSlices(null, parallelReq, ResultsType.DFS));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.QUERY));
+ assertEquals(1, service.determineMaximumNumberOfSlices(threadPoolExecutor, singleSliceReq, ResultsType.QUERY));
+ assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, parallelReq, ResultsType.DFS));
}
try {
ClusterUpdateSettingsResponse response = client().admin()
@@ -1997,11 +2013,11 @@ public void testDetermineMaximumNumberOfSlices() {
.get();
assertTrue(response.isAcknowledged());
{
- assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
- assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.DFS));
- assertEquals(1, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
- assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.QUERY));
- assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, request, ResultsType.DFS));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.DFS));
+ assertEquals(1, service.determineMaximumNumberOfSlices(null, parallelReq, ResultsType.DFS));
+ assertEquals(1, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.QUERY));
+ assertEquals(1, service.determineMaximumNumberOfSlices(null, parallelReq, ResultsType.QUERY));
+ assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, parallelReq, ResultsType.DFS));
}
} finally {
// reset original default setting
@@ -2011,8 +2027,8 @@ public void testDetermineMaximumNumberOfSlices() {
.setPersistentSettings(Settings.builder().putNull(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey()).build())
.get();
{
- assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
- assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.DFS));
+ assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, parallelReq, ResultsType.QUERY));
}
}
}
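To make the assertions above easier to follow, here is a hypothetical sketch of the slice-count policy they exercise. It is not the actual SearchService.determineMaximumNumberOfSlices implementation; the method and parameter names are invented for illustration, and only the observable behavior is taken from the test: DFS keeps parallelizing regardless of the sort, while the query phase gets multiple slices only when parallel collection is enabled and the request source supports it.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import org.elasticsearch.search.builder.SearchSourceBuilder;

class SliceCountSketch {
    // Hypothetical approximation of the policy asserted in the test above.
    static int sliceCount(ExecutorService executor,
                          SearchSourceBuilder source,
                          boolean dfsPhase,
                          boolean queryPhaseParallelCollectionEnabled) {
        if (executor instanceof ThreadPoolExecutor == false) {
            // A null or non-pool executor gives no sizing information: single slice.
            return 1;
        }
        int poolSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
        if (dfsPhase) {
            // The DFS phase parallelizes regardless of how the request sorts.
            return poolSize;
        }
        // The query phase honors the cluster setting and the source's capability.
        boolean sourceSupportsParallel = source == null || source.supportsParallelCollection();
        return (queryPhaseParallelCollectionEnabled && sourceSupportsParallel) ? poolSize : 1;
    }
}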
@@ -942,6 +942,34 @@ public void testSupportsParallelCollection() {
);
assertFalse(searchSourceBuilder.supportsParallelCollection());
}
+ {
+ SearchSourceBuilder searchSourceBuilder = newSearchSourceBuilder.get();
+ searchSourceBuilder.sort(SortBuilders.scoreSort().order(randomFrom(SortOrder.values())));
+ assertTrue(searchSourceBuilder.supportsParallelCollection());
+ searchSourceBuilder.sort(SortBuilders.fieldSort("field"));
+ assertFalse(searchSourceBuilder.supportsParallelCollection());
+ }
+ {
+ SearchSourceBuilder searchSourceBuilder = newSearchSourceBuilder.get();
+ searchSourceBuilder.sort(SortBuilders.scoreSort().order(randomFrom(SortOrder.values())));
+ assertTrue(searchSourceBuilder.supportsParallelCollection());
+ searchSourceBuilder.sort(SortBuilders.geoDistanceSort("field", 0, 0));
+ assertFalse(searchSourceBuilder.supportsParallelCollection());
+ }
+ {
+ SearchSourceBuilder searchSourceBuilder = newSearchSourceBuilder.get();
+ searchSourceBuilder.sort(SortBuilders.scoreSort().order(randomFrom(SortOrder.values())));
+ assertTrue(searchSourceBuilder.supportsParallelCollection());
+ searchSourceBuilder.sort(SortBuilders.pitTiebreaker());
+ assertFalse(searchSourceBuilder.supportsParallelCollection());
+ }
+ {
+ SearchSourceBuilder searchSourceBuilder = newSearchSourceBuilder.get();
+ searchSourceBuilder.sort(SortBuilders.scoreSort().order(randomFrom(SortOrder.values())));
+ assertTrue(searchSourceBuilder.supportsParallelCollection());
+ searchSourceBuilder.sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME));
+ assertFalse(searchSourceBuilder.supportsParallelCollection());
+ }
{
SearchSourceBuilder searchSourceBuilder = newSearchSourceBuilder.get();
searchSourceBuilder.profile(true);
