Skip to content

Commit

Permalink
Add support for wrapping CollectorManager with profiling during concu…
Browse files Browse the repository at this point in the history
…rrent execution (#9129)

Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Aug 11, 2023
1 parent 487e3e3 commit e8352de
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))
- Add attributes to startSpan methods ([#9199](https://github.com/opensearch-project/OpenSearch/pull/9199))
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
Expand Down Expand Up @@ -42,12 +41,16 @@ class AggregationCollectorManager implements CollectorManager<Collector, Reducea

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(aggProvider.apply(context));
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

public String getCollectorReason() {
return collectorReason;
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand All @@ -70,17 +73,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
return new AggregationReduceableSearchResult(internalAggregations);
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
static Collector createCollector(List<Aggregator> collectors) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void postProcess(SearchContext context) {
try {
if (globalCollectorManager != null) {
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
}
final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.QueryPhaseExecutionException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
context.getProfilers()
.addQueryProfiler()
.setCollector((InternalProfileComponent) globalCollectorManager.newCollector());
.setCollector(
new InternalProfileCollector(
globalCollectorManager.newCollector(),
globalCollectorManager.getCollectorReason(),
Collections.emptyList()
)
);
}
context.searcher().search(query, globalCollectorManager.newCollector());
globalCollectorManager.reduce(List.of()).reduce(context.queryResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.search.Weight;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.common.lucene.search.FilteredCollector;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;

Expand Down Expand Up @@ -99,11 +100,15 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
*/
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) throws IOException {
final CollectorManager<? extends Collector, ReduceableSearchResult> manager = createManager(in);
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
if (manager instanceof InternalProfileCollectorManager) {
return (InternalProfileCollectorManager) manager;
} else {
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
}
}

/**
Expand Down Expand Up @@ -198,16 +203,33 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i

for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
final Collector collector = manager.newCollector();
if (!(collector instanceof InternalProfileCollector)) {
throw new IllegalArgumentException("non-profiling collector");
}
subCollectors.add((InternalProfileCollector) collector);
subCollectors.add(new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList()));
}

final Collector collector = MultiCollector.wrap(subCollectors);
return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors);
}

@Override
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) {
final List<CollectorManager<?, ReduceableSearchResult>> managers = new ArrayList<>();
final List<InternalProfileCollectorManager> children = new ArrayList<>();
managers.add(in);
children.add(in);
for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
InternalProfileCollectorManager subCollectorManager = new InternalProfileCollectorManager(
manager,
CollectorResult.REASON_AGGREGATION,
Collections.emptyList()
);
managers.add(subCollectorManager);
children.add(subCollectorManager);
}
CollectorManager<? extends Collector, ReduceableSearchResult> multiCollectorManager = QueryCollectorManagerContext
.createMultiCollectorManager(managers);
return new InternalProfileCollectorManager(multiCollectorManager, REASON_SEARCH_MULTI, children);
}

@Override
CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
CollectorManager<? extends Collector, ReduceableSearchResult> in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.search.Collector;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregator;
import org.opensearch.search.profile.query.CollectorResult;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -31,9 +32,12 @@ public void testNonGlobalCollectorManagers() throws Exception {
assertTrue(aggCollector instanceof MultiBucketCollector);
assertEquals(expectedAggCount, ((MultiBucketCollector) aggCollector).getCollectors().length);
testCollectorManagerCommon(testAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason());

// test NonGlobalCollectorManager which will be used in concurrent segment search case
testCollectorManagerCommon(new NonGlobalAggCollectorManager(context));
final NonGlobalAggCollectorManager testNonGlobalAggCollectorManager = new NonGlobalAggCollectorManager(context);
testCollectorManagerCommon(testNonGlobalAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason());
}

public void testGlobalCollectorManagers() throws Exception {
Expand All @@ -45,11 +49,14 @@ public void testGlobalCollectorManagers() throws Exception {
context.aggregations(contextAggregations);
final AggregationCollectorManager testAggCollectorManager = new GlobalAggCollectorManagerWithSingleCollector(context);
testCollectorManagerCommon(testAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testAggCollectorManager.getCollectorReason());
Collector aggCollector = testAggCollectorManager.newCollector();
assertTrue(aggCollector instanceof BucketCollector);

// test GlobalAggCollectorManager which will be used in concurrent segment search case
testCollectorManagerCommon(new GlobalAggCollectorManager(context));
final GlobalAggCollectorManager testGlobalAggCollectorManager = new GlobalAggCollectorManager(context);
testCollectorManagerCommon(testGlobalAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testAggCollectorManager.getCollectorReason());
}

public void testAggCollectorManagersWithBothGlobalNonGlobalAggregators() throws Exception {
Expand All @@ -70,7 +77,9 @@ public void testAggCollectorManagersWithBothGlobalNonGlobalAggregators() throws
assertTrue(globalAggCollector instanceof GlobalAggregator);

testCollectorManagerCommon(testAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason());
testCollectorManagerCommon(testGlobalAggCollectorManager);
assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testGlobalAggCollectorManager.getCollectorReason());
}

public void testAssertionWhenCollectorManagerCreatesNoOPCollector() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,27 @@ public void testPreProcessWithOnlyNonGlobalAggregators() throws Exception {
}

public void testPostProcessWithNonGlobalAggregatorsAndSingleSlice() throws Exception {
testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2);
testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2, false);
}

public void testPostProcessWithNonGlobalAggregatorsAndSingleSliceWithProfilers() throws Exception {
testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2, true);
}

public void testPostProcessWithNonGlobalAggregatorsAndMultipleSlices() throws Exception {
testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2);
testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2, false);
}

public void testPostProcessWithNonGlobalAggregatorsAndMultipleSlicesWithProfilers() throws Exception {
testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2, true);
}

public void testPostProcessGlobalAndNonGlobalAggregators() throws Exception {
testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1);
testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1, false);
}

public void testPostProcessGlobalAndNonGlobalAggregatorsWithProfilers() throws Exception {
testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1, true);
}

private void testPreProcessCommon(String agg, int expectedGlobalAggs, int expectedNonGlobalAggs) throws Exception {
Expand Down Expand Up @@ -127,8 +139,13 @@ private void testPreProcessCommon(
}
}

private void testPostProcessCommon(String aggs, int numSlices, int expectedGlobalAggs, int expectedNonGlobalAggsPerSlice)
throws Exception {
private void testPostProcessCommon(
String aggs,
int numSlices,
int expectedGlobalAggs,
int expectedNonGlobalAggsPerSlice,
boolean withProfilers
) throws Exception {
final Collection<Collector> nonGlobalCollectors = new ArrayList<>();
final Collection<Collector> globalCollectors = new ArrayList<>();
testPreProcessCommon(aggs, expectedGlobalAggs, expectedNonGlobalAggsPerSlice, nonGlobalCollectors, globalCollectors);
Expand Down Expand Up @@ -157,6 +174,9 @@ private void testPostProcessCommon(String aggs, int numSlices, int expectedGloba
.thenReturn(result);
}
assertTrue(context.queryResult().hasAggs());
if (withProfilers) {
((TestSearchContext) context).withProfilers();
}
testAggregationProcessor.postProcess(context);
assertTrue(context.queryResult().hasAggs());
// for global aggs verify that search.search is called with CollectionManager
Expand Down

0 comments on commit e8352de

Please sign in to comment.