Skip to content

Commit

Permalink
Introduce executor for concurrent search (elastic#98204)
Browse files Browse the repository at this point in the history
This commit enables concurrent search execution in the DFS phase, which is going to improve resource usage as well as performance of knn queries which benefit from both concurrent rewrite and collection.

We will enable concurrent execution for the query phase in a subsequent commit. While this commit does not introduce parallelism for the query phase, it introduces offloading sequential computation to the newly introduced executor. This is true both for situations where a single slice needs to be searched, as well as scenarios where a specific request does not support concurrency (currently only DFS phase does regardless of the request). Sequential collection is not offloaded only if the request includes aggregations that don't support offloading: composite, nested and cardinality as their post collection method must be executed in the same thread as the collection or we'll trip a lucene assertion that verifies that doc_values are pulled and consumed from the same thread.

## Technical details

This commit introduces a secondary executor, used exclusively to execute the concurrent bits of search. The search threads are still the ones that coordinate the search (where the caller search will originate from), but the actual work will be offloaded to the newly introduced executor.

We are offloading not only parallel execution but also sequential execution, to make the workload more predictable, as it would be surprising to have bits of search executed in either of the two thread pools. Also, that would introduce the possibility to suddenly run a higher amount of heavy operations overall (some in the caller thread and some in the separate threads), which could overload the system as well as make sizing of thread pools more difficult.

Note that fetch, together with other actions,  is still executed in the search thread pool. This commit does not make the search thread pool merely a coordinating only thread pool, It does so only for what concerns the IndexSearcher#search operation itself, which is though a big portion of the different phases of search API execution.

Given that the searcher blocks waiting for all tasks to be completed, we take a simple approach of introducing a thread pool executor that has the same size as the existing search thread pool but relies on an unbounded queue. This simplifies handling of thread pool queue and rejections. In fact, we'd like to guarantee that the secondary thread pool won't reject, and delegate queuing entirely to the search thread pool which is the entry point for every search operation anyway. The principle behind this is that if you got a slot in the search thread pool, you should be able to complete your search, and rather quickly.

As part of this commit we are also introducing the ability to cancel tasks that have not started yet, so that if any task throws an exception, other tasks are prevented from starting needless computation.

Relates to elastic#80693
Relates to elastic#90700
  • Loading branch information
javanna authored and csoulios committed Aug 18, 2023
1 parent 7e78d5b commit a30bee2
Show file tree
Hide file tree
Showing 31 changed files with 893 additions and 481 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/98204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98204
summary: Introduce executor for concurrent search
area: Search
type: enhancement
issues: []
10 changes: 8 additions & 2 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ There are several thread pools, but the important ones include:

[[search-threadpool]]
`search`::
For count/search/suggest operations. Thread pool type is
`fixed` with a size of `int((`<<node.processors,
For coordination of count/search operations at the shard level whose computation
is offloaded to the search_worker thread pool. Used also by fetch and other search
related operations Thread pool type is `fixed` with a size of `int((`<<node.processors,
`# of allocated processors`>>`pass:[ * ]3) / 2) + 1`, and queue_size of `1000`.

`search_worker`::
For the heavy workload of count/search operations that may be executed concurrently
across segments within the same shard when possible. Thread pool type is `fixed`
with a size of `int((`<<node.processors, `# of allocated processors`>>`pass:[ * ]3) / 2) + 1`, and unbounded queue_size .

[[search-throttled]]`search_throttled`::
For count/search/suggest/get operations on `search_throttled indices`.
Thread pool type is `fixed` with a size of `1`, and queue_size of `100`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ResourceWatcherService.RELOAD_INTERVAL_LOW,
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchModule.INDICES_MAX_NESTED_DEPTH_SETTING,
SearchModule.SEARCH_CONCURRENCY_ENABLED,
SearchService.SEARCH_WORKER_THREADS_ENABLED,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

final class DefaultSearchContext extends SearchContext {
Expand Down Expand Up @@ -136,10 +137,11 @@ final class DefaultSearchContext extends SearchContext {
SearchShardTarget shardTarget,
LongSupplier relativeTimeSupplier,
TimeValue timeout,
int minimumDocsPerSlice,
FetchPhase fetchPhase,
boolean lowLevelCancellation,
boolean parallelize
Executor executor,
int maximumNumberOfSlices,
int minimumDocsPerSlice
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -150,19 +152,26 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();

Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
minimumDocsPerSlice,
lowLevelCancellation,
// TODO not set the for now, this needs a special thread pool and can be enabled after its introduction
// parallelize
// ? (EsThreadPoolExecutor) this.indexService.getThreadPool().executor(ThreadPool.Names.CONCURRENT_COLLECTION_TBD)
// : null,
null
);
if (executor == null) {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation
);
} else {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
maximumNumberOfSlices,
minimumDocsPerSlice
);
}
releasables.addAll(List.of(engineSearcher, searcher));

this.relativeTimeSupplier = relativeTimeSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,6 @@ public class SearchModule {
Setting.Property.NodeScope
);

