Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform buildAggregation concurrently and partially support composite aggs #12697

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude;
Expand All @@ -56,6 +58,8 @@
import java.util.List;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.global;
import static org.opensearch.search.aggregations.AggregationBuilders.stats;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
Expand Down Expand Up @@ -164,4 +168,23 @@ private void runLargeStringAggregationTest(AggregationBuilder aggregation) {
}
assertTrue("Exception should have been thrown", exceptionThrown);
}

public void testAggsOnEmptyShards() {
// Create index with 5 shards but only 1 doc
assertAcked(
prepareCreate(
"idx",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).setMapping("score", "type=integer")
);
client().prepareIndex("idx").setId("1").setSource("score", "5").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// Validate global agg does not throw an exception
assertSearchResponse(
client().prepareSearch("idx").addAggregation(global("global").subAggregation(stats("value_stats").field("score"))).get()
);

// Validate non-global agg does not throw an exception
assertSearchResponse(client().prepareSearch("idx").addAggregation(stats("value_stats").field("score")).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
Expand All @@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception {
assertAcked(
prepareCreate(
"idx",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
);
waitForRelocation(ClusterHealthStatus.GREEN);

client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
refresh("idx");
indexRandom(
true,
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
);

waitForRelocation(ClusterHealthStatus.GREEN);
refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
Expand Down Expand Up @@ -56,17 +56,9 @@ public String getCollectorReason() {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
final List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
assert internals.stream().noneMatch(Objects::isNull);
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
return buildAggregationResult(internalAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.search.aggregations;

import org.opensearch.OpenSearchParseException;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -61,6 +62,8 @@
@PublicApi(since = "1.0.0")
public abstract class Aggregator extends BucketCollector implements Releasable {

private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();

/**
* Parses the aggregation request and creates the appropriate aggregator factory for it.
*
Expand All @@ -83,6 +86,13 @@ public interface Parser {
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
}

/**
* Returns the InternalAggregation stored during post collection
*/
public InternalAggregation getPostCollectionAggregation() {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
return internalAggregation.get();
}

/**
* Return the name of this aggregator.
*/
Expand Down Expand Up @@ -185,13 +195,15 @@ public interface BucketComparator {

/**
* Build the result of this aggregation if it is at the "top level"
* of the aggregation tree. If, instead, it is a sub-aggregation of
* another aggregation then the aggregation that contains it will call
* {@link #buildAggregations(long[])}.
* of the aggregation tree and save it. This should get called
* during post collection. If, instead, it is a sub-aggregation
* of another aggregation then the aggregation that contains
* it will call {@link #buildAggregations(long[])}.
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
return buildAggregations(new long[] { 0 })[0];
this.internalAggregation.set(buildAggregations(new long[] { 0 })[0]);
jed326 marked this conversation as resolved.
Show resolved Hide resolved
return internalAggregation.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();

// Perform build aggregation during post collection
if (currentCollector instanceof Aggregator) {
((Aggregator) currentCollector).buildTopLevel();
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
}
}
}
}
Expand Down Expand Up @@ -106,4 +115,31 @@
}
return aggregators;
}

/**
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
* returning response to coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link InternalAggregation}
*/
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
List<InternalAggregation> internalAggregations = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof InternalProfileCollector) {
currentCollector = ((InternalProfileCollector) currentCollector).getCollector();

Check warning on line 134 in server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java#L134

Added line #L134 was not covered by tests
}

if (currentCollector instanceof Aggregator) {
internalAggregations.add(((Aggregator) currentCollector).getPostCollectionAggregation());
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}
return internalAggregations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

Expand Down Expand Up @@ -42,6 +44,19 @@
}
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
// If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
// search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
// we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
if (context.searcher().getLeafContexts().isEmpty()) {
for (Collector c : collectors) {
context.bucketCollectorProcessor().processPostCollection(c);
}

Check warning on line 55 in server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java#L54-L55

Added lines #L54 - L55 were not covered by tests
}
return super.reduce(collectors);
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final CircuitBreaker breaker;

// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
// count is currently only updated in final reduce phase which is executed in single thread for both concurrent and non-concurrent
jed326 marked this conversation as resolved.
Show resolved Hide resolved
// search
private int count;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
// will be updated by multiple threads in concurrent search hence making it as LongAdder
private final LongAdder callCount;
private volatile boolean circuitBreakerTripped;
private final int availProcessors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

Expand Down Expand Up @@ -42,6 +44,19 @@
}
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
// If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
// search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
// we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
if (context.searcher().getLeafContexts().isEmpty()) {
for (Collector c : collectors) {
context.bucketCollectorProcessor().processPostCollection(c);
}

Check warning on line 55 in server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java#L54-L55

Added lines #L54 - L55 were not covered by tests
}
return super.reduce(collectors);
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

/**
Expand Down Expand Up @@ -80,7 +81,7 @@

@Override
protected boolean supportsConcurrentSegmentSearch() {
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return false;
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);

Check warning on line 85 in server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java#L85

Added line #L85 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.LongArray;
Expand Down Expand Up @@ -94,8 +95,8 @@
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;

Expand Down Expand Up @@ -129,11 +130,10 @@
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
final IndexReader reader = context.searcher().getIndexReader();
final SortedSetDocValues values = reader.leaves().size() > 0
final SortedSetDocValues values = !reader.leaves().isEmpty()
reta marked this conversation as resolved.
Show resolved Hide resolved
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
this.valueCount = values.getValueCount();
this.lookupGlobalOrd = values::lookupOrd;
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(cardinality);
Expand Down Expand Up @@ -885,7 +885,10 @@
}

StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = getDocValues();
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));

StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
result.bucketOrd = temp.bucketOrd;
result.docCountError = 0;
Expand Down Expand Up @@ -1001,7 +1004,9 @@
long subsetSize = subsetSize(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = getDocValues();
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
Expand Down Expand Up @@ -1086,4 +1091,18 @@
* Predicate used for {@link #acceptedGlobalOrdinals} if there is no filter.
*/
private static final LongPredicate ALWAYS_TRUE = l -> true;

/**
* If DocValues have not been initialized yet for reduce phase, create and set them.
*/
private SortedSetDocValues getDocValues() throws IOException {
if (dvs.get() == null) {
dvs.set(
!context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet()

Check warning on line 1103 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L1103

Added line #L1103 was not covered by tests
);
}
return dvs.get();
}
}
Loading
Loading