Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a flag to control the concurrent execution of aggregations #96023

Merged
merged 3 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ public boolean isInSortOrderExecutionRequired() {
return false;
}

/**
* Return false if this aggregation or any of the child aggregations does not support concurrent search
*/
public boolean supportsConcurrentExecution() {
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
if (builder.supportsConcurrentExecution() == false) {
return false;
}
}
return isInSortOrderExecutionRequired() == false;
iverase marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Called by aggregations whose parents must be sequentially ordered.
* @param type the type of the aggregation being validated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public boolean supportsSampling() {
return true;
}

@Override
public boolean supportsConcurrentExecution() {
    // Opt out of concurrent segment search: per the AggregationBuilder contract, returning
    // false forces this aggregation (and any tree containing it) to run single-threaded.
    return false;
}

public CompositeAggregationBuilder(StreamInput in) throws IOException {
super(in);
int num = in.readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public BucketCardinality bucketCardinality() {
return BucketCardinality.ONE;
}

@Override
public boolean supportsConcurrentExecution() {
    // Opt out of concurrent segment search: per the AggregationBuilder contract, returning
    // false forces this aggregation (and any tree containing it) to run single-threaded.
    return false;
}

@Override
protected AggregatorFactory doBuild(AggregationContext context, AggregatorFactory parent, Builder subFactoriesBuilder)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public boolean supportsSampling() {
return true;
}

@Override
public boolean supportsConcurrentExecution() {
    // Opt out of concurrent segment search: per the AggregationBuilder contract, returning
    // false forces this aggregation (and any tree containing it) to run single-threaded.
    return false;
}

@Override
protected ValuesSourceType defaultValueSourceType() {
return CoreValuesSourceType.KEYWORD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public boolean supportsSampling() {
return true;
}

@Override
public boolean supportsConcurrentExecution() {
    // Opt out of concurrent segment search: per the AggregationBuilder contract, returning
    // false forces this aggregation (and any tree containing it) to run single-threaded.
    return false;
}

@Override
protected boolean serializeTargetValueType(TransportVersion version) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,32 @@ public void testIndexedMultiValuedString() throws IOException {
);
}

/**
 * Cardinality over an indexed keyword field where every document carries a unique value.
 * Indexing the field (doc values present) exercises the ordinals-based execution path.
 */
public void testIndexedAllDifferentValues() throws IOException {
    final CardinalityAggregationBuilder builder = new CardinalityAggregationBuilder("name").field("str_values");
    final MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("str_values");
    final int numDocs = randomIntBetween(50, 100);
    final CheckedConsumer<RandomIndexWriter, IOException> indexer = writer -> {
        for (int doc = 0; doc < numDocs; doc++) {
            final String value = "" + doc;
            writer.addDocument(
                List.of(
                    new StringField("str_values", value, Field.Store.NO),
                    new SortedSetDocValuesField("str_values", new BytesRef(value))
                )
            );
            // Occasionally commit so the index ends up with multiple segments.
            if (rarely()) {
                writer.commit();
            }
        }
    };

    // Every value is distinct, so the cardinality must equal the document count.
    testAggregation(builder, new MatchAllDocsQuery(), indexer, card -> {
        assertEquals(numDocs, card.getValue());
        assertTrue(AggregationInspectionHelper.hasValue(card));
    }, fieldType);
}

public void testUnmappedMissingString() throws IOException {
CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("name").field("number").missing("🍌🍌🍌");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand Down Expand Up @@ -136,6 +138,8 @@
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ContextParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.junit.After;
Expand All @@ -145,13 +149,16 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand All @@ -161,7 +168,6 @@
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
Expand All @@ -177,7 +183,7 @@
*/
public abstract class AggregatorTestCase extends ESTestCase {
private NamedWriteableRegistry namedWriteableRegistry;
private List<AggregationContext> releasables = new ArrayList<>();
private final List<AggregationContext> releasables = new ArrayList<>();
protected ValuesSourceRegistry valuesSourceRegistry;
private AnalysisModule analysisModule;

Expand All @@ -192,9 +198,21 @@ public abstract class AggregatorTestCase extends ESTestCase {
CompletionFieldMapper.CONTENT_TYPE, // TODO support completion
FieldAliasMapper.CONTENT_TYPE // TODO support alias
);
ThreadPool threadPool;
ThreadPoolExecutor threadPoolExecutor;

@Before
public final void initPlugins() {
int numThreads = randomIntBetween(2, 4);
threadPool = new TestThreadPool(AggregatorTestCase.class.getName());
threadPoolExecutor = EsExecutors.newFixed(
"test",
numThreads,
10,
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
randomBoolean()
);
List<SearchPlugin> plugins = new ArrayList<>(getSearchPlugins());
plugins.add(new AggCardinalityUpperBoundPlugin());
SearchModule searchModule = new SearchModule(Settings.EMPTY, plugins);
Expand Down Expand Up @@ -475,7 +493,14 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc
} catch (CircuitBreakingException e) {
// Circuit breaks from the cranky breaker are expected - it randomly fails, after all
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
} catch (IOException e) {
} catch (RuntimeException e) {
if (e.getCause() instanceof ExecutionException executionException) {
if (executionException.getCause() instanceof CircuitBreakingException circuitBreakingException) {
// Circuit breaks from the cranky breaker are expected - it randomly fails, after all
assertThat(circuitBreakingException.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
return;
}
}
throw e;
}
}
Expand All @@ -497,7 +522,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(

final IndexReaderContext ctx = searcher.getTopReaderContext();
final PipelineTree pipelines = builder.buildPipelineTree();
List<InternalAggregation> aggs = new ArrayList<>();
List<InternalAggregation> internalAggs = new ArrayList<>();
Query rewritten = searcher.rewrite(query);

if (splitLeavesIntoSeparateAggregators
Expand Down Expand Up @@ -533,7 +558,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
}
a.postCollection();
assertEquals(shouldBeCached, context.isCacheable());
aggs.add(a.buildTopLevel());
internalAggs.add(a.buildTopLevel());
} finally {
Releasables.close(context);
}
Expand All @@ -550,39 +575,61 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
fieldTypes
);
try {
C root = createAggregator(builder, context);
root.preCollection();
List<C> aggregators = new ArrayList<>();
if (context.isInSortOrderExecutionRequired()) {
C root = createAggregator(builder, context);
root.preCollection();
aggregators.add(root);
new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
} else {
searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)).asCollector());
CollectorManager<Collector, Void> collectorManager = new CollectorManager<>() {
@Override
public Collector newCollector() throws IOException {
C collector = createAggregator(builder, context);
collector.preCollection();
aggregators.add(collector);
return MultiBucketCollector.wrap(true, List.of(collector)).asCollector();
}

@Override
public Void reduce(Collection<Collector> collectors) {
return null;
}
};
if (aggTestConfig.builder().supportsConcurrentExecution()) {
searcher.search(rewritten, collectorManager);
} else {
searcher.search(rewritten, collectorManager.newCollector());
}
}
for (C agg : aggregators) {
agg.postCollection();
internalAggs.add(agg.buildTopLevel());
}
root.postCollection();
aggs.add(root.buildTopLevel());
} finally {
Releasables.close(context);
}
}
assertRoundTrip(aggs);
assertRoundTrip(internalAggs);

