Skip to content

Commit

Permalink
Remove aggregation's postCollect phase (#64016)
Browse files Browse the repository at this point in the history
After #63811 it became clear to me that `postCollect` is kind of
dangerous and not all that useful. So this removes it.

The trouble with `postCollect` is that it all happened right after we
finished calling `collect` on the `LeafBucketCollectors` but before we
built the aggregation results. But in #63811 we found out that we can't
call `postCollect` on the children of `parent` or `child` aggregators
until we know which *which* aggregation results we're building.

So this removes `postCollect` and moves all of the things we did at
post-collect phase into `buildAggregations` or into hooks called in
those methods.
  • Loading branch information
nik9000 authored Oct 28, 2020
1 parent 2251a7a commit 3af540b
Show file tree
Hide file tree
Showing 43 changed files with 34 additions and 231 deletions.
12 changes: 3 additions & 9 deletions docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,7 @@ This yields the following aggregation profile output:
"collect": 45786,
"collect_count": 4,
"build_leaf_collector": 18211,
"build_leaf_collector_count": 1,
"post_collection": 929,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"debug": {
"total_buckets": 1,
Expand All @@ -819,9 +817,7 @@ This yields the following aggregation profile output:
"collect": 69401,
"collect_count": 4,
"build_leaf_collector": 8150,
"build_leaf_collector_count": 1,
"post_collection": 1584,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"children": [
{
Expand All @@ -838,9 +834,7 @@ This yields the following aggregation profile output:
"collect": 61611,
"collect_count": 4,
"build_leaf_collector": 5564,
"build_leaf_collector_count": 1,
"post_collection": 471,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"debug": {
"total_buckets": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
}

@Override
public void postCollection() throws IOException {
// Delaying until beforeBuildingBuckets
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
IndexReader indexReader = searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer childDocsScorer = outFilter.scorer(ctx);
Expand Down Expand Up @@ -160,14 +155,13 @@ public int docID() {
* structure that maps a primitive long to a list of primitive
* longs.
*/
for (long owningBucketOrd: ordsToCollect) {
if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
collectBucket(sub, docId, owningBucketOrd);
for (long o: bucketOrdsToCollect) {
if (collectionStrategy.exists(o, globalOrdinal)) {
collectBucket(sub, docId, o);
}
}
}
}
super.postCollection(); // Run post collection after collecting the sub-aggs
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
Expand All @@ -258,7 +257,6 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
Expand All @@ -285,4 +283,3 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,5 @@ public ScoreMode scoreMode() {

@Override
public void preCollection() throws IOException {}

@Override
public void postCollection() throws IOException {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,18 @@
public class AggregationProfilerIT extends ESIntegTestCase {
private static final String BUILD_LEAF_COLLECTOR = AggregationTimingType.BUILD_LEAF_COLLECTOR.toString();
private static final String COLLECT = AggregationTimingType.COLLECT.toString();
private static final String POST_COLLECTION = AggregationTimingType.POST_COLLECTION.toString();
private static final String INITIALIZE = AggregationTimingType.INITIALIZE.toString();
private static final String BUILD_AGGREGATION = AggregationTimingType.BUILD_AGGREGATION.toString();
private static final String REDUCE = AggregationTimingType.REDUCE.toString();
private static final Set<String> BREAKDOWN_KEYS = Set.of(
INITIALIZE,
BUILD_LEAF_COLLECTOR,
COLLECT,
POST_COLLECTION,
BUILD_AGGREGATION,
REDUCE,
INITIALIZE + "_count",
BUILD_LEAF_COLLECTOR + "_count",
COLLECT + "_count",
POST_COLLECTION + "_count",
BUILD_AGGREGATION + "_count",
REDUCE + "_count"
);
Expand Down Expand Up @@ -330,7 +327,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(diversifyBreakdown.get(REDUCE), equalTo(0L));
assertThat(diversifyAggResult.getDebugInfo(), equalTo(Map.of(DEFERRED, List.of("max"))));
Expand All @@ -347,7 +343,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand Down Expand Up @@ -391,7 +386,6 @@ public void testComplexProfile() {
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
assertThat(histoBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(histoBreakdown.get(REDUCE), equalTo(0L));
Map<String, Object> histoDebugInfo = histoAggResult.getDebugInfo();
Expand All @@ -413,7 +407,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
Expand All @@ -432,7 +425,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -448,7 +440,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -464,7 +455,6 @@ public void testComplexProfile() {
assertThat(stringsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(stringsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(stringsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(stringsAggResult);
Expand All @@ -483,7 +473,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -499,7 +488,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -516,7 +504,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
Expand All @@ -535,7 +522,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -551,7 +537,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public void execute(SearchContext context) {
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : context.aggregations().aggregators()) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ public interface BucketComparator {

/**
* Build the results of this aggregation.
* @param owningBucketOrds the ordinals of the buckets that we want to
* @param ordsToCollect the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;

/**
* Build the result of this aggregation if it is at the "top level"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ public void preCollection() throws IOException {
badState();
}

@Override
public void postCollection() throws IOException {
badState();
}
@Override
public ScoreMode scoreMode() {
badState();
Expand Down Expand Up @@ -243,19 +239,6 @@ public Aggregator subAggregator(String aggName) {
return subAggregatorbyName.get(aggName);
}

/**
* Called after collection of all document is done.
* <p>
* Warning: this is not final only to allow the parent join aggregator
* to delay this until building buckets.
*/
@Override
public void postCollection() throws IOException {
// post-collect this agg before subs to make it possible to buffer and then replay in postCollection()
doPostCollection();
collectableSubAggregators.postCollection();
}

/** Called upon release of the aggregator. */
@Override
public void close() {
Expand All @@ -269,12 +252,6 @@ public void close() {
/** Release instance-specific data. */
protected void doClose() {}

/**
* Can be overridden by aggregator implementation to be called back when the collection phase ends.
*/
protected void doPostCollection() throws IOException {
}

protected final InternalAggregations buildEmptySubAggregations() {
List<InternalAggregation> aggs = new ArrayList<>();
for (Aggregator aggregator : subAggregators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ public void preCollection() throws IOException {
// no-op
}
@Override
public void postCollection() throws IOException {
// no-op
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
Expand All @@ -58,10 +54,4 @@ public ScoreMode scoreMode() {
* Pre collection callback.
*/
public abstract void preCollection() throws IOException;

/**
* Post-collection callback.
*/
public abstract void postCollection() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,6 @@ public void preCollection() throws IOException {
}
}

@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

@Override
public String toString() {
return Arrays.toString(collectors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ static class Entry {
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected boolean finished = false;
protected LongHash selectedBuckets;

/**
Expand Down Expand Up @@ -137,20 +136,12 @@ public void preCollection() throws IOException {
collector.preCollection();
}

@Override
public void postCollection() throws IOException {
finishLeaf();
finished = true;
}

/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
finishLeaf();
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}
Expand Down Expand Up @@ -202,7 +193,6 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
// continue with the following leaf
}
}
collector.postCollection();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ public final int bucketDocCount(long bucketOrd) {
}

/**
* Hook to allow taking an action before building buckets.
* Hook to allow taking an action before building the sub agg results.
*/
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {}

/**
* Build the results of the sub-aggregations of the buckets at each of
Expand All @@ -186,7 +186,7 @@ protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
beforeBuildingBuckets(bucketOrdsToCollect);
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ protected boolean shouldDefer(Aggregator aggregator) {
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
if (recordingWrapper != null) {
recordingWrapper.prepareSelectedBuckets(ordsToCollect);
recordingWrapper.prepareSelectedBuckets(bucketOrdsToCollect);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@ public void preCollection() throws IOException {
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public void postCollection() throws IOException {
throw new IllegalStateException(
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return in.resolveSortPath(next, path);
Expand Down
Loading

0 comments on commit 3af540b

Please sign in to comment.