Skip to content

Commit

Permalink
Add support for wrapping CollectorManager with profiling during concu…
Browse files Browse the repository at this point in the history
…rrent execution

Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Aug 5, 2023
1 parent 1d28fac commit f2d70fc
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
Expand Down Expand Up @@ -42,12 +41,16 @@ class AggregationCollectorManager implements CollectorManager<Collector, Reducea

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(aggProvider.apply(context));
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

public String getCollectorReason() {
return collectorReason;
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand Down Expand Up @@ -77,17 +80,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
}
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
static Collector createCollector(List<Aggregator> collectors) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void postProcess(SearchContext context) {
try {
if (globalCollectorManager != null) {
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
}
final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.QueryPhaseExecutionException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
context.getProfilers()
.addQueryProfiler()
.setCollector((InternalProfileComponent) globalCollectorManager.newCollector());
.setCollector(
new InternalProfileCollector(
globalCollectorManager.newCollector(),
globalCollectorManager.getCollectorReason(),
Collections.emptyList()
)
);
}
context.searcher().search(query, globalCollectorManager.newCollector());
globalCollectorManager.reduce(List.of()).reduce(context.queryResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.search.Weight;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.common.lucene.search.FilteredCollector;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;

Expand Down Expand Up @@ -99,11 +100,15 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
*/
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) throws IOException {
final CollectorManager<? extends Collector, ReduceableSearchResult> manager = createManager(in);
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
if (manager instanceof InternalProfileCollectorManager) {
return (InternalProfileCollectorManager) manager;
} else {
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
}
}

/**
Expand Down Expand Up @@ -198,16 +203,33 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i

for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
final Collector collector = manager.newCollector();
if (!(collector instanceof InternalProfileCollector)) {
throw new IllegalArgumentException("non-profiling collector");
}
subCollectors.add((InternalProfileCollector) collector);
subCollectors.add(new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList()));
}

final Collector collector = MultiCollector.wrap(subCollectors);
return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors);
}

@Override
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) {
final List<CollectorManager<?, ReduceableSearchResult>> managers = new ArrayList<>();
final List<InternalProfileCollectorManager> children = new ArrayList<>();
managers.add(in);
children.add(in);
for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
InternalProfileCollectorManager subCollectorManager = new InternalProfileCollectorManager(
manager,
CollectorResult.REASON_AGGREGATION,
Collections.emptyList()
);
managers.add(subCollectorManager);
children.add(subCollectorManager);
}
CollectorManager<? extends Collector, ReduceableSearchResult> multiCollectorManager = QueryCollectorManagerContext
.createMultiCollectorManager(managers);
return new InternalProfileCollectorManager(multiCollectorManager, REASON_SEARCH_MULTI, children);
}

@Override
CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
CollectorManager<? extends Collector, ReduceableSearchResult> in
Expand Down

0 comments on commit f2d70fc

Please sign in to comment.