BigArrays bigArraysForReduction = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService);
try {
if (aggTestConfig.incrementalReduce() && aggs.size() > 1) {
if (aggTestConfig.incrementalReduce() && internalAggs.size() > 1) {
// sometimes do an incremental reduce
int toReduceSize = aggs.size();
Collections.shuffle(aggs, random());
int toReduceSize = internalAggs.size();
Collections.shuffle(internalAggs, random());
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> toReduce = aggs.subList(0, r);
List<InternalAggregation> toReduce = internalAggs.subList(0, r);
AggregationReduceContext reduceContext = new AggregationReduceContext.ForPartial(
bigArraysForReduction,
getMockScriptService(),
() -> false,
builder
);
A reduced = (A) aggs.get(0).reduce(toReduce, reduceContext);
aggs = new ArrayList<>(aggs.subList(r, toReduceSize));
aggs.add(reduced);
assertRoundTrip(aggs);
A reduced = (A) internalAggs.get(0).reduce(toReduce, reduceContext);
internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize));
internalAggs.add(reduced);
assertRoundTrip(internalAggs);
}

// now do the final reduce
Expand All @@ -600,7 +647,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
);

@SuppressWarnings("unchecked")
A internalAgg = (A) aggs.get(0).reduce(aggs, reduceContext);
A internalAgg = (A) internalAggs.get(0).reduce(internalAggs, reduceContext);
assertRoundTrip(internalAgg);