public static final Setting<Boolean> SEARCH_CONCURRENCY_ENABLED = Setting.boolSetting(
"search.concurrency_enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Map<String, Highlighter> highlighters;

private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
Expand Down
42 changes: 28 additions & 14 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -141,7 +143,6 @@
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.core.TimeValue.timeValueMinutes;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.search.SearchModule.SEARCH_CONCURRENCY_ENABLED;

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
private static final Logger logger = LogManager.getLogger(SearchService.class);
Expand Down Expand Up @@ -211,6 +212,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> SEARCH_WORKER_THREADS_ENABLED = Setting.boolSetting(
"search.worker_threads_enabled",
true,
Property.NodeScope,
Property.Dynamic
);

public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting(
"search.max_open_scroll_context",
500,
Expand Down Expand Up @@ -253,7 +261,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final DfsPhase dfsPhase = new DfsPhase();

private final FetchPhase fetchPhase;
private volatile boolean enableConcurrentCollection;
private volatile boolean enableSearchWorkerThreads;

private volatile long defaultKeepAlive;

Expand Down Expand Up @@ -344,16 +352,12 @@ public SearchService(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter);

enableConcurrentCollection = SEARCH_CONCURRENCY_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_CONCURRENCY_ENABLED, this::setEnableConcurrentCollection);
}

private void setEnableConcurrentCollection(boolean concurrentCollection) {
this.enableConcurrentCollection = concurrentCollection;
enableSearchWorkerThreads = SEARCH_WORKER_THREADS_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_WORKER_THREADS_ENABLED, this::setEnableSearchWorkerThreads);
}

boolean isConcurrentCollectionEnabled() {
return this.enableConcurrentCollection;
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
this.enableSearchWorkerThreads = enableSearchWorkerThreads;
}

private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1039,7 +1043,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, null);
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, ResultsType.NONE);
searchContext.addReleasable(readerContext.markAsUsed(0L));
return searchContext;
}
Expand All @@ -1060,16 +1064,20 @@ private DefaultSearchContext createSearchContext(
reader.indexShard().shardId(),
request.getClusterAlias()
);
ExecutorService executor = this.enableSearchWorkerThreads ? threadPool.executor(Names.SEARCH_WORKER) : null;
int maximumNumberOfSlices = executor instanceof ThreadPoolExecutor tpe
&& supportsParallelCollection(resultsType, request.source()) ? tpe.getMaximumPoolSize() : 1;
searchContext = new DefaultSearchContext(
reader,
request,
shardTarget,
threadPool::relativeTimeInMillis,
timeout,
minimumDocsPerSlice,
fetchPhase,
lowLevelCancellation,
this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.source())
executor,
maximumNumberOfSlices,
minimumDocsPerSlice
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand All @@ -1089,10 +1097,16 @@ this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.
return searchContext;
}

static boolean concurrentSearchEnabled(ResultsType resultsType, SearchSourceBuilder source) {
static boolean supportsParallelCollection(ResultsType resultsType, SearchSourceBuilder source) {
if (resultsType == ResultsType.DFS) {
return true; // only enable concurrent collection for DFS phase for now
}
/*
TODO uncomment this block to enable inter-segment concurrency for the query phase
if (resultsType == ResultsType.QUERY) {
return source == null || source.aggregations() == null || source.aggregations().supportsParallelCollection();
}
*/
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,35 +220,15 @@ public boolean isInSortOrderExecutionRequired() {
}

/**
* Return false if this aggregation or any of the child aggregations does not support concurrent search.
* As a result, such aggregation will always be executed sequentially despite concurrency is enabled for the query phase.
* Note: aggregations that don't support concurrency, may or may not support offloading their collection to the search worker threads,
* depending on what {@link #supportsOffloadingSequentialCollection()} returns.
* Return false if this aggregation or any of the child aggregations does not support parallel collection.
* As a result, a request including such aggregation is always executed sequentially despite concurrency is enabled for the query phase.
*/
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
if (isInSortOrderExecutionRequired()) {
return false;
}
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
if (builder.supportsConcurrentExecution() == false) {
return false;
}
}
return supportsOffloadingSequentialCollection();
}

/**
* Returns false if this aggregation or any of its child aggregations does not support offloading its sequential collection
* to a separate thread. As a result, such aggregation will always be executed sequentially, and fully in the search thread,
* without offloading its collection to the search worker threads.
* Note: aggregations that don't support offloading sequential collection, don't support concurrency by definition.
*/
public boolean supportsOffloadingSequentialCollection() {
if (isInSortOrderExecutionRequired()) {
return false;
}
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
if (builder.supportsOffloadingSequentialCollection() == false) {
if (builder.supportsParallelCollection() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co
searcher.setProfiler(context);
try {
searcher.search(context.rewrittenQuery(), collector);
collector.postCollection();
} catch (IOException e) {
throw new AggregationExecutionException("Could not perform time series aggregation", e);
}
Expand Down Expand Up @@ -113,7 +114,6 @@ public static void execute(SearchContext context) {
final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
for (Aggregator aggregator : aggregators) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ public boolean isInSortOrderExecutionRequired() {
}

/**
* Return false if this aggregation or any of the child aggregations does not support concurrent search
* Return false if this aggregation or any of the child aggregations does not support parallel collection.
* As a result, a request including such aggregation is always executed sequentially despite concurrency is enabled for the query
* phase.
*/
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
for (AggregationBuilder builder : aggregationBuilders) {
if (builder.supportsConcurrentExecution() == false) {
if (builder.supportsParallelCollection() == false) {
return false;
}
}
return isInSortOrderExecutionRequired() == false;
return true;
}

public Builder addAggregator(AggregationBuilder factory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final Collector asCollector() {
return new BucketCollectorWrapper(this);
}

private record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {
public record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public BucketCardinality bucketCardinality() {
}

@Override
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BucketCardinality bucketCardinality() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Loading

0 comments on commit a30bee2

Please sign in to comment.