Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce executor for concurrent search #98204

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
20e6dbe
Introduce executor for concurrent search
javanna Aug 4, 2023
01733dc
Update docs/changelog/98204.yaml
javanna Aug 4, 2023
9229f4d
forbidden api
javanna Aug 4, 2023
f67551f
grrrrrrrrrr
javanna Aug 4, 2023
db6e906
leftover
javanna Aug 4, 2023
3b86ab1
forbidden api
javanna Aug 4, 2023
438d76d
spotless
javanna Aug 4, 2023
ad260e0
Update server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
javanna Aug 4, 2023
5a8ec56
iter
javanna Aug 4, 2023
9069bfe
iter
javanna Aug 7, 2023
026258c
iter
javanna Aug 7, 2023
fed2aaf
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 7, 2023
6688e92
iter
javanna Aug 7, 2023
cfcc819
iter
javanna Aug 7, 2023
d2a982b
special case for no segments
javanna Aug 7, 2023
0b2b243
spotless
javanna Aug 8, 2023
103363f
terminate thread pool in test
javanna Aug 8, 2023
b44a239
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
4220d6a
fix thread pool executor cast
javanna Aug 8, 2023
89faa2b
iter
javanna Aug 8, 2023
9ec1fd5
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
eced2c8
iter
javanna Aug 8, 2023
2844a05
iter
javanna Aug 8, 2023
5959f83
remove TODO and clarify comments
javanna Aug 8, 2023
ee55047
clarify docs
javanna Aug 8, 2023
8c77ed6
spotless
javanna Aug 8, 2023
af76ab6
iter
javanna Aug 8, 2023
262996d
Remove supportsOffloadingSequentialCollection() method and do aggrega…
martijnvg Aug 9, 2023
bb4ffca
Also postCollect when there are no slices
martijnvg Aug 9, 2023
c3d8034
address tests
martijnvg Aug 9, 2023
151b93b
spotless
martijnvg Aug 9, 2023
23121e3
Correctly overwrite supportsParallelCollection(...)
martijnvg Aug 10, 2023
bca4050
fixed unit tests
martijnvg Aug 10, 2023
df0ff91
Also perform postCollection when executing search in sort order.
martijnvg Aug 10, 2023
9b92a0b
Comment
javanna Aug 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/98204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98204
summary: Introduce executor for concurrent search
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.util.ThreadInterruptedException;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;

/**
* {@link Executor} implementation that wraps another executor and limits the number of concurrent tasks that are delegated to it.
* Meant to be used when the caller thread is executed as part of a separate thread pool that is subject to its own queueing and rejection
* policy. Simplifies queue and rejection handling in the secondary thread pool as it effectively relies on those of the caller thread pool.
*
* Not suitable for non-blocking scenarios as it assumes that the caller thread will block and wait for all its tasks to be
* completed anyway. In this scenario an additional wait when tasks are submitted does not introduce extra overhead.
*
* Note: execution permits are released before the delegate executor updates its internal state to mark each task completed.
* That means that we'll accept tasks before there are free threads to take them, hence there will be some overflow queueing required in
* the underlying thread pool executor. This is the reason why a queue is required despite the wait applied at submit.
*/
public class BoundedExecutor implements Executor {
javanna marked this conversation as resolved.
Show resolved Hide resolved
private final Executor executor;
private final int bound;
private final Semaphore semaphore;

public BoundedExecutor(ThreadPoolExecutor executor) {
this(executor, executor.getMaximumPoolSize());
}

public BoundedExecutor(Executor executor, int bound) {
if (bound <= 0) {
throw new IllegalArgumentException("bound should be positive");
}
this.executor = executor;
this.bound = bound;
this.semaphore = new Semaphore(bound, true);
}

/**
* Returns the number of tasks that this executor will allow to be executed concurrently
*/
public final int getBound() {
return bound;
}

@Override
public void execute(Runnable command) {
Objects.requireNonNull(command, "command cannot be null");
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the thread is interrupted, shouldn't we bubble it up via Thread.currentThread().interrupt();?

}
try {
executor.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.util.concurrent.BoundedExecutor;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -139,7 +140,8 @@ final class DefaultSearchContext extends SearchContext {
int minimumDocsPerSlice,
FetchPhase fetchPhase,
boolean lowLevelCancellation,
boolean parallelize
BoundedExecutor executor,
boolean forceSequentialCollection
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -157,11 +159,8 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getQueryCachingPolicy(),
minimumDocsPerSlice,
lowLevelCancellation,
// TODO not set the for now, this needs a special thread pool and can be enabled after its introduction
// parallelize
// ? (EsThreadPoolExecutor) this.indexService.getThreadPool().executor(ThreadPool.Names.CONCURRENT_COLLECTION_TBD)
// : null,
null
executor,
forceSequentialCollection
);
releasables.addAll(List.of(engineSearcher, searcher));

Expand Down
17 changes: 10 additions & 7 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.BoundedExecutor;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -130,6 +131,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -284,6 +286,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final Tracer tracer;

private final BoundedExecutor concurrentSearchExecutor;

public SearchService(
ClusterService clusterService,
IndicesService indicesService,
Expand Down Expand Up @@ -346,16 +350,14 @@ public SearchService(

enableConcurrentCollection = SEARCH_CONCURRENCY_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_CONCURRENCY_ENABLED, this::setEnableConcurrentCollection);

concurrentSearchExecutor = new BoundedExecutor((ThreadPoolExecutor) threadPool.executor(Names.SEARCH_WORKER));
}

private void setEnableConcurrentCollection(boolean concurrentCollection) {
this.enableConcurrentCollection = concurrentCollection;
}

boolean isConcurrentCollectionEnabled() {
return this.enableConcurrentCollection;
}

private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -1039,7 +1041,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, null);
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, ResultsType.NONE);
searchContext.addReleasable(readerContext.markAsUsed(0L));
return searchContext;
}
Expand Down Expand Up @@ -1069,7 +1071,8 @@ private DefaultSearchContext createSearchContext(
minimumDocsPerSlice,
fetchPhase,
lowLevelCancellation,
this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.source())
this.enableConcurrentCollection ? concurrentSearchExecutor : null,
javanna marked this conversation as resolved.
Show resolved Hide resolved
supportsConcurrency(resultsType, request.source()) == false
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand All @@ -1089,7 +1092,7 @@ this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.
return searchContext;
}

static boolean concurrentSearchEnabled(ResultsType resultsType, SearchSourceBuilder source) {
static boolean supportsConcurrency(ResultsType resultsType, SearchSourceBuilder source) {
if (resultsType == ResultsType.DFS) {
return true; // only enable concurrent collection for DFS phase for now
}
Expand Down
Loading