// materialize any parent pipelines
Expand Down Expand Up @@ -870,16 +917,28 @@ protected static DirectoryReader wrapInMockESDirectoryReader(DirectoryReader dir
}

/**
* Added to randomly run with more assertions on the index searcher level,
* like {@link org.apache.lucene.tests.util.LuceneTestCase#newSearcher(IndexReader)}, which can't be used because it also
* wraps in the IndexSearcher's IndexReader with other implementations that we can't handle. (e.g. ParallelCompositeReader)
* Creates a {@link ContextIndexSearcher} that supports concurrency running each segment in a different thread. It randomly
* sets the IndexSearcher to run on concurrent mode.
*/
protected static IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException {
protected IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException {
if (randomBoolean()) {
// this executes basic query checks and asserts that weights are normalized only once etc.
return new AssertingIndexSearcher(random(), indexReader);
iverase marked this conversation as resolved.
Show resolved Hide resolved
} else {
return new IndexSearcher(indexReader);
return new ContextIndexSearcher(
indexReader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
randomBoolean(),
this.threadPoolExecutor
) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
// get a thread per segment
return slices(leaves, 1, 1);
}
};
}
}

Expand Down Expand Up @@ -1179,6 +1238,8 @@ public IndexAnalyzers getIndexAnalyzers() {
public void cleanupReleasables() {
    // Release every aggregation context opened during the test run.
    Releasables.close(releasables);
    releasables.clear();
    // Tear down the concurrency infrastructure (executor + thread pool) created for this test.
    threadPoolExecutor.shutdown();
    terminate(threadPool);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.test.AbstractBuilderTestCase;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -27,6 +28,7 @@
import java.util.Set;

import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public abstract class BaseAggregationTestCase<AB extends AbstractAggregationBuilder<AB>> extends AbstractBuilderTestCase {
Expand Down Expand Up @@ -56,6 +58,15 @@ public void testFromXContent() throws IOException {
assertEquals(testAgg.hashCode(), newAgg.hashCode());
}

/**
 * Verifies that a parent bucket aggregation supports concurrent execution only when all of
 * its sub-aggregations do.
 */
public void testSupportsConcurrentExecution() {
    final AB subAggregation = createTestAggregatorBuilder();
    final boolean subSupportsConcurrency = subAggregation.supportsConcurrentExecution();
    final AggregationBuilder histogram = new HistogramAggregationBuilder("test");
    // A plain histogram is concurrency-friendly on its own...
    assertThat(histogram.supportsConcurrentExecution(), equalTo(true));
    // ...but once the random sub-aggregation is attached, it must inherit its capability.
    histogram.subAggregation(subAggregation);
    assertThat(histogram.supportsConcurrentExecution(), equalTo(subSupportsConcurrency));
}

/**
* Create at least 2 aggregations and test equality and hash
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public boolean supportsSampling() {
return true;
}

@Override
public boolean supportsConcurrentExecution() {
    // Opt out of concurrent segment search: per the AggregationBuilder contract, returning
    // false forces this aggregation (and any tree containing it) to run single-threaded.
    return false;
}

/**
* Sets the field to use for this aggregation.
*/
Expand Down
Loading