Skip to content

Commit

Permalink
Fix sorting terms by cardinality agg
Browse files Browse the repository at this point in the history
The cardinality agg delays calculating stuff until just before it is
needed. Before elastic#64016 it used the `postCollect` phase to do this work
which was perfect for the `terms` agg but we decided that `postCollect`
was dangerous because some aggs, notably the `parent` and `child` aggs
need to know which children to build and they *can't* during
`postCollect`. After elastic#64016 we built the cardinality agg results when we
built the buckets. But we if you sort on the cardinality agg then you
need to do the `postCollect` stuff in order to know which buckets
to build! So you have a chicken and egg problem. Sort of.

This change splits the difference by running the delayed cardinality agg
stuff as soon as you *either* try to build the buckets *or* read the
cardinality for use with sorting. This works, but is a little janky and
feels wrong. It feels like we could make a structural fix to the way we
read metric values from aggs before building the buckets that would make
this sort of bug much more difficult to cause. But any sort of solution
to this is a larger structural change. So this fixes the bug in the
quick and janky way and we hope to do a more structural fix to the way
we read metrics soon.

Closes elastic#67782
  • Loading branch information
nik9000 committed Jan 21, 2021
1 parent 833b910 commit 42e453c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
Expand Down Expand Up @@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
try {
// Make sure all outstanding data has been synced down to the counts.
postCollectLastCollector();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
Expand Down Expand Up @@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException {

@Override
protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
buildCountIfNeeded();
}

private void buildCountIfNeeded() throws IOException {
if (counts != null) {
return;
}
counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size());
try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) {
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
Expand Down Expand Up @@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
try {
// Make sure all outstanding data has been synced down to the counts.
buildCountIfNeeded();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts.cardinality(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
return buildEmptyAggregation();
}
// We need to build a copy because the returned Aggregation needs remain usable after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
Expand Down Expand Up @@ -113,6 +114,7 @@
import java.util.function.Function;

import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
Expand Down Expand Up @@ -1419,6 +1421,71 @@ public void testOrderByPipelineAggregation() throws Exception {
}
}

public void testOrderByCardinality() throws IOException {
boolean bIsString = randomBoolean();
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a")
.size(3)
.shardSize(3)
.subAggregation(new CardinalityAggregationBuilder("b").field("b"))
.order(BucketOrder.aggregation("b", false));

/*
* Build documents where larger "a"s obviously have more distinct "b"s
* associated with them. But insert them into Lucene in a random
* order using Lucene's randomizeWriter so we'll bump into situations
* where documents in the last segment change the outcome of the
* cardinality agg. At least, right now the bug has to do with
* documents in the last segment. But randomize so we can catch
* new and strange bugs in the future. Finally, its important that
* we have few enough values that cardinality can be exact.
*/
List<List<IndexableField>> docs = new ArrayList<>();
for (int a = 0; a < 10; a++) {
for (int b = 0; b <= a; b++) {
docs.add(
List.of(
new NumericDocValuesField("a", a),
bIsString ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b))) : new NumericDocValuesField("b", b)
)
);
}
}
Collections.shuffle(docs, random());
try (Directory directory = newDirectory()) {
RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
for (List<IndexableField> doc : docs) {
iw.addDocument(doc);
}
iw.close();

try (DirectoryReader unwrapped = DirectoryReader.open(directory);
IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

LongTerms terms = searchAndReduce(
createIndexSettings(),
indexSearcher,
new MatchAllDocsQuery(),
aggregationBuilder,
Integer.MAX_VALUE,
false,
new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
bIsString
? new KeywordFieldMapper.KeywordFieldType("b")
: new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()),
equalTo(List.of(9L, 8L, 7L))
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()),
equalTo(List.of(10L, 9L, 8L))
);
}
}
}

private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
List<Document> documents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,24 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
}

/**
* Collects all documents that match the provided query {@link Query} and
* returns the reduced {@link InternalAggregation}.
* <p>
* @param splitLeavesIntoSeparateAggregators If true this creates a new {@link Aggregator}
* for each leaf as though it were a separate index. If false this aggregates
* all leaves together, like we do in production.
*/
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
boolean splitLeavesIntoSeparateAggregators,
MappedFieldType... fieldTypes) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
final PipelineTree pipelines = builder.buildPipelineTree();
List<InternalAggregation> aggs = new ArrayList<>();
Expand All @@ -445,7 +463,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
);
C root = createAggregator(builder, context);

if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
assertThat(ctx, instanceOf(CompositeReaderContext.class));
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
final int size = compCTX.leaves().size();
Expand Down

0 comments on commit 42e453c

Please sign in to comment.