Fix sorting terms by cardinality agg #67839

Merged 1 commit on Jan 26, 2021
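When a terms aggregation is ordered by a cardinality sub-aggregation, the terms aggregator reads the sub-aggregation's metric(owningBucketOrd) value to sort its buckets. Before this change the cardinality aggregators only folded outstanding collected data, in particular data from the last collected segment, into their HyperLogLog counts when results were built, so buckets could be sorted on incomplete counts. The diff below flushes that data before metric returns, and adds a regression test plus a test-framework switch to reproduce the problem reliably. As a sketch of the kind of request that was affected (it mirrors the builders used by the new test; the field names "a" and "b" are placeholders):

import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;

// Top "a" terms, ordered by how many distinct "b" values each bucket contains.
// Ordering on the sub-aggregation is what forces the terms aggregator to read
// the cardinality metric while sorting buckets.
TermsAggregationBuilder request = new TermsAggregationBuilder("a").field("a")
    .size(3)
    .shardSize(3)
    .subAggregation(new CardinalityAggregationBuilder("b").field("b"))
    .order(BucketOrder.aggregation("b", false)); // descending by the cardinality of "b"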
@@ -38,6 +38,7 @@
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
try {
// Make sure all outstanding data has been synced down to the counts.
postCollectLastCollector();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
}
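The shape of the fix in both cardinality aggregators is the same: treat any read of the metric as a signal to first fold buffered, not-yet-synced data into the counts, and make that fold safe to run more than once (the second aggregator below does this through buildCountIfNeeded()). A stripped-down illustration of the pattern in plain Java, with a HashSet standing in for the HyperLogLog sketch; this is a schematic, not Elasticsearch code:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LazyDistinctCount {
    private final List<Long> lastSegmentBuffer = new ArrayList<>(); // collected but not yet counted
    private Set<Long> counts; // stand-in for the HyperLogLog state, built lazily

    void collect(long value) {
        lastSegmentBuffer.add(value);
    }

    private void flushIfNeeded() {
        if (counts == null) {
            counts = new HashSet<>();
        }
        counts.addAll(lastSegmentBuffer);
        lastSegmentBuffer.clear();
    }

    // Read by the parent aggregation when it sorts buckets by this metric.
    // Without the flush, values from the last segment would be missed.
    double metric() {
        flushIfNeeded();
        return counts.size();
    }
}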

@@ -31,6 +31,7 @@
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException {

@Override
protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
buildCountIfNeeded();
}

private void buildCountIfNeeded() throws IOException {
if (counts != null) {
return;
}
counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size());
try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) {
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
@@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
try {
// Make sure all outstanding data has been synced down to the counts.
buildCountIfNeeded();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts.cardinality(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
return buildEmptyAggregation();
}
// We need to build a copy because the returned Aggregation needs to remain usable after
@@ -86,6 +86,7 @@
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
@@ -113,6 +114,7 @@
import java.util.function.Function;

import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
@@ -1419,6 +1421,71 @@ public void testOrderByPipelineAggregation() throws Exception {
}
}

public void testOrderByCardinality() throws IOException {
boolean bIsString = randomBoolean();
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a")
.size(3)
.shardSize(3)
.subAggregation(new CardinalityAggregationBuilder("b").field("b"))
.order(BucketOrder.aggregation("b", false));

/*
* Build documents where larger "a"s obviously have more distinct "b"s
* associated with them: bucket a == k ends up with k + 1 documents and
* k + 1 distinct "b" values. But insert them into Lucene in a random
* order using Lucene's RandomIndexWriter so we'll bump into situations
* where documents in the last segment change the outcome of the
* cardinality agg. Right now the bug has to do with documents in the
* last segment, but randomize anyway so we can catch new and strange
* bugs in the future. Finally, it's important that we have few enough
* values that cardinality can be exact.
*/
List<List<IndexableField>> docs = new ArrayList<>();
for (int a = 0; a < 10; a++) {
for (int b = 0; b <= a; b++) {
docs.add(
List.of(
new NumericDocValuesField("a", a),
bIsString ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b))) : new NumericDocValuesField("b", b)
)
);
}
}
Collections.shuffle(docs, random());
try (Directory directory = newDirectory()) {
RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
for (List<IndexableField> doc : docs) {
iw.addDocument(doc);
}
iw.close();

try (DirectoryReader unwrapped = DirectoryReader.open(directory);
IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

LongTerms terms = searchAndReduce(
createIndexSettings(),
indexSearcher,
new MatchAllDocsQuery(),
aggregationBuilder,
Integer.MAX_VALUE,
false,
new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
bIsString
? new KeywordFieldMapper.KeywordFieldType("b")
: new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()),
equalTo(List.of(9L, 8L, 7L))
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()),
equalTo(List.of(10L, 9L, 8L))
);
}
}
}

private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
List<Document> documents = new ArrayList<>();
@@ -429,6 +429,24 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
}

/**
* Collects all documents that match the provided {@link Query} and
* returns the reduced {@link InternalAggregation}.
* <p>
* @param splitLeavesIntoSeparateAggregators If true, this creates a new {@link Aggregator}
* for each leaf as though it were a separate index. If false, this aggregates
* all leaves together, like we do in production.
*/
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
boolean splitLeavesIntoSeparateAggregators,
MappedFieldType... fieldTypes) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
final PipelineTree pipelines = builder.buildPipelineTree();
List<InternalAggregation> aggs = new ArrayList<>();
@@ -445,7 +463,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
);
C root = createAggregator(builder, context);

if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
assertThat(ctx, instanceOf(CompositeReaderContext.class));
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
final int size = compCTX.leaves().size();
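The new searchAndReduce overload exists so the regression test can opt out of the coin flip that used to decide this: passing false for splitLeavesIntoSeparateAggregators forces all leaves through a single Aggregator, the production-like path, instead of leaving it to the randomBoolean() it replaces, which would have let the regression test exercise the buggy path only some of the time. A sketch of the call, mirroring testOrderByCardinality above (the searcher, builder, and field types are whatever the test sets up):

// Inside an AggregatorTestCase subclass: aggregate all leaves with one Aggregator.
LongTerms terms = searchAndReduce(
    createIndexSettings(),
    indexSearcher,
    new MatchAllDocsQuery(),
    aggregationBuilder,
    Integer.MAX_VALUE, // maxBucket
    false,             // splitLeavesIntoSeparateAggregators
    new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
    new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
);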