diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index c5488376f43fe..99c27130014ea 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -219,6 +219,18 @@ public boolean isInSortOrderExecutionRequired() { return false; } + /** + * Return false if this aggregation or any of the child aggregations does not support concurrent search + */ + public boolean supportsConcurrentExecution() { + for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) { + if (builder.supportsConcurrentExecution() == false) { + return false; + } + } + return isInSortOrderExecutionRequired() == false; + } + /** * Called by aggregations whose parents must be sequentially ordered. * @param type the type of the aggregation being validated diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index d722ee8f14e57..92888cb934234 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -96,6 +96,11 @@ public boolean supportsSampling() { return true; } + @Override + public boolean supportsConcurrentExecution() { + return false; + } + public CompositeAggregationBuilder(StreamInput in) throws IOException { super(in); int num = in.readVInt(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java index 11cc6a43cb0d7..ee34922346b04 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java @@ -80,6 +80,11 @@ public BucketCardinality bucketCardinality() { return BucketCardinality.ONE; } + @Override + public boolean supportsConcurrentExecution() { + return false; + } + @Override protected AggregatorFactory doBuild(AggregationContext context, AggregatorFactory parent, Builder subFactoriesBuilder) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java index d43b0f7d55f6b..6249dae0e3445 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java @@ -133,6 +133,11 @@ public boolean supportsSampling() { return true; } + @Override + public boolean supportsConcurrentExecution() { + return false; + } + @Override protected ValuesSourceType defaultValueSourceType() { return CoreValuesSourceType.KEYWORD; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java index 7da9aafa59c80..12bbbac5c36a8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java @@ -112,6 +112,11 @@ public boolean supportsSampling() { return true; } + @Override + public boolean supportsConcurrentExecution() { + return false; + } + @Override protected boolean serializeTargetValueType(TransportVersion version) { return true; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorTests.java index 6b0dd7806c40f..9437b9166d3b4 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorTests.java @@ -581,6 +581,32 @@ public void testIndexedMultiValuedString() throws IOException { ); } + public void testIndexedAllDifferentValues() throws IOException { + // Indexing enables testing of ordinal values + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("name").field("str_values"); + final MappedFieldType mappedFieldTypes = new KeywordFieldMapper.KeywordFieldType("str_values"); + int docs = randomIntBetween(50, 100); + CheckedConsumer buildIndex = iw -> { + + for (int i = 0; i < docs; i++) { + iw.addDocument( + List.of( + new StringField("str_values", "" + i, Field.Store.NO), + new SortedSetDocValuesField("str_values", new BytesRef("" + i)) + ) + ); + if (rarely()) { + iw.commit(); + } + } + }; + + testAggregation(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, card -> { + assertEquals(docs, card.getValue()); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, mappedFieldTypes); + } + public void testUnmappedMissingString() throws IOException { CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("name").field("number").missing("🍌🍌🍌"); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 004cad11320b2..ec8a8ef3be5d4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; @@ -58,6 +59,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -136,6 +138,8 @@ import org.elasticsearch.search.internal.SubSearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ContextParser; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.After; @@ -145,6 +149,7 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -152,6 +157,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Stream; @@ -161,7 +168,6 @@ import static java.util.stream.Collectors.toList; import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; @@ -177,7 +183,7 @@ */ public abstract class AggregatorTestCase extends ESTestCase { private NamedWriteableRegistry namedWriteableRegistry; - private List releasables = new ArrayList<>(); + private final List releasables = new ArrayList<>(); protected ValuesSourceRegistry valuesSourceRegistry; private AnalysisModule analysisModule; @@ -192,9 +198,21 @@ public abstract class AggregatorTestCase extends ESTestCase { CompletionFieldMapper.CONTENT_TYPE, // TODO support completion FieldAliasMapper.CONTENT_TYPE // TODO support alias ); + ThreadPool threadPool; + ThreadPoolExecutor threadPoolExecutor; @Before public final void initPlugins() { + int numThreads = randomIntBetween(2, 4); + threadPool = new TestThreadPool(AggregatorTestCase.class.getName()); + threadPoolExecutor = EsExecutors.newFixed( + "test", + numThreads, + 10, + EsExecutors.daemonThreadFactory("test"), + threadPool.getThreadContext(), + randomBoolean() + ); List plugins = new ArrayList<>(getSearchPlugins()); plugins.add(new AggCardinalityUpperBoundPlugin()); SearchModule searchModule = new SearchModule(Settings.EMPTY, plugins); @@ -475,7 +493,14 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc } catch (CircuitBreakingException e) { // Circuit breaks from the cranky breaker are expected - it randomly fails, after all assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); - } catch (IOException e) { + } catch (RuntimeException e) { + if (e.getCause() instanceof ExecutionException executionException) { + if (executionException.getCause() instanceof CircuitBreakingException circuitBreakingException) { + // Circuit breaks from the cranky breaker are expected - it randomly fails, after all + assertThat(circuitBreakingException.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); + return; + } + } throw e; } } @@ -497,7 +522,7 @@ private A searchAndReduce( final IndexReaderContext ctx = searcher.getTopReaderContext(); final PipelineTree pipelines = builder.buildPipelineTree(); - List aggs = new ArrayList<>(); + List internalAggs = new ArrayList<>(); Query rewritten = searcher.rewrite(query); if (splitLeavesIntoSeparateAggregators @@ -533,7 +558,7 @@ private A searchAndReduce( } a.postCollection(); assertEquals(shouldBeCached, context.isCacheable()); - aggs.add(a.buildTopLevel()); + internalAggs.add(a.buildTopLevel()); } finally { Releasables.close(context); } @@ -550,39 +575,61 @@ private A searchAndReduce( fieldTypes ); try { - C root = createAggregator(builder, context); - root.preCollection(); + List aggregators = new ArrayList<>(); if (context.isInSortOrderExecutionRequired()) { + C root = createAggregator(builder, context); + root.preCollection(); + aggregators.add(root); new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root))); } else { - searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)).asCollector()); + CollectorManager collectorManager = new CollectorManager<>() { + @Override + public Collector newCollector() throws IOException { + C collector = createAggregator(builder, context); + collector.preCollection(); + aggregators.add(collector); + return MultiBucketCollector.wrap(true, List.of(collector)).asCollector(); + } + + @Override + public Void reduce(Collection collectors) { + return null; + } + }; + if (aggTestConfig.builder().supportsConcurrentExecution()) { + searcher.search(rewritten, collectorManager); + } else { + searcher.search(rewritten, collectorManager.newCollector()); + } + } + for (C agg : aggregators) { + agg.postCollection(); + internalAggs.add(agg.buildTopLevel()); } - root.postCollection(); - aggs.add(root.buildTopLevel()); } finally { Releasables.close(context); } } - assertRoundTrip(aggs); + assertRoundTrip(internalAggs); BigArrays bigArraysForReduction = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService); try { - if (aggTestConfig.incrementalReduce() && aggs.size() > 1) { + if (aggTestConfig.incrementalReduce() && internalAggs.size() > 1) { // sometimes do an incremental reduce - int toReduceSize = aggs.size(); - Collections.shuffle(aggs, random()); + int toReduceSize = internalAggs.size(); + Collections.shuffle(internalAggs, random()); int r = randomIntBetween(1, toReduceSize); - List toReduce = aggs.subList(0, r); + List toReduce = internalAggs.subList(0, r); AggregationReduceContext reduceContext = new AggregationReduceContext.ForPartial( bigArraysForReduction, getMockScriptService(), () -> false, builder ); - A reduced = (A) aggs.get(0).reduce(toReduce, reduceContext); - aggs = new ArrayList<>(aggs.subList(r, toReduceSize)); - aggs.add(reduced); - assertRoundTrip(aggs); + A reduced = (A) internalAggs.get(0).reduce(toReduce, reduceContext); + internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize)); + internalAggs.add(reduced); + assertRoundTrip(internalAggs); } // now do the final reduce @@ -600,7 +647,7 @@ private A searchAndReduce( ); @SuppressWarnings("unchecked") - A internalAgg = (A) aggs.get(0).reduce(aggs, reduceContext); + A internalAgg = (A) internalAggs.get(0).reduce(internalAggs, reduceContext); assertRoundTrip(internalAgg); // materialize any parent pipelines @@ -870,16 +917,28 @@ protected static DirectoryReader wrapInMockESDirectoryReader(DirectoryReader dir } /** - * Added to randomly run with more assertions on the index searcher level, - * like {@link org.apache.lucene.tests.util.LuceneTestCase#newSearcher(IndexReader)}, which can't be used because it also - * wraps in the IndexSearcher's IndexReader with other implementations that we can't handle. (e.g. ParallelCompositeReader) + * Creates a {@link ContextIndexSearcher} that supports concurrency running each segment in a different thread. It randomly + * sets the IndexSearcher to run on concurrent mode. */ - protected static IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException { + protected IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException { if (randomBoolean()) { // this executes basic query checks and asserts that weights are normalized only once etc. return new AssertingIndexSearcher(random(), indexReader); } else { - return new IndexSearcher(indexReader); + return new ContextIndexSearcher( + indexReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + randomBoolean(), + this.threadPoolExecutor + ) { + @Override + protected LeafSlice[] slices(List leaves) { + // get a thread per segment + return slices(leaves, 1, 1); + } + }; } } @@ -1179,6 +1238,8 @@ public IndexAnalyzers getIndexAnalyzers() { public void cleanupReleasables() { Releasables.close(releasables); releasables.clear(); + threadPoolExecutor.shutdown(); + terminate(threadPool); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java index 2c34d591b9fb2..eed1e4a921555 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.test.AbstractBuilderTestCase; import org.elasticsearch.xcontent.ToXContent; @@ -27,6 +28,7 @@ import java.util.Set; import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; public abstract class BaseAggregationTestCase> extends AbstractBuilderTestCase { @@ -56,6 +58,15 @@ public void testFromXContent() throws IOException { assertEquals(testAgg.hashCode(), newAgg.hashCode()); } + public void testSupportsConcurrentExecution() { + AB builder = createTestAggregatorBuilder(); + boolean supportsConcurrency = builder.supportsConcurrentExecution(); + AggregationBuilder bucketBuilder = new HistogramAggregationBuilder("test"); + assertThat(bucketBuilder.supportsConcurrentExecution(), equalTo(true)); + bucketBuilder.subAggregation(builder); + assertThat(bucketBuilder.supportsConcurrentExecution(), equalTo(supportsConcurrency)); + } + /** * Create at least 2 aggregations and test equality and hash */ diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java index 4b02f084554cb..487da5733e183 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java @@ -152,6 +152,11 @@ public boolean supportsSampling() { return true; } + @Override + public boolean supportsConcurrentExecution() { + return false; + } + /** * Sets the field to use for this aggregation. */ diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregatorTests.java index 41130b0104d49..8ed213bf2ab98 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregatorTests.java @@ -589,6 +589,33 @@ public void testNoTerms() { } + public void testShardSize() throws IOException { + testCase( + new MatchAllDocsQuery(), + List.of( + new MultiValuesSourceFieldConfig.Builder().setFieldName(INT_FIELD).build(), + new MultiValuesSourceFieldConfig.Builder().setFieldName(DATE_FIELD).build() + ), + ab -> ab.size(1).shardSize(1), + iw -> { + for (int i = 0; i < 100; i++) { + iw.addDocument(docWithDate("2020-01-01", new NumericDocValuesField(INT_FIELD, 1))); + if (i < 80) { + iw.addDocument(docWithDate("2020-01-01", new NumericDocValuesField(INT_FIELD, 2))); + } + if (i < 60) { + iw.addDocument(docWithDate("2020-01-01", new NumericDocValuesField(INT_FIELD, 3))); + } + } + }, + h -> { + assertThat(h.getBuckets(), hasSize(1)); + assertThat(h.getBuckets().get(0).getDocCount(), equalTo(100L)); + }, + false + ); + } + private void testCase( Query query, String[] terms, @@ -609,6 +636,17 @@ private void testCase( Consumer builderSetup, CheckedConsumer buildIndex, Consumer verify + ) throws IOException { + testCase(query, terms, builderSetup, buildIndex, verify, randomBoolean()); + } + + private void testCase( + Query query, + List terms, + Consumer builderSetup, + CheckedConsumer buildIndex, + Consumer verify, + boolean withSplitLeavesIntoSeparateAggregators ) throws IOException { MappedFieldType dateType = dateFieldType(DATE_FIELD); MappedFieldType intType = new NumberFieldMapper.NumberFieldType(INT_FIELD, NumberFieldMapper.NumberType.INTEGER); @@ -630,7 +668,9 @@ private void testCase( builder.size(randomIntBetween(10, 200)); } } - testCase(buildIndex, verify, new AggTestConfig(builder, dateType, intType, floatType, keywordType).withQuery(query)); + AggTestConfig aggTestConfig = new AggTestConfig(builder, dateType, intType, floatType, keywordType).withQuery(query) + .withSplitLeavesIntoSeperateAggregators(withSplitLeavesIntoSeparateAggregators); + testCase(buildIndex, verify, aggTestConfig); } @Override