Skip to content

Commit

Permalink
Skip sibling pipeline aggregators reduction during non-final reduce
Browse files Browse the repository at this point in the history
Today a coordinating node forces a final reduction of sibling pipeline aggregators whenever reducing aggs, unless it is reducing aggs incrementally. This works well for incremental reduction of aggs, but breaks CCS when minimizing roundtrips as each cluster ends up reducing its own pipeline aggregators locally while that should only be done by the CCS coordinating node later. This causes issues as after their reduction,  pipeline aggs cannot be further reduced, which is what happens with CCS causing errors like "java.lang.UnsupportedOperationException: Not supported" being returned.

Each coordinating node should rather honour the reduce context flag that
indicates whether we are executing a final reduce or not. If not, it should leave the sibling pipeline aggregations alone.

Note that his bug affects only pipeline aggs that don't have a parent in
the aggs tree, while all the others work well.

Relates to elastic#40059 but does not fix it yet, as the CCS coordinating node also needs to be adapted to recreate sibling pipeline aggregators from the request.
  • Loading branch information
javanna committed Mar 15, 2019
1 parent 4c803e5 commit 3073152
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand All @@ -65,8 +64,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class SearchPhaseController {

Expand Down Expand Up @@ -488,8 +485,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
Expand All @@ -499,32 +496,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

/**
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
*/
private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
ReduceContext reduceContext = reduceContextFunction.apply(false);
return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
null, reduceContext);
}

private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
newAggs.add(newAgg);
}
return new InternalAggregations(newAggs);
}
return aggregations;
}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
Expand Down Expand Up @@ -644,7 +615,8 @@ public void consumeResult(SearchPhaseResult result) {
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ public void onFailure(Exception e) {
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
//here we provide the empty string a cluster alias, which means no prefix in index name,
//but the coord node will perform non final reduce as it's not null.
SearchRequest ccsLocalSearchRequest = SearchRequest.crossClusterSearch(searchRequest, localIndices.indices(),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public AggParseContext(String name) {
}
}

public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0],
new ArrayList<PipelineAggregationBuilder>());
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0], new ArrayList<>());

private AggregatorFactory<?>[] factories;
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum
/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
* Operations that are potentially loosing information can only be applied during the final reduce phase.
* Operations that are potentially losing information can only be applied during the final reduce phase.
*/
public boolean isFinalReduce() {
return isFinalReduce;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,19 +53,26 @@ private InternalAggregations() {
}

/**
* Constructs a new addAggregation.
* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
}

/**
* Reduces the given lists of addAggregation.
*
* @param aggregationsList A list of aggregation to reduce
* @return The reduced addAggregation
* Reduces the given list of aggregations
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
return reduce(aggregationsList, null, context);
}

/**
* Reduces the given list of aggregations as well as the provided sibling pipeline aggregators.
* Note that sibling pipeline aggregators are ignored when non final reduction is performed.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> siblingPipelineAggregators,
ReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}
Expand All @@ -89,6 +97,15 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
}

if (siblingPipelineAggregators != null) {
if (context.isFinalReduce()) {
for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
reducedAggregations.add(newAgg);
}
}
}
return new InternalAggregations(reducedAggregations);
}

Expand Down

0 comments on commit 3073152

Please sign in to comment.