Skip to content

Commit

Permalink
Wire QueryPhaseCollectorManager into the query phase
Browse files Browse the repository at this point in the history
We recently introduced a collector manager for QueryPhaseCollector.
This commit wires it into the query phase, which requires also switching
the query phase to using ProfileCollectorManager which supports
concurrent profiling.

For simplicity, the wrapping for the aggs collector manager when
profiling is on is moved to the query phase, as it removed the need for
casting when creating a ProfileCollectorManager with children.

Note: there is no concurrency enabled with this as we don't yet set the
executor to the IndexSearcher
  • Loading branch information
javanna committed Jul 17, 2023
1 parent 6c76c50 commit 1d963ce
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.search.profile.query.InternalProfileCollectorManager;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.SingleThreadCollectorManager;

Expand Down Expand Up @@ -55,12 +52,7 @@ public static void preProcess(SearchContext context) {
} else {
collector = bucketCollector.asCollector();
}
if (context.getProfilers() != null) {
InternalProfileCollector profileCollector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION);
context.aggregations().registerAggsCollectorManager(new InternalProfileCollectorManager(profileCollector));
} else {
context.aggregations().registerAggsCollectorManager(new SingleThreadCollectorManager(collector));
}
context.aggregations().registerAggsCollectorManager(new SingleThreadCollectorManager(collector));
}

private static List<Runnable> getCancellationChecks(SearchContext context) {
Expand Down

This file was deleted.

71 changes: 31 additions & 40 deletions server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.search.profile.query.InternalProfileCollectorManager;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.ProfileCollectorManager;
import org.elasticsearch.search.rank.RankSearchContext;
import org.elasticsearch.search.rank.RankShardContext;
import org.elasticsearch.search.rescore.RescorePhase;
Expand Down Expand Up @@ -188,55 +188,51 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas
searchContext,
searchContext.parsedPostFilter() != null || searchContext.minimumScore() != null
);

CollectorManager<Collector, Void> topDocsCollectorManager = topDocsFactory.collectorManager();
CollectorManager<? extends Collector, Void> topDocsCollectorManager = topDocsFactory.collectorManager();
ProfileCollectorManager<Void> topDocsProfileCollectorManager = null;
if (searchContext.getProfilers() != null) {
Collector topDocsCollector = topDocsCollectorManager.newCollector();
InternalProfileCollector profileCollector = new InternalProfileCollector(topDocsCollector, topDocsFactory.profilerName);
topDocsCollectorManager = new InternalProfileCollectorManager(profileCollector);
topDocsProfileCollectorManager = new ProfileCollectorManager<>(topDocsCollectorManager, topDocsFactory.profilerName);
topDocsCollectorManager = topDocsProfileCollectorManager;
}

Collector topDocsCollector = topDocsCollectorManager.newCollector();
Collector aggsCollector = null;
Weight postFilterWeight = null;
CollectorManager<? extends Collector, Void> aggsCollectorManager = null;
ProfileCollectorManager<Void> aggsProfileCollectorManager = null;
if (searchContext.aggregations() != null) {
aggsCollector = searchContext.aggregations().getAggsCollectorManager().newCollector();
aggsCollectorManager = searchContext.aggregations().getAggsCollectorManager();
if (searchContext.getProfilers() != null) {
aggsProfileCollectorManager = new ProfileCollectorManager<>(aggsCollectorManager, CollectorResult.REASON_AGGREGATION);
aggsCollectorManager = aggsProfileCollectorManager;
}
}

Weight postFilterWeight = null;
if (searchContext.parsedPostFilter() != null) {
postFilterWeight = searcher.createWeight(
searcher.rewrite(searchContext.parsedPostFilter().query()),
ScoreMode.COMPLETE_NO_SCORES,
1f
);
}
QueryPhaseCollector queryPhaseCollector = new QueryPhaseCollector(
topDocsCollector,

QueryPhaseCollector.CollectorManager queryPhaseCollectorManager = QueryPhaseCollector.createManager(
topDocsCollectorManager,
postFilterWeight,
searchContext.terminateAfter(),
aggsCollector,
aggsCollectorManager,
searchContext.minimumScore()
);

SingleThreadCollectorManager collectorManager;
final CollectorManager<? extends Collector, Void> collectorManager;
if (searchContext.getProfilers() == null) {
collectorManager = new SingleThreadCollectorManager(queryPhaseCollector);
collectorManager = queryPhaseCollectorManager;
} else {
InternalProfileCollector profileCollector;
if (aggsCollector == null) {
profileCollector = new InternalProfileCollector(
queryPhaseCollector,
REASON_SEARCH_QUERY_PHASE,
(InternalProfileCollector) topDocsCollector
);
} else {
profileCollector = new InternalProfileCollector(
queryPhaseCollector,
REASON_SEARCH_QUERY_PHASE,
(InternalProfileCollector) topDocsCollector,
(InternalProfileCollector) aggsCollector
);
}
collectorManager = new InternalProfileCollectorManager(profileCollector);
ProfileCollectorManager<Void> queryPhaseProfileManager = new ProfileCollectorManager<>(
queryPhaseCollectorManager,
REASON_SEARCH_QUERY_PHASE,
topDocsProfileCollectorManager,
aggsProfileCollectorManager
);
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorManager(queryPhaseProfileManager::getCollectorTree);
collectorManager = queryPhaseProfileManager;
}

final Runnable timeoutRunnable = getTimeoutCheck(searchContext);
Expand All @@ -246,7 +242,7 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas

try {
searchWithCollectorManager(searchContext, searcher, query, collectorManager, timeoutRunnable != null);
if (queryPhaseCollector.isTerminatedAfter()) {
if (queryPhaseCollectorManager.isTerminatedAfter()) {
queryResult.terminatedEarly(true);
}
queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(), topDocsFactory.sortValueFormats);
Expand Down Expand Up @@ -274,14 +270,9 @@ private static void searchWithCollectorManager(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
CollectorManager<Collector, Void> collectorManager,
CollectorManager<? extends Collector, Void> collectorManager,
boolean timeoutSet
) throws IOException {
if (searchContext.getProfilers() != null) {
searchContext.getProfilers()
.getCurrentQueryProfiler()
.setCollectorManager(((InternalProfileCollectorManager) collectorManager)::getCollectorTree);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, collectorManager);
Expand Down

0 comments on commit 1d963ce

Please sign in to comment.