Skip to content

Commit

Permalink
Use List instead of priority queue for stable sorting in bucket sort …
Browse files Browse the repository at this point in the history
…aggregator (#36748)

Update BucketSortPipelineAggregator to sort buckets with a List and Collections.sort() instead of a priority queue. Because Collections.sort() is a stable sort, buckets with equal sort values now keep their original relative order. Closes #36322.
  • Loading branch information
chatzikalymnios authored and dimitris-athanasiou committed Jan 9, 2019
1 parent d817b46 commit 606c0ab
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.pipeline.bucketsort;


import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregation;
Expand All @@ -36,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -97,22 +95,22 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount))));
}

int queueSize = Math.min(from + currentSize, bucketsCount);
PriorityQueue<ComparableBucket> ordered = new TopNPriorityQueue(queueSize);
List<ComparableBucket> ordered = new ArrayList<>();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket);
if (comparableBucket.skip() == false) {
ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket));
ordered.add(comparableBucket);
}
}

int resultSize = Math.max(ordered.size() - from, 0);
Collections.sort(ordered);

// Popping from the priority queue returns the least element. The elements we want to skip due to offset would pop last.
// Thus, we just have to pop as many elements as we expect in results and store them in reverse order.
LinkedList<InternalMultiBucketAggregation.InternalBucket> newBuckets = new LinkedList<>();
for (int i = 0; i < resultSize; ++i) {
newBuckets.addFirst(ordered.pop().internalBucket);
// We just have to get as many elements as we expect in results and store them in the same order starting from
// the specified offset and taking currentSize into consideration.
int limit = Math.min(from + currentSize, ordered.size());
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
for (int i = from; i < limit; ++i) {
newBuckets.add(ordered.get(i).internalBucket);
}
return originalAgg.create(newBuckets);
}
Expand Down Expand Up @@ -162,11 +160,11 @@ public int compareTo(ComparableBucket that) {
if (thisValue == null && thatValue == null) {
continue;
} else if (thisValue == null) {
return -1;
} else if (thatValue == null) {
return 1;
} else if (thatValue == null) {
return -1;
} else {
compareResult = sort.order() == SortOrder.DESC ? thisValue.compareTo(thatValue) : -thisValue.compareTo(thatValue);
compareResult = sort.order() == SortOrder.DESC ? -thisValue.compareTo(thatValue) : thisValue.compareTo(thatValue);
}
if (compareResult != 0) {
break;
Expand All @@ -175,17 +173,4 @@ public int compareTo(ComparableBucket that) {
return compareResult;
}
}


/**
 * A {@link PriorityQueue} of {@link ComparableBucket}s whose ordering is delegated to
 * {@link ComparableBucket#compareTo(ComparableBucket)}.
 */
private static class TopNPriorityQueue extends PriorityQueue<ComparableBucket> {

    /**
     * @param maxSize value forwarded unchanged to the superclass constructor
     */
    private TopNPriorityQueue(int maxSize) {
        super(maxSize);
    }

    /**
     * @return {@code true} when {@code first} compares strictly below {@code second}
     */
    @Override
    protected boolean lessThan(ComparableBucket first, ComparableBucket second) {
        final int comparison = first.compareTo(second);
        return comparison < 0;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.bucketSort;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
Expand Down Expand Up @@ -192,6 +193,26 @@ public void testSortTermsOnKey() {
}
}

/**
 * Runs a terms aggregation with a bucket sort on {@code _key} limited to a size of 3 and checks
 * that exactly three buckets come back with their keys in non-decreasing order.
 */
public void testSortTermsOnKeyWithSize() {
    SearchResponse searchResponse = client().prepareSearch(INDEX)
        .setSize(0)
        .addAggregation(
            terms("foos")
                .field(TERM_FIELD)
                .subAggregation(
                    bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("_key"))).size(3)))
        .get();

    assertSearchResponse(searchResponse);

    Terms foosAgg = searchResponse.getAggregations().get("foos");
    assertThat(foosAgg, notNullValue());

    List<? extends Terms.Bucket> sortedBuckets = foosAgg.getBuckets();
    assertEquals(3, sortedBuckets.size());

    // Walk the buckets in response order; each key must not be smaller than the one before it.
    String lastKey = (String) sortedBuckets.get(0).getKey();
    for (int i = 0; i < sortedBuckets.size(); i++) {
        String currentKey = (String) sortedBuckets.get(i).getKey();
        assertThat(lastKey, lessThanOrEqualTo(currentKey));
        lastKey = currentKey;
    }
}

public void testSortTermsOnSubAggregation() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
Expand Down Expand Up @@ -232,6 +253,29 @@ public void testSortTermsOnSubAggregation() {
}
}

/**
 * Applies two bucket sorts to the same terms aggregation, first on {@code _key} and then on the
 * {@code max} sub-aggregation, and checks that the resulting buckets are still in non-decreasing
 * key order.
 */
public void testSortTermsOnSubAggregationPreservesOrderOnEquals() {
    SearchResponse searchResponse = client().prepareSearch(INDEX)
        .setSize(0)
        .addAggregation(
            terms("foos")
                .field(TERM_FIELD)
                .subAggregation(bucketSort("keyBucketSort", Arrays.asList(new FieldSortBuilder("_key"))))
                .subAggregation(max("max").field("missingValue").missing(1))
                .subAggregation(bucketSort("maxBucketSort", Arrays.asList(new FieldSortBuilder("max")))))
        .get();

    assertSearchResponse(searchResponse);

    Terms foosAgg = searchResponse.getAggregations().get("foos");
    assertThat(foosAgg, notNullValue());
    List<? extends Terms.Bucket> sortedBuckets = foosAgg.getBuckets();

    // All max values are equal, so the second sort must leave the key ordering
    // produced by keyBucketSort untouched.
    String lastKey = (String) sortedBuckets.get(0).getKey();
    for (int i = 0; i < sortedBuckets.size(); i++) {
        String currentKey = (String) sortedBuckets.get(i).getKey();
        assertThat(lastKey, lessThanOrEqualTo(currentKey));
        lastKey = currentKey;
    }
}

public void testSortTermsOnCountWithSecondarySort() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
Expand Down

0 comments on commit 606c0ab

Please sign in to comment.