diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index d1b7c45b90576..4c21067d9519a 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -111,6 +111,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
@@ -1929,6 +1931,8 @@ static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
         map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
         map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
         map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
+        map.put(VariableWidthHistogramAggregationBuilder.NAME,
+            (p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c));
         map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
         map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
         map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
diff --git a/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc
new file mode 100644
index 0000000000000..535d1f24c0272
--- /dev/null
+++ b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc
@@ -0,0 +1,91 @@
+[[search-aggregations-bucket-variablewidthhistogram-aggregation]]
+=== Variable Width Histogram Aggregation
+
+This is a multi-bucket aggregation similar to <<search-aggregations-bucket-histogram-aggregation,histogram>>.
+However, the width of each bucket is not specified. Rather, a target number of buckets is provided and bucket intervals
+are dynamically determined based on the document distribution. This is done using a simple one-pass document clustering algorithm
+that aims to obtain low distances between bucket centroids. Unlike other multi-bucket aggregations, the intervals will not
+necessarily have a uniform width.
+
+TIP: The number of buckets returned will always be less than or equal to the target number.
+
+Requesting a target of 2 buckets.
+
+[source,console]
+--------------------------------------------------
+POST /sales/_search?size=0
+{
+  "aggs" : {
+    "prices" : {
+      "variable_width_histogram" : {
+        "field" : "price",
+        "buckets" : 2
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+ "aggregations": { + "prices" : { + "buckets": [ + { + "min": 10.0, + "key": 30.0, + "max": 50.0, + "doc_count": 2 + }, + { + "min": 150.0, + "key": 185.0, + "max": 200.0, + "doc_count": 5 + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +==== Clustering Algorithm +Each shard fetches the first `initial_buffer` documents and stores them in memory. Once the buffer is full, these documents +are sorted and linearly separated into `3/4 * shard_size buckets`. +Next each remaining documents is either collected into the nearest bucket, or placed into a new bucket if it is distant +from all the existing ones. At most `shard_size` total buckets are created. + +In the reduce step, the coordinating node sorts the buckets from all shards by their centroids. Then, the two buckets +with the nearest centroids are repeatedly merged until the target number of buckets is achieved. +This merging procedure is a form of https://en.wikipedia.org/wiki/Hierarchical_clustering[agglomerative hierarchical clustering]. + +TIP: A shard can return fewer than `shard_size` buckets, but it cannot return more. + +==== Shard size +The `shard_size` parameter specifies the number of buckets that the coordinating node will request from each shard. +A higher `shard_size` leads each shard to produce smaller buckets. This reduce the likelihood of buckets overlapping +after the reduction step. Increasing the `shard_size` will improve the accuracy of the histogram, but it will +also make it more expensive to compute the final result because bigger priority queues will have to be managed on a +shard level, and the data transfers between the nodes and the client will be larger. + +TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(50 * shard_size, 50000)`. + +==== Initial Buffer +The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory +on a shard before the initial bucketing algorithm is run. Bucket distribution is determined using this sample +of `initial_buffer` documents. So, although a higher `initial_buffer` will use more memory, it will lead to more representative +clusters. + +==== Bucket bounds are approximate +During the reduce step, the master node continuously merges the two buckets with the nearest centroids. If two buckets have +overlapping bounds but distant centroids, then it is possible that they will not be merged. Because of this, after +reduction the maximum value in some interval (`max`) might be greater than the minimum value in the subsequent +bucket (`min`). To reduce the impact of this error, when such an overlap occurs the bound between these intervals is adjusted to be `(max + min) / 2`. 
+ +TIP: Bucket bounds are very sensitive to outliers diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml new file mode 100644 index 0000000000000..071e543e8a25e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml @@ -0,0 +1,50 @@ +setup: + - do: + indices.create: + index: test + body: + settings: + number_of_replicas: 0 + mappings: + properties: + number: + type: integer + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"number": -3}' + - '{"index": {}}' + - '{"number": -2}' + - '{"index": {}}' + - '{"number": 1}' + - '{"index": {}}' + - '{"number": 4}' + - '{"index": {}}' + - '{"number": 5}' + +--- +"basic": + - skip: + version: " - 7.99.99" + reason: added in 8.0.0 (to be backported to 7.9.0) + - do: + search: + body: + size: 0 + aggs: + histo: + variable_width_histogram: + field: number + buckets: 3 + - match: { hits.total.value: 5 } + - length: { aggregations.histo.buckets: 3 } + - match: { aggregations.histo.buckets.0.key: -2.5 } + - match: { aggregations.histo.buckets.0.doc_count: 2 } + - match: { aggregations.histo.buckets.1.key: 1.0 } + - match: { aggregations.histo.buckets.1.doc_count: 1 } + - match: { aggregations.histo.buckets.2.key: 4.5 } + - match: { aggregations.histo.buckets.2.doc_count: 2 } + diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index 46c83b5d9cc51..12be79179b1e1 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -691,6 +691,35 @@ public DoubleArray grow(DoubleArray array, long minSize) { return resize(array, newSize); } + public static class DoubleBinarySearcher extends BinarySearcher{ + + DoubleArray array; + double searchFor; + + public DoubleBinarySearcher(DoubleArray array){ + this.array = array; + this.searchFor = Integer.MIN_VALUE; + } + + @Override + protected int compare(int index) { + // Prevent use of BinarySearcher.search() and force the use of DoubleBinarySearcher.search() + assert this.searchFor != Integer.MIN_VALUE; + + return Double.compare(array.get(index), searchFor); + } + + @Override + protected double distance(int index) { + return Math.abs(array.get(index) - searchFor); + } + + public int search(int from, int to, double searchFor) { + this.searchFor = searchFor; + return super.search(from, to); + } + } + /** * Allocate a new {@link FloatArray}. * @param size the initial length of the array @@ -782,3 +811,4 @@ public ObjectArray grow(ObjectArray array, long minSize) { return resize(array, newSize); } } + diff --git a/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java b/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java new file mode 100644 index 0000000000000..dee1571fe6b82 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +/** + * Performs binary search on an arbitrary data structure. + * + * To do a search, create a subclass and implement custom {@link #compare(int)} and {@link #distance(int)} methods. + * + * {@link BinarySearcher} knows nothing about the value being searched for or the underlying data structure. + * These things should be determined by the subclass in its overridden methods. + * + * Refer to {@link BigArrays.DoubleBinarySearcher} for an example. + * + * NOTE: this class is not thread safe + */ +public abstract class BinarySearcher{ + + /** + * @return a negative integer, zero, or a positive integer if the array's value at index is less than, + * equal to, or greater than the value being searched for. + */ + protected abstract int compare(int index); + + /** + * @return the magnitude of the distance between the element at index and the value being searched for. + * It will usually be Math.abs(array[index] - searchValue). + */ + protected abstract double distance(int index); + + /** + * @return the index who's underlying value is closest to the value being searched for. + */ + private int getClosestIndex(int index1, int index2){ + if(distance(index1) < distance(index2)){ + return index1; + } else { + return index2; + } + } + + /** + * Uses a binary search to determine the index of the element within the index range {from, ... , to} that is + * closest to the search value. + * + * Unlike most binary search implementations, the value being searched for is not an argument to search method. + * Rather, this value should be stored by the subclass along with the underlying array. + * + * @return the index of the closest element. + * + * Requires: The underlying array should be sorted. 
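+ *
+ * For example (illustrative usage only, assuming {@code sortedValues} is a sorted {@link DoubleArray} holding
+ * {@code size} elements):
+ * <pre>{@code
+ * BigArrays.DoubleBinarySearcher searcher = new BigArrays.DoubleBinarySearcher(sortedValues);
+ * int closestIndex = searcher.search(0, size - 1, 42.5); // index of the element closest to 42.5
+ * }</pre>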
+ **/ + public int search(int from, int to){ + while(from < to){ + int mid = (from + to) >>> 1; + int compareResult = compare(mid); + + if(compareResult == 0){ + // arr[mid] == value + return mid; + } else if(compareResult < 0){ + // arr[mid] < val + + if(mid < to) { + // Check if val is between (mid, mid + 1) before setting left = mid + 1 + // (mid < to) ensures that mid + 1 is not out of bounds + int compareValAfterMid = compare(mid + 1); + if (compareValAfterMid > 0) { + return getClosestIndex(mid, mid + 1); + } + } else if(mid == to){ + // val > arr[mid] and there are no more elements above mid, so mid is the closest + return mid; + } + + from = mid + 1; + } else{ + // arr[mid] > val + + if(mid > from) { + // Check if val is between (mid - 1, mid) + // (mid > from) ensures that mid - 1 is not out of bounds + int compareValBeforeMid = compare(mid - 1); + if (compareValBeforeMid < 0) { + // val is between indices (mid - 1), mid + return getClosestIndex(mid, mid - 1); + } + } else if(mid == 0){ + // val < arr[mid] and there are no more candidates below mid, so mid is the closest + return mid; + } + + to = mid - 1; + } + } + + return from; + } + +} diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index b6afe65b79ac5..2207d47ae4873 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -119,9 +119,11 @@ import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing; @@ -475,6 +477,11 @@ private ValuesSourceRegistry registerAggregations(List plugins) { AutoDateHistogramAggregationBuilder.PARSER) .addResultReader(InternalAutoDateHistogram::new) .setAggregatorRegistrar(AutoDateHistogramAggregationBuilder::registerAggregators), builder); + registerAggregation(new AggregationSpec(VariableWidthHistogramAggregationBuilder.NAME, + VariableWidthHistogramAggregationBuilder::new, + VariableWidthHistogramAggregationBuilder.PARSER) + .addResultReader(InternalVariableWidthHistogram::new) + .setAggregatorRegistrar(VariableWidthHistogramAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new, GeoDistanceAggregationBuilder::parse) .addResultReader(InternalGeoDistance::new) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java index 9fe13602896b6..e355185aa823a 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java @@ -68,5 +68,9 @@ final class CommonFields extends ParseField.CommonFields { public static final ParseField FROM_AS_STRING = new ParseField("from_as_string"); public static final ParseField TO = new ParseField("to"); public static final ParseField TO_AS_STRING = new ParseField("to_as_string"); + public static final ParseField MIN = new ParseField("min"); + public static final ParseField MIN_AS_STRING = new ParseField("min_as_string"); + public static final ParseField MAX = new ParseField("max"); + public static final ParseField MAX_AS_STRING = new ParseField("max_as_string"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 3ed81bc4a8a0a..9e2ed84f6e3c0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -101,6 +101,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do subCollector.collect(doc, bucketOrd); } + /** + * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(long[])} to merge the actual + * ordinals and doc ID deltas. + * + * Refer to that method for documentation about the merge map. + */ public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { try (IntArray oldDocCounts = docCounts) { docCounts = bigArrays.newIntArray(newNumBuckets, true); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java new file mode 100644 index 0000000000000..ced1a4915bfdf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java @@ -0,0 +1,608 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InternalVariableWidthHistogram + extends InternalMultiBucketAggregation + implements Histogram, HistogramFactory{ + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket + implements Histogram.Bucket, KeyComparable { + + public static class BucketBounds { + public double min; + public double max; + + public BucketBounds(double min, double max) { + assert min <= max; + this.min = min; + this.max = max; + } + + public BucketBounds(StreamInput in) throws IOException { + this(in.readDouble(), in.readDouble()); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(min); + out.writeDouble(max); + } + + public boolean equals(Object obj){ + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + BucketBounds that = (BucketBounds) obj; + return min == that.min && max == that.max; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), min, max); + } + } + + private final BucketBounds bounds; + private long docCount; + private InternalAggregations aggregations; + protected final transient DocValueFormat format; + private double centroid; + + public Bucket(double centroid, + BucketBounds bounds, + long docCount, + DocValueFormat format, + InternalAggregations aggregations) { + this.format = format; + this.centroid = centroid; + this.bounds = bounds; + this.docCount = docCount; + this.aggregations = aggregations; + } + + /** + * Read from a stream. 
+ */ + public Bucket(StreamInput in, DocValueFormat format) throws IOException { + this.format = format; + centroid = in.readDouble(); + docCount = in.readVLong(); + bounds = new BucketBounds(in); + aggregations = new InternalAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(centroid); + out.writeVLong(docCount); + bounds.writeTo(out); + aggregations.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != InternalVariableWidthHistogram.Bucket.class) { + return false; + } + InternalVariableWidthHistogram.Bucket that = (InternalVariableWidthHistogram.Bucket) obj; + return centroid == that.centroid + && bounds.equals(that.bounds) + && docCount == that.docCount + && Objects.equals(aggregations, that.aggregations); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), centroid, bounds, docCount, aggregations); + } + + @Override + public String getKeyAsString() { + return format.format((double) getKey()).toString(); + } + + /** + * Buckets are compared using their centroids. But, in the final XContent returned by the aggregation, + * we want the bucket's key to be its min. Otherwise, it would look like the distances between centroids + * are buckets, which is incorrect. + */ + @Override + public Object getKey() { return centroid; } + + public double min() { return bounds.min; } + + public double max() { return bounds.max; } + + public double centroid() { return centroid; } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String keyAsString = format.format((double) getKey()).toString(); + builder.startObject(); + + builder.field(CommonFields.MIN.getPreferredName(), min()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.MIN_AS_STRING.getPreferredName(), format.format(min())); + } + + builder.field(CommonFields.KEY.getPreferredName(), getKey()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), keyAsString); + } + + builder.field(CommonFields.MAX.getPreferredName(), max()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.MAX_AS_STRING.getPreferredName(), format.format(max())); + } + + builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public int compareKey(InternalVariableWidthHistogram.Bucket other) { + return Double.compare(centroid, other.centroid); // Use centroid for bucket ordering + } + + public DocValueFormat getFormatter() { + return format; + } + } + + static class EmptyBucketInfo { + + final InternalAggregations subAggregations; + + EmptyBucketInfo(InternalAggregations subAggregations) { + this.subAggregations = subAggregations; + } + + EmptyBucketInfo(StreamInput in) throws IOException { + this(new InternalAggregations(in)); + } + + public void writeTo(StreamOutput out) throws IOException { + subAggregations.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + EmptyBucketInfo that = (EmptyBucketInfo) obj; + return Objects.equals(subAggregations, that.subAggregations); + } + + @Override + public int hashCode() { + return 
Objects.hash(getClass(), subAggregations); + } + } + + private List buckets; + private final DocValueFormat format; + private final int targetNumBuckets; + final EmptyBucketInfo emptyBucketInfo; + + InternalVariableWidthHistogram(String name, List buckets, EmptyBucketInfo emptyBucketInfo, int targetNumBuckets, + DocValueFormat formatter, Map metaData){ + super(name, metaData); + this.buckets = buckets; + this.emptyBucketInfo = emptyBucketInfo; + this.format = formatter; + this.targetNumBuckets = targetNumBuckets; + } + + /** + * Stream from a stream. + */ + public InternalVariableWidthHistogram(StreamInput in) throws IOException{ + super(in); + emptyBucketInfo = new EmptyBucketInfo(in); + format = in.readNamedWriteable(DocValueFormat.class); + buckets = in.readList(stream -> new Bucket(stream, format)); + targetNumBuckets = in.readVInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + emptyBucketInfo.writeTo(out); + out.writeNamedWriteable(format); + out.writeList(buckets); + out.writeVInt(targetNumBuckets); + } + + @Override + public String getWriteableName() { + return VariableWidthHistogramAggregationBuilder.NAME; + } + + @Override + public List getBuckets() { + return Collections.unmodifiableList(buckets); + } + + DocValueFormat getFormatter() { + return format; + } + + public int getTargetBuckets() { + return targetNumBuckets; + } + + public EmptyBucketInfo getEmptyBucketInfo() { + return emptyBucketInfo; + } + + @Override + public InternalVariableWidthHistogram create(List buckets) { + return new InternalVariableWidthHistogram(name, buckets, emptyBucketInfo, targetNumBuckets, + format, metadata); + } + + @Override + public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { + return new Bucket(prototype.centroid, prototype.bounds, prototype.docCount, prototype.format, aggregations); + } + + @Override + public Bucket createBucket(Number key, long docCount, InternalAggregations aggregations) { + return new Bucket(key.doubleValue(), new Bucket.BucketBounds(key.doubleValue(), key.doubleValue()), + docCount, format, aggregations); + } + + @Override + public Number getKey(MultiBucketsAggregation.Bucket bucket) { + return ((Bucket) bucket).centroid; + } + + @Override + public Number nextKey(Number key) { + return nextKey(key.doubleValue()); + } + + /** + * This method should not be called for this specific subclass of InternalHistogram, since there should not be + * empty buckets when clustering. 
+= */ + private double nextKey(double key){ return key + 1; } + + private static class IteratorAndCurrent { + + private final Iterator iterator; + private Bucket current; + + IteratorAndCurrent(Iterator iterator) { + this.iterator = iterator; + current = iterator.next(); + } + + } + + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sum = 0; + for (InternalVariableWidthHistogram.Bucket bucket : buckets) { + docCount += bucket.docCount; + min = Math.min(min, bucket.bounds.min); + max = Math.max(max, bucket.bounds.max); + sum += bucket.docCount * bucket.centroid; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + double centroid = sum / docCount; + Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max); + return new Bucket(centroid, bounds, docCount, format, aggs); + } + + public List reduceBuckets(List aggregations, ReduceContext reduceContext) { + PriorityQueue pq = new PriorityQueue(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return Double.compare(a.current.centroid, b.current.centroid) < 0; + } + }; + for (InternalAggregation aggregation : aggregations) { + InternalVariableWidthHistogram histogram = (InternalVariableWidthHistogram) aggregation; + if (histogram.buckets.isEmpty() == false) { + pq.add(new IteratorAndCurrent(histogram.buckets.iterator())); + } + } + + List reducedBuckets = new ArrayList<>(); + if(pq.size() > 0) { + double key = pq.top().current.centroid(); + // list of buckets coming from different shards that have the same key + List currentBuckets = new ArrayList<>(); + do { + IteratorAndCurrent top = pq.top(); + + if (Double.compare(top.current.centroid(), key) != 0) { + // The key changes, reduce what we already buffered and reset the buffer for current buckets. + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + currentBuckets.clear(); + key = top.current.centroid(); + } + + currentBuckets.add(top.current); + + if (top.iterator.hasNext()) { + Bucket next = top.iterator.next(); + assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid"; + top.current = next; + pq.updateTop(); + } else { + pq.pop(); + } + } while(pq.size() > 0); + + if (currentBuckets.isEmpty() == false) { + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + } + } + + mergeBucketsIfNeeded(reducedBuckets, targetNumBuckets, reduceContext); + + return reducedBuckets; + } + + + class BucketRange{ + int startIdx; + int endIdx; + + /** + * These are optional utility fields + * They're useful for determining whether buckets should be merged + */ + double min; + double max; + double centroid; + long docCount; + + public void mergeWith(BucketRange other){ + startIdx = Math.min(startIdx, other.startIdx); + endIdx = Math.max(endIdx, other.endIdx); + + if(docCount + other.docCount > 0) { + // Avoids div by 0 error. 
This condition could be false if the optional docCount field was not set + centroid = ((centroid * docCount) + (other.centroid * other.docCount)) / (docCount + other.docCount); + docCount += other.docCount; + } + min = Math.min(min, other.min); + max = Math.max(max, other.max); + } + } + + /** + * For each range {startIdx, endIdx} in ranges, all the buckets in that index range + * from buckets are merged, and this merged bucket replaces the entire range. + */ + private void mergeBucketsWithPlan(List buckets, List plan, ReduceContext reduceContext){ + for(int i = plan.size() - 1; i >= 0; i--) { + BucketRange range = plan.get(i); + int endIdx = range.endIdx; + int startIdx = range.startIdx; + + if(startIdx == endIdx) continue; + + List toMerge = new ArrayList<>(); + for(int idx = endIdx; idx > startIdx; idx--){ + toMerge.add(buckets.get(idx)); + buckets.remove(idx); + } + toMerge.add(buckets.get(startIdx)); // Don't remove the startIdx bucket because it will be replaced by the merged bucket + + reduceContext.consumeBucketsAndMaybeBreak(- (toMerge.size() - 1)); + Bucket merged_bucket = reduceBucket(toMerge, reduceContext); + + buckets.set(startIdx, merged_bucket); + } + } + + /** + * Makes a merge plan by simulating the merging of the two closest buckets, until the target number of buckets is reached. + * Distance is determined by centroid comparison. + * Then, this plan is actually executed and the underlying buckets are merged. + * + * Requires: buckets is sorted by centroid. + */ + private void mergeBucketsIfNeeded(List buckets, int targetNumBuckets, ReduceContext reduceContext) { + // Make a plan for getting the target number of buckets + // Each range represents a set of adjacent bucket indices of buckets that will be merged together + List ranges = new ArrayList<>(); + + // Initialize each range to represent an individual bucket + for (int i = 0; i < buckets.size(); i++) { + // Since buckets is sorted by centroid, ranges will be as well + BucketRange range = new BucketRange(); + range.centroid = buckets.get(i).centroid; + range.docCount = buckets.get(i).getDocCount(); + range.startIdx = i; + range.endIdx = i; + ranges.add(range); + } + + // Continually merge the two closest ranges until the target is reached + while (ranges.size() > targetNumBuckets) { + + // Find two closest ranges (i.e. 
the two closest buckets after the previous merges are completed) + // We only have to make one pass through the list because it is sorted by centroid + int closestIdx = 0; // After this loop, (closestIdx, closestIdx + 1) will be the 2 closest buckets + double smallest_distance = Double.POSITIVE_INFINITY; + for (int i = 0; i < ranges.size() - 1; i++) { + double new_distance = ranges.get(i + 1).centroid - ranges.get(i).centroid; // Positive because buckets is sorted + if (new_distance < smallest_distance) { + closestIdx = i; + smallest_distance = new_distance; + } + } + // Merge the two closest ranges + ranges.get(closestIdx).mergeWith(ranges.get(closestIdx + 1)); + ranges.remove(closestIdx + 1); + } + + // Execute the plan (merge the underlying buckets) + mergeBucketsWithPlan(buckets, ranges, reduceContext); + } + + private void mergeBucketsWithSameMin(List buckets, ReduceContext reduceContext){ + // Create a merge plan + List ranges = new ArrayList<>(); + + // Initialize each range to represent an individual bucket + for (int i = 0; i < buckets.size(); i++) { + BucketRange range = new BucketRange(); + range.min = buckets.get(i).min(); + range.startIdx = i; + range.endIdx = i; + ranges.add(range); + } + + // Merge ranges with same min value + int i = 0; + while(i < ranges.size() - 1){ + BucketRange range = ranges.get(i); + BucketRange nextRange = ranges.get(i+1); + + if(range.min == nextRange.min){ + range.mergeWith(nextRange); + ranges.remove(i+1); + } else{ + i++; + } + } + + // Execute the plan (merge the underlying buckets) + mergeBucketsWithPlan(buckets, ranges, reduceContext); + } + + /** + * When two adjacent buckets A, B overlap (A.max > B.min) then their boundary is set to + * the midpoint: (A.max + B.min) / 2. + * + * After this adjustment, A will contain more values than indicated and B will have less. + */ + private void adjustBoundsForOverlappingBuckets(List buckets, ReduceContext reduceContext){ + for(int i = 1; i < buckets.size(); i++){ + Bucket curBucket = buckets.get(i); + Bucket prevBucket = buckets.get(i-1); + if(curBucket.bounds.min < prevBucket.bounds.max){ + // We don't want overlapping buckets --> Adjust their bounds + // TODO: Think of a fairer way to do this. Should prev.max = cur.min? 
+ curBucket.bounds.min = (prevBucket.bounds.max + curBucket.bounds.min) / 2; + prevBucket.bounds.max = curBucket.bounds.min; + } + } + } + + @Override + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + List reducedBuckets = reduceBuckets(aggregations, reduceContext); + + if(reduceContext.isFinalReduce()) { + buckets.sort(Comparator.comparing(Bucket::min)); + mergeBucketsWithSameMin(reducedBuckets, reduceContext); + adjustBoundsForOverlappingBuckets(reducedBuckets, reduceContext); + } + return new InternalVariableWidthHistogram(getName(), reducedBuckets, emptyBucketInfo, targetNumBuckets, + format, metadata); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startArray(CommonFields.BUCKETS.getPreferredName()); + for (Bucket bucket : buckets) { + bucket.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public InternalAggregation createAggregation(List buckets) { + // convert buckets to the right type + List buckets2 = new ArrayList<>(buckets.size()); + for (Object b : buckets) { + buckets2.add((Bucket) b); + } + buckets2 = Collections.unmodifiableList(buckets2); + return new InternalVariableWidthHistogram(name, buckets2, emptyBucketInfo, targetNumBuckets, + format, getMetadata()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + + InternalVariableWidthHistogram that = (InternalVariableWidthHistogram) obj; + return Objects.equals(buckets, that.buckets) + && Objects.equals(format, that.format) + && Objects.equals(emptyBucketInfo, that.emptyBucketInfo); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), buckets, format, emptyBucketInfo); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java new file mode 100644 index 0000000000000..2fa188980164e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +public class ParsedVariableWidthHistogram extends ParsedMultiBucketAggregation + implements Histogram{ + + @Override + public String getType() { return VariableWidthHistogramAggregationBuilder.NAME; } + + @Override + public List getBuckets() { return buckets; } + + private static ObjectParser PARSER = + new ObjectParser<>( + ParsedVariableWidthHistogram.class.getSimpleName(), + true, + ParsedVariableWidthHistogram::new + ) ; + static { + declareMultiBucketAggregationFields(PARSER, + parser -> ParsedBucket.fromXContent(parser, false), + parser -> ParsedBucket.fromXContent(parser, true)); + } + + public static ParsedVariableWidthHistogram fromXContent(XContentParser parser, String name) throws IOException { + ParsedVariableWidthHistogram aggregation = PARSER.parse(parser, null); + aggregation.setName(name); + return aggregation; + } + + + public static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements Histogram.Bucket{ + private Double key; + + private Double min; + private Double max; + + private String minAsString; + private String maxAsString; + + @Override + public Object getKey() { + return key; + } + + @Override + public String getKeyAsString() { + String keyAsString = super.getKeyAsString(); + if (keyAsString != null) { + return keyAsString; + } + if (key != null) { + return Double.toString(key); + } + return null; + } + + public void setMin(Double min) { + this.min = min; + } + + public void setMinAsString(String minAsString){ + this.minAsString = minAsString; + } + + public double getMin() { + return min; + } + + public String getMinAsString() { + if (minAsString != null) { + return minAsString; + } + if (min != null) { + return Double.toString(min); + } + return null; + } + + public void setMax(Double max){ + this.max = max; + } + + public void setMaxAsString(String maxAsString){ + this.maxAsString = maxAsString; + } + + public double getMax() { + return max; + } + + public String getMaxAsString() { + if (maxAsString != null) { + return maxAsString; + } + if (max != null) { + return Double.toString(max); + } + return null; + } + + static ParsedBucket fromXContent(XContentParser parser, boolean keyed) throws IOException { + final ParsedBucket bucket = new ParsedBucket(); + bucket.setKeyed(keyed); + XContentParser.Token token = parser.currentToken(); + String currentFieldName = parser.currentName(); + if (keyed) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + } + + List aggregations = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if 
(CommonFields.KEY_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setKeyAsString(parser.text()); + } else if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) { + bucket.key = parser.doubleValue(); + } else if (CommonFields.MIN_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setMinAsString(parser.text()); + } else if (CommonFields.MIN.getPreferredName().equals(currentFieldName)) { + bucket.setMin(parser.doubleValue()); + } else if (CommonFields.MAX_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setMaxAsString(parser.text()); + } else if (CommonFields.MAX.getPreferredName().equals(currentFieldName)) { + bucket.setMax(parser.doubleValue()); + } else if (CommonFields.DOC_COUNT.getPreferredName().equals(currentFieldName)) { + bucket.setDocCount(parser.longValue()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) { + bucket.key = parser.doubleValue(); + } else { + XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class, + aggregations::add); + } + } + } + bucket.setAggregations(new Aggregations(aggregations)); + return bucket; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (isKeyed()) { + builder.startObject(getKeyAsString()); + } else { + builder.startObject(); + } + + if (minAsString != null) { + builder.field(CommonFields.MIN_AS_STRING.getPreferredName(), minAsString); + } + builder.field(CommonFields.MIN.getPreferredName(), getMin()); + + if (super.getKeyAsString() != null) { + builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), getKeyAsString()); + } + keyToXContent(builder); + + if (maxAsString != null) { + builder.field(CommonFields.MAX_AS_STRING.getPreferredName(), maxAsString); + } + builder.field(CommonFields.MAX.getPreferredName(), getMax()); + + builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount()); + getAggregations().toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java new file mode 100644 index 0000000000000..5b7b42e62535e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java @@ -0,0 +1,187 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class VariableWidthHistogramAggregationBuilder + extends ValuesSourceAggregationBuilder { + + public static final String NAME = "variable_width_histogram"; + + private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets"); + + private static ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer"); + + private static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size"); + + public static final ObjectParser PARSER = + ObjectParser.fromBuilder(NAME, VariableWidthHistogramAggregationBuilder::new); + static{ + ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, true); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setShardSize, SHARD_SIZE_FIELD); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setInitialBuffer, INITIAL_BUFFER_FIELD); + } + + private int numBuckets = 10; + private int shardSize = numBuckets * 50; + private int initialBuffer = Math.min(10 * this.shardSize, 50000); + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + VariableWidthHistogramAggregatorFactory.registerAggregators(builder); + } + /** Create a new builder with the given name. */ + public VariableWidthHistogramAggregationBuilder(String name) { + super(name); + } + + /** Read in object data from a stream, for internal use only. 
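Note that only {@code numBuckets} is read from the stream here; {@code shardSize} and {@code initialBuffer} keep their default values.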
*/ + public VariableWidthHistogramAggregationBuilder(StreamInput in) throws IOException { + super(in); + numBuckets = in.readVInt(); + } + + protected VariableWidthHistogramAggregationBuilder(VariableWidthHistogramAggregationBuilder clone, + AggregatorFactories.Builder factoriesBuilder, + Map metaData) { + super(clone, factoriesBuilder, metaData); + this.numBuckets = clone.numBuckets; + } + + @Override + protected ValuesSourceType defaultValueSourceType() { + return CoreValuesSourceType.NUMERIC; + } + + public VariableWidthHistogramAggregationBuilder setNumBuckets(int numBuckets){ + if (numBuckets <= 0) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for [" + + name + "]"); + } else if (numBuckets > 50000){ + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must not be greater than 50,000 for [" + + name + "]"); + } + this.numBuckets = numBuckets; + return this; + } + + public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize){ + if (shardSize < numBuckets) { + throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must not be less than " + + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]"); + } + this.shardSize = shardSize; + return this; + } + + public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer){ + if (initialBuffer < numBuckets) { + // If numBuckets buckets are being returned, then at least that many must be stored in memory + throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than numBuckets " + + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]"); + + } + this.initialBuffer = initialBuffer; + return this; + } + + public int getNumBuckets(){ return numBuckets; } + + public int getShardSize(){ return shardSize; } + + public int getInitialBuffer(){ return initialBuffer; } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.MANY; + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) { + return new VariableWidthHistogramAggregationBuilder(this, factoriesBuilder, metaData); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeVInt(numBuckets); + } + + @Override + protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext, + ValuesSourceConfig config, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + + Settings settings = queryShardContext.getIndexSettings().getNodeSettings(); + int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings); + if (numBuckets > maxBuckets) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+ + " must be less than " + maxBuckets); + } + return new VariableWidthHistogramAggregatorFactory(name, config, numBuckets, shardSize, initialBuffer, + queryShardContext, parent, subFactoriesBuilder, metadata); + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(NUM_BUCKETS_FIELD.getPreferredName(), numBuckets); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), numBuckets, shardSize, initialBuffer); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if 
(super.equals(obj) == false) return false; + VariableWidthHistogramAggregationBuilder other = (VariableWidthHistogramAggregationBuilder) obj; + return Objects.equals(numBuckets, other.numBuckets) + && Objects.equals(shardSize, other.shardSize) + && Objects.equals(initialBuffer, other.initialBuffer); + } + + @Override + public String getType() { return NAME; } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java new file mode 100644 index 0000000000000..3e32da0709eb5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -0,0 +1,584 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.CollectionUtil; +import org.apache.lucene.util.InPlaceMergeSorter; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { + + /** + * This aggregator goes through multiple phases of 
collection. Each phase has a different CollectionPhase::collectValue + * implementation + * + * Running a clustering algorithm like K-Means is unfeasible because large indices don't fit into memory. + * But having multiple collection phases lets us accurately bucket the docs in one pass. + */ + private abstract class CollectionPhase implements Releasable { + + /** + * This method will collect the doc and then either return itself or a new CollectionPhase + * It is responsible for determining when a phase is over and what phase will run next + */ + abstract CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException; + + + /** + * @return the final number of buckets that will be used + * If this is not the final phase, then an instance of the next phase is created and it is asked for this answer. + */ + abstract int finalNumBuckets(); + + /** + * If this CollectionPhase is the final phase then this method will build and return the i'th bucket + * Otherwise, it will create an instance of the next phase and ask it for the i'th bucket (naturally, if that phase + * not the last phase then it will do the same and so on...) + */ + abstract InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations) throws IOException; + + } + + /** + * Phase 1: Build up a buffer of docs (i.e. give each new doc its own bucket). No clustering decisions are made here. + * Building this buffer lets us analyze the distribution of the data before we begin clustering. + */ + private class BufferValuesPhase extends CollectionPhase{ + + private DoubleArray buffer; + private int bufferSize; + private int bufferLimit; + private MergeBucketsPhase mergeBucketsPhase; + + BufferValuesPhase(int bufferLimit){ + this.buffer = bigArrays.newDoubleArray(1); + this.bufferSize = 0; + this.bufferLimit = bufferLimit; + this.mergeBucketsPhase = null; + } + + @Override + public CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException{ + if (bufferSize < bufferLimit) { + // Add to the buffer i.e store the doc in a new bucket + buffer = bigArrays.grow(buffer, bufferSize + 1); + buffer.set((long) bufferSize, val); + collectBucket(sub, doc, bufferSize); + bufferSize += 1; + } + + if(bufferSize == bufferLimit) { + // We have hit the buffer limit. Switch to merge mode + CollectionPhase mergeBuckets = new MergeBucketsPhase(buffer, bufferSize); + Releasables.close(this); + return mergeBuckets; + } else { + // There is still room in the buffer + return this; + } + } + + int finalNumBuckets(){ + return getMergeBucketPhase().finalNumBuckets(); + } + + @Override + InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations) throws IOException{ + InternalVariableWidthHistogram.Bucket bucket = getMergeBucketPhase().buildBucket(bucketOrd, subAggregations); + return bucket; + } + + MergeBucketsPhase getMergeBucketPhase(){ + if(mergeBucketsPhase == null){ + mergeBucketsPhase = new MergeBucketsPhase(buffer, bufferSize); + } + return mergeBucketsPhase; + } + + @Override + public void close() { + if(mergeBucketsPhase != null){ + Releasables.close(mergeBucketsPhase); + } + Releasables.close(buffer); + } + } + + /** + * Phase 2: This phase is initialized with the buffer created in Phase 1. + * It is responsible for merging the buffered docs into a smaller number of buckets and then determining which existing + * bucket all subsequent docs belong to. 
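Whether a doc is close enough to join an existing bucket is judged against roughly twice the average distance between neighbouring cluster centroids (see {@code collectValue} below).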
New buckets will be created for docs that are distant from all existing ones + */ + private class MergeBucketsPhase extends CollectionPhase{ + /** + * "Cluster" refers to intermediate buckets during collection + * They are kept sorted by centroid. The i'th index in all these arrays always refers to the i'th cluster + */ + public DoubleArray clusterMaxes; + public DoubleArray clusterMins; + public DoubleArray clusterCentroids; + public DoubleArray clusterSizes; // clusterSizes != bucketDocCounts when clusters are in the middle of a merge + public int numClusters; + + private int avgBucketDistance; + + MergeBucketsPhase(DoubleArray buffer, int bufferSize) { + // Cluster the documents to reduce the number of buckets + // Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection + bucketBufferedDocs(buffer, bufferSize, shardSize * 3 / 4); + + if(bufferSize > 1) { + // Calculate the average distance between buckets + // Subsequent documents will be compared with this value to determine if they should be collected into + // an existing bucket or into a new bucket + // This can be done in a single linear scan because buckets are sorted by centroid + int sum = 0; + for (int i = 0; i < numClusters - 1; i++) { + sum += clusterCentroids.get(i + 1) - clusterCentroids.get(i); + } + avgBucketDistance = (sum / (numClusters - 1)); + } + } + + /** + * Sorts the indices of values by their underlying value + * This will produce a merge map whose application will sort values + */ + private class ClusterSorter extends InPlaceMergeSorter { + + final DoubleArray values; + final long[] indexes; + int length; + + ClusterSorter(DoubleArray values, int length){ + this.values = values; + this.length = length; + + this.indexes = new long[length]; + for(int i = 0; i < indexes.length; i++){ + indexes[i] = i; + } + } + + @Override + protected int compare(int i, int j) { + double iVal = values.get(indexes[i]); + double jVal = values.get(indexes[j]); + return Double.compare(iVal, jVal); + } + + @Override + protected void swap(int i, int j) { + long hold = indexes[i]; + indexes[i] = indexes[j]; + indexes[j] = hold; + } + + /** + * Produces a merge map where `mergeMap[i]` represents the index that values[i] + * would be moved to if values were sorted + * In other words, this method produces a merge map that will sort values + * + * See BucketsAggregator::mergeBuckets to learn more about the merge map + */ + public long[] generateMergeMap(){ + sort(0, indexes.length); + return indexes; + } + } + + /** + * Sorting the documents by key lets us bucket the documents into groups with a single linear scan + * + * But we can't do this by just sorting buffer, because we also need to generate a merge map + * for every change we make to the list, so that we can apply the changes to the underlying buckets as well. + * + * By just creating a merge map, we eliminate the need to actually sort buffer. We can just + * use the merge map to find any doc's sorted index. 
+ */ + private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets){ + // Allocate space for the clusters about to be created + clusterMins = bigArrays.newDoubleArray(1); + clusterMaxes = bigArrays.newDoubleArray(1); + clusterCentroids = bigArrays.newDoubleArray(1); + clusterSizes = bigArrays.newDoubleArray(1); + numClusters = 0; + + ClusterSorter sorter = new ClusterSorter(buffer, bufferSize); + long[] mergeMap = sorter.generateMergeMap(); + + // Naively use basic linear separation to group the first bufferSize docs into initialNumBuckets buckets + // This will require modifying the merge map, which currently represents a sorted list of buckets with 1 doc / bucket + int docsPerBucket = (int) Math.ceil((double) bufferSize / (double) numBuckets); + int bucketOrd = 0; + for(int i = 0; i < mergeMap.length; i++){ + // mergeMap[i] is the index of the i'th smallest doc + double val = buffer.get(mergeMap[i]); + + // Put the i'th smallest doc into the bucket at bucketOrd + mergeMap[i] = (int)(mergeMap[i]/docsPerBucket); + if(bucketOrd == numClusters){ + createAndAppendNewCluster(val); + } else { + addToCluster(bucketOrd, val); + } + + if((i + 1) % docsPerBucket == 0){ + // This bucket is full. Make a new one + bucketOrd += 1; + } + } + + mergeBuckets(mergeMap, numBuckets); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + } + + @Override + public CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException{ + int bucketOrd = getNearestBucket(val); + double distance = Math.abs(clusterCentroids.get(bucketOrd)- val); + if(bucketOrd == -1 || distance > (2 * avgBucketDistance) && numClusters < shardSize) { + // Make a new bucket since the document is distant from all existing buckets + // TODO: (maybe) Create a new bucket for all distant docs and merge down to shardSize buckets at end + + createAndAppendNewCluster(val); + collectBucket(sub, doc, numClusters - 1); + + if(val > clusterCentroids.get(bucketOrd)){ + // Insert just ahead of bucketOrd so that the array remains sorted + bucketOrd += 1; + } + moveLastCluster(bucketOrd); + } else { + addToCluster(bucketOrd, val); + collectExistingBucket(sub, doc, bucketOrd); + } + return this; + } + + /** + * Creates a new cluster with value and appends it to the cluster arrays + */ + private void createAndAppendNewCluster(double value){ + // Ensure there is space for the cluster + clusterMaxes = bigArrays.grow(clusterMaxes, numClusters + 1); // + 1 because indexing starts at 0 + clusterMins = bigArrays.grow(clusterMins, numClusters + 1); + clusterCentroids = bigArrays.grow(clusterCentroids, numClusters + 1); + clusterSizes = bigArrays.grow(clusterSizes, numClusters + 1); + + // Initialize the cluster at the end of the array + clusterMaxes.set(numClusters, value); + clusterMins.set(numClusters, value); + clusterCentroids.set(numClusters, value); + clusterSizes.set(numClusters, 1); + + numClusters += 1; + } + + /** + * Move the last cluster to position idx + * This is expensive because a merge map of size numClusters is created, so don't call this method too often + * + * TODO: Make this more efficient + */ + private void moveLastCluster(int index){ + if(index != numClusters - 1) { + + // Move the cluster metadata + double holdMax = clusterMaxes.get(numClusters-1); + double holdMin = clusterMins.get(numClusters-1); + double holdCentroid = clusterCentroids.get(numClusters-1); + double holdSize = clusterSizes.get(numClusters-1); + for (int i = numClusters 
- 1; i > index; i--) { + // The clusters in range {index ... numClusters - 1} move up 1 index to make room for the new cluster + clusterMaxes.set(i, clusterMaxes.get(i-1)); + clusterMins.set(i, clusterMins.get(i-1)); + clusterCentroids.set(i, clusterCentroids.get(i-1)); + clusterSizes.set(i, clusterSizes.get(i-1)); + } + clusterMaxes.set(index, holdMax); + clusterMins.set(index, holdMin); + clusterCentroids.set(index, holdCentroid); + clusterSizes.set(index, holdSize); + + // Move the underlying buckets + long[] mergeMap = new long[numClusters]; + for (int i = 0; i < index; i++) { + // The clusters in range {0 ... idx - 1} don't move + mergeMap[i] = i; + } + for (int i = index; i < numClusters - 1; i++) { + // The clusters in range {index ... numClusters - 1} shift up + mergeMap[i] = i + 1; + } + // Finally, the new cluster moves to index + mergeMap[numClusters - 1] = index; + + // TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets, + // except it doesn't require a merge map. This would be more efficient as there would be no need to create a + // merge map on every call. + mergeBuckets(mergeMap, numClusters); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + } + } + + /** + * Adds val to the cluster at index bucketOrd. + * The cluster's centroid, min, max, and size are recalculated. + */ + private void addToCluster(int bucketOrd, double val){ + assert bucketOrd < numClusters; + + double max = Math.max(clusterMaxes.get(bucketOrd), val); + double min = Math.min(clusterMins.get(bucketOrd), val); + + // Recalculate the centroid + double oldCentroid = clusterCentroids.get(bucketOrd); + double size = clusterSizes.get(bucketOrd); + double newCentroid = ((oldCentroid * size) + val) / (size + 1); + + clusterMaxes.set(bucketOrd, max); + clusterMins.set(bucketOrd, min); + clusterCentroids.set(bucketOrd, newCentroid); + clusterSizes.increment(bucketOrd, 1); + } + + /** + * Returns the ordinal of the bucket whose centroid is closest to val, or -1 if there are no buckets. 
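Two details in `addToCluster` and `getNearestBucket` are worth spelling out: the centroid is maintained as a running weighted mean rather than recomputed from scratch, and the nearest cluster is found by binary-searching the sorted centroid array and rounding to the closer neighbour (the behaviour the `BinarySearcher` tests further down assert). A minimal stand-alone sketch, using a plain array and `java.util.Arrays.binarySearch` in place of the PR's `BigArrays.DoubleBinarySearcher`:

[source,java]
--------------------------------------------------
import java.util.Arrays;

/** Hypothetical sketch: running-mean centroid update and closest-centroid lookup. */
final class CentroidSketch {

    /** New centroid after adding value to a cluster of the given size. */
    static double updatedCentroid(double oldCentroid, double clusterSize, double value) {
        return (oldCentroid * clusterSize + value) / (clusterSize + 1);
    }

    /** Index of the centroid closest to value, or -1 if there are no centroids. */
    static int nearest(double[] sortedCentroids, int numClusters, double value) {
        if (numClusters == 0) {
            return -1;
        }
        int i = Arrays.binarySearch(sortedCentroids, 0, numClusters, value);
        if (i >= 0) {
            return i;                       // exact match
        }
        int insertion = -(i + 1);           // first centroid greater than value
        if (insertion == 0) {
            return 0;
        }
        if (insertion == numClusters) {
            return numClusters - 1;
        }
        // Round to whichever neighbour is closer, instead of always rounding down.
        double below = value - sortedCentroids[insertion - 1];
        double above = sortedCentroids[insertion] - value;
        return below <= above ? insertion - 1 : insertion;
    }

    public static void main(String[] args) {
        double[] centroids = {-2.5, 1.0, 4.5};
        System.out.println(nearest(centroids, 3, 3.2));    // 2 (3.2 is closer to 4.5 than to 1.0)
        System.out.println(updatedCentroid(4.5, 2, 6.0));  // 5.0
    }
}
--------------------------------------------------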
+ **/ + private int getNearestBucket(double value){ + if (numClusters == 0){ + return -1; + } + BigArrays.DoubleBinarySearcher binarySearcher = new BigArrays.DoubleBinarySearcher(clusterCentroids); + return binarySearcher.search(0, numClusters - 1, value); + } + + @Override + int finalNumBuckets(){ + return numClusters; + } + + @Override + InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations){ + return new InternalVariableWidthHistogram.Bucket( + clusterCentroids.get(bucketOrd), + new InternalVariableWidthHistogram.Bucket.BucketBounds(clusterMins.get(bucketOrd), clusterMaxes.get(bucketOrd)), + bucketDocCount(bucketOrd), + formatter, + subAggregations); + } + + @Override + public void close() { + Releasables.close(clusterMaxes, clusterMins, clusterCentroids, clusterSizes); + } + } + + private final ValuesSource.Numeric valuesSource; + private final DocValueFormat formatter; + + // Aggregation parameters + private final int numBuckets; + private final int shardSize; + private final int bufferLimit; + + final BigArrays bigArrays; + private CollectionPhase collector; + + private MergingBucketsDeferringCollector deferringCollector; + + VariableWidthHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, int shardSize, + int initialBuffer, @Nullable ValuesSourceConfig valuesSourceConfig, + SearchContext context, Aggregator parent, + Map metadata) throws IOException{ + super(name, factories, context, parent, metadata); + + this.numBuckets = numBuckets; + this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); + this.formatter = valuesSourceConfig.format(); + this.shardSize = shardSize; + this.bufferLimit = initialBuffer; + + bigArrays = context.bigArrays(); + collector = new BufferValuesPhase(this.bufferLimit); + + String scoringAgg = subAggsNeedScore(); + String nestedAgg = descendsFromNestedAggregator(parent); + if (scoringAgg != null && nestedAgg != null) { + /* + * Terms agg would force the collect mode to depth_first here, because + * we need to access the score of nested documents in a sub-aggregation + * and we are not able to generate this score while replaying deferred documents. + * + * But the VariableWidthHistogram agg _must_ execute in breadth first since it relies on + * deferring execution, so we just have to throw up our hands and refuse + */ + throw new IllegalStateException("VariableWidthHistogram agg [" + name() + "] is the child of the nested agg [" + nestedAgg + + "], and also has a scoring child agg [" + scoringAgg + "]. 
This combination is not supported because " + + "it requires executing in [depth_first] mode, which the VariableWidthHistogram agg cannot do."); + } + } + + private String subAggsNeedScore() { + for (Aggregator subAgg : subAggregators) { + if (subAgg.scoreMode().needsScores()) { + return subAgg.name(); + } + } + return null; + } + + private String descendsFromNestedAggregator(Aggregator parent) { + while (parent != null) { + if (parent.getClass() == NestedAggregator.class) { + return parent.name(); + } + parent = parent.parent(); + } + return null; + } + + @Override + public ScoreMode scoreMode() { + if (valuesSource != null && valuesSource.needsScores()) { + return ScoreMode.COMPLETE; + } + return super.scoreMode(); + } + + @Override + protected boolean shouldDefer(Aggregator aggregator) { + return true; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent())); + return deferringCollector; + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, values){ + @Override + public void collect(int doc, long bucket) throws IOException { + assert bucket == 0; + if(values.advanceExact(doc)){ + final int valuesCount = values.docValueCount(); + double prevVal = Double.NEGATIVE_INFINITY; + for (int i = 0; i < valuesCount; ++i) { + double val = values.nextValue(); + assert val >= prevVal; + if (val == prevVal){ + continue; + } + + collector = collector.collectValue(sub, doc, val); + } + } + } + }; + } + + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + int numClusters = collector.finalNumBuckets(); + + long[] bucketOrdsToCollect = new long[numClusters]; + for (int i = 0; i < numClusters; i++) { + bucketOrdsToCollect[i] = i; + } + + InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + + List buckets = new ArrayList<>(numClusters); + for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) { + buckets.add(collector.buildBucket(bucketOrd, subAggregationResults[bucketOrd])); + } + + Function, InternalAggregation> resultBuilder = bucketsToFormat -> { + // The contract of the histogram aggregation is that shards must return + // buckets ordered by centroid in ascending order + CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator()); + + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( + buildEmptySubAggregations()); + + return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata()); + }; + + return new InternalAggregation[] { resultBuilder.apply(buckets) }; + + } + + @Override + public InternalAggregation buildEmptyAggregation() { + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( + buildEmptySubAggregations() + ); + return new InternalVariableWidthHistogram(name(), Collections.emptyList(), emptyBucketInfo, numBuckets, formatter, metadata()); + } + + @Override + public void doClose() { + Releasables.close(collector); + } + +} + diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java new file mode 100644 index 0000000000000..2b60fac8c686e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.AggregatorSupplier; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; + +public class VariableWidthHistogramAggregatorFactory extends ValuesSourceAggregatorFactory { + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + builder.register(VariableWidthHistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC, + (VariableWidthHistogramAggregatorSupplier) VariableWidthHistogramAggregator::new); + } + + private final int numBuckets; + private final int shardSize; + private final int initialBuffer; + + VariableWidthHistogramAggregatorFactory(String name, + ValuesSourceConfig config, + int numBuckets, + int shardSize, + int initialBuffer, + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata) throws IOException{ + super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); + this.numBuckets = numBuckets; + this.shardSize = shardSize; + this.initialBuffer = initialBuffer; + } + + @Override + protected Aggregator doCreateInternal(SearchContext searchContext, + Aggregator parent, + boolean collectsFromSingleBucket, + Map metadata) throws IOException{ + AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config, + VariableWidthHistogramAggregationBuilder.NAME); + if (aggregatorSupplier instanceof VariableWidthHistogramAggregatorSupplier == false) { + throw new 
AggregationExecutionException("Registry miss-match - expected HistogramAggregatorSupplier, found [" + + aggregatorSupplier.getClass().toString() + "]"); + } + return ((VariableWidthHistogramAggregatorSupplier) aggregatorSupplier).build(name, factories, numBuckets, shardSize, initialBuffer, + config, searchContext, parent, metadata); + } + + @Override + protected Aggregator createUnmapped(SearchContext searchContext, + Aggregator parent, + Map metadata) throws IOException { + return new VariableWidthHistogramAggregator(name, factories, numBuckets, shardSize, initialBuffer, config, + searchContext, parent, metadata); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java new file mode 100644 index 0000000000000..9e0abdeec9633 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
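The factory above follows the values-source registry pattern: a `@FunctionalInterface` supplier is registered under the aggregation's name (satisfied here by the `VariableWidthHistogramAggregator::new` constructor reference), and at creation time the factory looks the supplier up and invokes it. A stripped-down, hypothetical sketch of that wiring, with a plain `Map` standing in for `ValuesSourceRegistry` and invented `Aggregator`/`AggregatorSupplier` stand-ins:

[source,java]
--------------------------------------------------
import java.util.HashMap;
import java.util.Map;

final class RegistrySketch {

    /** Hypothetical stand-in for an aggregator instance. */
    interface Aggregator {
        String describe();
    }

    /** Hypothetical stand-in for VariableWidthHistogramAggregatorSupplier. */
    @FunctionalInterface
    interface AggregatorSupplier {
        Aggregator build(String name, int numBuckets, int shardSize, int initialBuffer);
    }

    // Plain map standing in for the registry: aggregation name -> supplier.
    private final Map<String, AggregatorSupplier> registry = new HashMap<>();

    void register(String name, AggregatorSupplier supplier) {
        registry.put(name, supplier);
    }

    Aggregator create(String name, int numBuckets, int shardSize, int initialBuffer) {
        AggregatorSupplier supplier = registry.get(name);
        if (supplier == null) {
            // The real factory reports a registry mismatch via AggregationExecutionException.
            throw new IllegalStateException("no supplier registered for [" + name + "]");
        }
        return supplier.build(name, numBuckets, shardSize, initialBuffer);
    }

    public static void main(String[] args) {
        RegistrySketch sketch = new RegistrySketch();
        // Registration mirrors builder.register(NAME, NUMERIC, <constructor reference>)
        sketch.register("variable_width_histogram",
            (name, buckets, shardSize, buffer) ->
                () -> name + ": buckets=" + buckets + " shardSize=" + shardSize + " initialBuffer=" + buffer);
        System.out.println(sketch.create("variable_width_histogram", 10, 500, 5000).describe());
    }
}
--------------------------------------------------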
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.support.AggregatorSupplier; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; + +@FunctionalInterface +public interface VariableWidthHistogramAggregatorSupplier extends AggregatorSupplier { + Aggregator build( + String name, + AggregatorFactories factories, + int numBuckets, + int shardSize, + int initialBuffer, + @Nullable ValuesSourceConfig valuesSourceConfig, + SearchContext aggregationContext, + Aggregator parent, + Map metadata + ) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java index 9785ff971e83b..6864d958dbdc3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoGrid; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; @@ -124,6 +125,10 @@ public static boolean hasValue(InternalAutoDateHistogram agg) { return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); } + public static boolean hasValue(InternalVariableWidthHistogram agg) { + return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); + } + public static boolean hasValue(InternalComposite agg) { return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); } diff --git a/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java b/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java new file mode 100644 index 0000000000000..bbdb8077aec58 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.util; + +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Arrays; + +public class BinarySearcherTests extends ESTestCase { + + private BigArrays randombigArrays() { + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + } + + private BigArrays bigArrays; + + @Before + public void init() { + bigArrays = randombigArrays(); + } + + public void testDoubleBinarySearch() throws Exception { + final int size = randomIntBetween(50, 10000); + DoubleArray bigArray = new BigDoubleArray(size, bigArrays, false); + double[] array = new double[size]; + + // Fill array with sorted values + double currentValue = randomDoubleBetween(-100, 100, true); + for (int i = 0; i < size; ++i) { + bigArray.set(i, currentValue); + array[i] = currentValue; + currentValue += randomDoubleBetween(0, 30, false); + } + + // Pick a number to search for + int index = randomIntBetween(0, size - 1); + double searchFor = bigArray.get(index); + if (randomBoolean()) { + // Pick a number where there is no exact match, but that is closest to array.get(index) + if (randomBoolean()) { + // Pick a number above array.get(index) + if (index < size - 1) { + // Divide by 3 so that it's closer to array.get(index) than to array.get(index + 1) + searchFor += (bigArray.get(index + 1) - bigArray.get(index)) / 3; + } else { + // There is nothing about index + searchFor += 0.1; + } + } else { + // Pick one below array.get(index) + if (index > 0) { + searchFor -= (bigArray.get(index) - bigArray.get(index - 1)) / 3; + } else { + // There is nothing below index + searchFor -= 0.1; + } + } + } + + BigArrays.DoubleBinarySearcher searcher = new BigArrays.DoubleBinarySearcher(bigArray); + assertEquals(index, searcher.search(0, size - 1, searchFor)); + + // Sanity check: confirm that ArrayUtils.binarySearch() returns the same index + int arraysIndex = Arrays.binarySearch(array, searchFor); + if(arraysIndex < 0){ + // Arrays.binarySearch didn't find an exact match + arraysIndex = -(arraysIndex + 1); + } + + // Arrays.binarySearch always rounds down whereas BinarySearcher rounds to the closest index + // So sometimes they will be off by 1 + assertEquals(Math.abs(index - arraysIndex) <= 1, true); + + Releasables.close(bigArray); + } + + class IntBinarySearcher extends BinarySearcher { + + int[] array; + int searchFor; + + IntBinarySearcher(int[] array, int searchFor) { + this.array = array; + this.searchFor = searchFor; + } + + @Override + protected int compare(int index) { + return Integer.compare(array[index], searchFor); + } + + @Override + protected double distance(int index) { + return Math.abs(array[index] - searchFor); + } + } + + public void testCompareWithArraysBinarySearch() throws Exception { + int size = randomIntBetween(30, 10000); + int[] array = new int[size]; + for (int i = 0; i < size; i++) { + array[i] = randomInt(); + } + Arrays.sort(array); + int searchFor = randomInt(); + BinarySearcher searcher = new IntBinarySearcher(array, searchFor); + + int searcherIndex = searcher.search(0, size-1); + int arraysIndex = Arrays.binarySearch(array, searchFor); + + if(arraysIndex < 0){ + // Arrays.binarySearch didn't find an exact match + arraysIndex = -(arraysIndex + 1); + } + + // Arrays.binarySearch always rounds down whereas BinarySearcher rounds to 
the closest index + // So sometimes they will be off by 1 + assertEquals(Math.abs(searcherIndex - arraysIndex) <= 1, true); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java index 51c77a430c87b..d574e254c5469 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogramTests; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogramTests; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissingTests; import org.elasticsearch.search.aggregations.bucket.nested.InternalNestedTests; import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNestedTests; @@ -102,7 +103,6 @@ * */ public class AggregationsTests extends ESTestCase { - private static final List> aggsTests = getAggsTests(); private static List> getAggsTests() { @@ -131,6 +131,7 @@ private static List> getAggsTests() { aggsTests.add(new InternalHistogramTests()); aggsTests.add(new InternalDateHistogramTests()); aggsTests.add(new InternalAutoDateHistogramTests()); + aggsTests.add(new InternalVariableWidthHistogramTests()); aggsTests.add(new LongTermsTests()); aggsTests.add(new DoubleTermsTests()); aggsTests.add(new StringTermsTests()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java new file mode 100644 index 0000000000000..232dc24dd95d9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java @@ -0,0 +1,394 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
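The reduce tests that follow all check the same behaviour: buckets from the shards, ordered by centroid, are collapsed pairwise (the pair with the closest centroids first, with a doc-count-weighted merged centroid) until only the target number remain. A small, hypothetical sketch of that merge loop, which reproduces the `testSingleShardReduceLong` expectations below (centroids 1, 2, 5, 10, 12, 200 with 3 docs each reduce to centroids 8/3, 11 and 200):

[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the centroid-merging reduce step. */
final class ReduceSketch {

    static void mergeToTarget(List<double[]> buckets, int targetBuckets) {
        // Each bucket is {centroid, docCount}; the list is assumed sorted by centroid.
        while (buckets.size() > targetBuckets) {
            int closest = 0;
            for (int i = 1; i < buckets.size() - 1; i++) {
                double gap = buckets.get(i + 1)[0] - buckets.get(i)[0];
                if (gap < buckets.get(closest + 1)[0] - buckets.get(closest)[0]) {
                    closest = i;
                }
            }
            double[] left = buckets.get(closest);
            double[] right = buckets.remove(closest + 1);
            double mergedCount = left[1] + right[1];
            // Doc-count-weighted centroid of the merged bucket.
            left[0] = (left[0] * left[1] + right[0] * right[1]) / mergedCount;
            left[1] = mergedCount;
        }
    }

    public static void main(String[] args) {
        List<double[]> buckets = new ArrayList<>();
        for (double centroid : new double[] {1, 2, 5, 10, 12, 200}) {
            buckets.add(new double[] {centroid, 3});
        }
        mergeToTarget(buckets, 3);
        for (double[] b : buckets) {
            System.out.println(Arrays.toString(b));   // [2.666..., 9.0], [11.0, 6.0], [200.0, 3.0]
        }
    }
}
--------------------------------------------------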
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InternalVariableWidthHistogramTests extends + InternalMultiBucketAggregationTestCase{ + + private DocValueFormat format; + private InternalVariableWidthHistogram.EmptyBucketInfo emptyBucktInfo; + private int numBuckets; + + @Override + public void setUp() throws Exception { + super.setUp(); + format = randomNumericDocValueFormat(); + emptyBucktInfo = new InternalVariableWidthHistogram.EmptyBucketInfo(InternalAggregations.EMPTY); + this.numBuckets = 3; + } + + private InternalVariableWidthHistogram createEmptyTestInstance(){ + String name = randomAlphaOfLength(5); + Map metadata = null; + if (randomBoolean()) { + metadata = new HashMap<>(); + int metadataCount = between(0, 10); + while (metadata.size() < metadataCount) { + metadata.put(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + } + List buckets = new ArrayList<>(); + return new InternalVariableWidthHistogram(name, buckets, emptyBucktInfo, numBuckets, format, metadata); + } + + @Override + protected InternalVariableWidthHistogram createTestInstance(String name, + Map metaData, + InternalAggregations aggregations) { + final double base = randomIntBetween(-50, 50); + final int numBuckets = randomIntBetween(1, 3); + List buckets = new ArrayList<>(); + double curKey = base; + for (int i = 0; i < numBuckets; ++i) { + final int docCount = TestUtil.nextInt(random(), 1, 50); + double add = randomDoubleBetween(1, 10, true); + curKey += add; + buckets.add(new InternalVariableWidthHistogram.Bucket( + curKey, + new InternalVariableWidthHistogram.Bucket.BucketBounds(curKey - (add / 3), curKey + (add / 3)), + docCount, + format, + InternalAggregations.EMPTY + )); + } + return new InternalVariableWidthHistogram(name, buckets, emptyBucktInfo, numBuckets, format, metaData); + } + + @Override + protected Class implementationClass() { + return ParsedVariableWidthHistogram.class; + } + @Override + protected InternalVariableWidthHistogram mutateInstance(InternalVariableWidthHistogram instance) { + String name = instance.getName(); + List buckets = instance.getBuckets(); + int targetBuckets = instance.getTargetBuckets(); + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = instance.getEmptyBucketInfo(); + Map metadata = instance.getMetadata(); + switch (between(0, 2)) { + case 0: + name += randomAlphaOfLength(5); + break; + case 1: + buckets = new ArrayList<>(buckets); + double boundMin = randomDouble(); + double boundMax = Math.abs(boundMin) * 2; + buckets.add(new 
InternalVariableWidthHistogram.Bucket( + randomDouble(), + new InternalVariableWidthHistogram.Bucket.BucketBounds(boundMin, boundMax), + randomIntBetween(1, 100), + format, + InternalAggregations.EMPTY + )); + break; + case 2: + emptyBucketInfo = null; + if (metadata == null) { + metadata = new HashMap<>(1); + } else { + metadata = new HashMap<>(instance.getMetadata()); + } + metadata.put(randomAlphaOfLength(15), randomInt()); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + return new InternalVariableWidthHistogram(name, buckets, emptyBucketInfo, targetBuckets, format, metadata); + } + + public void testSingleShardReduceLong() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 2, 5, 10, 12, 200}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 3, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (1,2,5), (10,12), 200) ] + // Final centroids should be [ 3, 11, 200 ] + // Final keys should be [ 1, 5, 200 ] + double double_error = 1d / 10000d; + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals((8d/3d), (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(9, reduced_buckets.get(0).getDocCount()); + assertEquals(10d, reduced_buckets.get(1).min(), double_error); + assertEquals(11d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(6, reduced_buckets.get(1).getDocCount()); + assertEquals(200d, reduced_buckets.get(2).min(), double_error); + assertEquals(200d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(3, reduced_buckets.get(2).getDocCount()); + } + + public void testSingleShardReduceDouble() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (double value : new double[]{-1.3, -1.3, 12.0, 13.0, 20.0, 21.5, 23.0, 24.5}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value - 0.7, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new 
NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (-1.3,-1.3), (12.0,13.0), (20.0, 21.5, 23.0, 24.5) ] + // Final centroids should be [ -1.3, 12.5, 22.25 ] + // Final keys should be [ -1.3, 11.7, 19.7 ] + double double_error = 1d / 10000d; + assertEquals(-2.0, reduced_buckets.get(0).min(), double_error); + assertEquals(-1.3, (double)reduced_buckets.get(0).getKey(), double_error); + assertEquals(2, reduced_buckets.get(0).getDocCount()); + assertEquals(11.3, reduced_buckets.get(1).min(), double_error); + assertEquals(12.5, (double)reduced_buckets.get(1).getKey(), double_error); + assertEquals(2, reduced_buckets.get(1).getDocCount()); + assertEquals(19.3, reduced_buckets.get(2).min(), double_error); + assertEquals(22.25, (double)reduced_buckets.get(2).getKey(), double_error); + assertEquals(4, reduced_buckets.get(2).getDocCount()); + } + + public void testMultipleShardsReduce() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + + List buckets1 = new ArrayList<>(); + for (long value : new long[]{1, 5, 6, 10}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets1.add(bucket); + } + + List buckets2 = new ArrayList<>(); + for (long value : new long[]{2, 3, 6, 7}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets2.add(bucket); + } + + List buckets3 = new ArrayList<>(); + for (long value : new long[]{0, 2, 12}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets3.add(bucket); + } + + InternalVariableWidthHistogram histogram1 = dummy_histogram.create(buckets1); + InternalVariableWidthHistogram histogram2 = dummy_histogram.create(buckets2); + InternalVariableWidthHistogram histogram3 = dummy_histogram.create(buckets3); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = 
InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram1); + aggs.add(histogram2); + aggs.add(histogram3); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram1.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (0, 1, 2, 2, 3), (5, 6, 6, 7), (10, 12) ] + // Final centroids should be [ 2, 6, 11 ] + // Final keys should be [ 1, 5, 10 ] + double double_error = 1d / 10000d; + assertEquals(0d, reduced_buckets.get(0).min(), double_error); + assertEquals(1.6d, (double)reduced_buckets.get(0).getKey(), double_error); + assertEquals(5, reduced_buckets.get(0).getDocCount()); + assertEquals(5d, reduced_buckets.get(1).min(), double_error); + assertEquals(6d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(4, reduced_buckets.get(1).getDocCount()); + assertEquals(10d, reduced_buckets.get(2).min(), double_error); + assertEquals(11d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(2, reduced_buckets.get(2).getDocCount()); + } + + public void testOverlappingReduceResult() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 2, 4, 10}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 3); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 4, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Expected clusters: [ (1, 2), (4), 10) ] + // Expected centroids: [ 1.5, 4, 10 ] + // Expected cluster (min, max): [ (1, 5), (4, 7), (10, 13) ] + // Expected keys: [ 1, 4.5, 10 ] + // Expected doc counts: [8, 4, 4] + double double_error = 1d / 10000d; + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals(1.5, (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(8, reduced_buckets.get(0).getDocCount()); + assertEquals(4.5, reduced_buckets.get(1).min(), double_error); + assertEquals(4d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(4, reduced_buckets.get(1).getDocCount()); + assertEquals(10d, reduced_buckets.get(2).min(), double_error); + assertEquals(10d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(4, reduced_buckets.get(2).getDocCount()); + } + + /** + * When buckets have the same min after the reduce phase, they should be merged. 
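The comment above states the invariant checked by `testSameMinMerge` below; a tiny, hypothetical sketch of such a pass (merge any adjacent reduced buckets that share the same `min`, keeping a doc-count-weighted centroid) that matches the test's expected result of two buckets with keys 101/2 and 700:

[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: collapse adjacent reduced buckets that share the same min bound. */
final class SameMinMergeSketch {

    static void mergeSameMin(List<double[]> buckets) {
        // Each bucket is {min, centroid, docCount}, sorted by centroid.
        for (int i = 0; i + 1 < buckets.size(); ) {
            double[] left = buckets.get(i);
            double[] right = buckets.get(i + 1);
            if (left[0] == right[0]) {
                double count = left[2] + right[2];
                left[1] = (left[1] * left[2] + right[1] * right[2]) / count; // weighted centroid
                left[2] = count;
                buckets.remove(i + 1);          // re-check the same position after merging
            } else {
                i++;
            }
        }
    }

    public static void main(String[] args) {
        List<double[]> buckets = new ArrayList<>(Arrays.asList(
            new double[] {1, 1, 1},      // min=1,   centroid=1,   docCount=1
            new double[] {1, 100, 1},    // min=1,   centroid=100, docCount=1
            new double[] {700, 700, 1})); // min=700, centroid=700, docCount=1
        mergeSameMin(buckets);
        for (double[] b : buckets) {
            System.out.println(Arrays.toString(b));  // [1.0, 50.5, 2.0] then [700.0, 700.0, 1.0]
        }
    }
}
--------------------------------------------------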
+ */ + public void testSameMinMerge() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 100, 700}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds; + if(value == 1 || value == 100) { + bounds = new InternalVariableWidthHistogram.Bucket.BucketBounds( + 1, value + ); + } else{ + bounds = new InternalVariableWidthHistogram.Bucket.BucketBounds( + value, value + 1 + ); + } + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Expected clusters: [ (1), (100), (700) ] + // Expected clusters after same min merge: [ (1, 100), (700) ] + // Expected centroids: [ 101/2, 700 ] + // Expected keys: [ 1, 700 ] + // Expected doc counts: [2, 1] + double double_error = 1d / 10000d; + assertEquals(2, reduced_buckets.size()); + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals((101d/2d), (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(2, reduced_buckets.get(0).getDocCount()); + assertEquals(700d, reduced_buckets.get(1).min(), double_error); + assertEquals(700d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(1, reduced_buckets.get(1).getDocCount()); + } + + @Override + protected void assertReduced(InternalVariableWidthHistogram reduced, List inputs) { + // It's very difficult to determine what the buckets should be without running the clustering algorithm. + // For now, randomized tests are avoided. Refer to the hardcoded written tests above. + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java new file mode 100644 index 0000000000000..761e5abaac3b1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java @@ -0,0 +1,544 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase { + + private static final String NUMERIC_FIELD = "numeric"; + + private static final Query DEFAULT_QUERY = new MatchAllDocsQuery(); + private VariableWidthHistogramAggregationBuilder aggregationBuilder; + + public void testNoDocs() throws Exception{ + final List dataset = Arrays.asList(); + testBothCases(DEFAULT_QUERY, dataset, true, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }); + } + + public void testMoreClustersThanDocs() throws Exception { + final List dataset = Arrays.asList(-3L, 10L, -200L); + double doubleError = 1d / 10000d; + + // Each document should have its own cluster, since shard_size > # docs + final Map expectedDocCount = new HashMap<>(); + expectedDocCount.put(-200d, 1); + expectedDocCount.put(-3d, 1); + expectedDocCount.put(10d, 1); + final Map expectedMins = new HashMap<>(); + expectedMins.put(-200d, -200d); + expectedMins.put(-3d, -3d); + expectedMins.put(10d, 10d); + + testBothCases(DEFAULT_QUERY, dataset, true, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCount.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()); + 
assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError); + }); + }); + } + + public void testLongs() throws Exception { + final List dataset = Arrays.asList(5L, -3L, 1L, -2L, 4L); + double doubleError = 1d / 10000d; + + // Expected clusters: [ (-3, -2), (1), (4, 5) ] + // Corresponding keys (centroids): [ -2.5, 1, 4.5 ] + + final Map expectedDocCount = new HashMap<>(); + expectedDocCount.put(-2.5, 2); + expectedDocCount.put(1d, 1); + expectedDocCount.put(4.5, 2); + + final Map expectedMins = new HashMap<>(); + expectedMins.put(-2.5, -3d); + expectedMins.put(1d, 1d); + expectedMins.put(4.5, 4d); + + final Map expectedMaxes = new HashMap<>(); + expectedMaxes.put(-2.5, -2d); + expectedMaxes.put(1d, 1d); + expectedMaxes.put(4.5, 5d); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(3).setShardSize(6).setInitialBuffer(3), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCount.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()); + assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError); + assertEquals(expectedMaxes.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.max(), doubleError); + }); + }); + } + + public void testDoubles() throws Exception { + final List dataset = Arrays.asList(3.3, 8.8, 1.2, 5.3, 2.26, -0.4, 5.9); + double doubleError = 1d / 10000d; + + // Search (no reduce) + + // (Cache limit < shard size) is illogical, but it simplifies testing + // Once the cache is full, the algorithm creates (3/4 * shard_size = 4) initial buckets. + // So, there will initially be a bucket for each of the first 4 values + // Expected clusters from search: [ (-0.4, 1.2), (2.26, 3.3), (5.3, 5.9) (8.8)] + // Corresponding keys (centroids): [ 0.4, 2.78, 5.6, 8.8] + final Map expectedDocCountOnlySearch = new HashMap<>(); + expectedDocCountOnlySearch.put(-0.4, 2); + expectedDocCountOnlySearch.put(2.26, 2); + expectedDocCountOnlySearch.put(5.3, 2); + expectedDocCountOnlySearch.put(8.8, 1); + + // Index these by min, rather than centroid, to avoid slight precision errors (Ex. 
3.999999 vs 4.0) that arise from double division + final Map expectedCentroidsOnlySearch = new HashMap<>(); + expectedCentroidsOnlySearch.put(-0.4, 0.4); + expectedCentroidsOnlySearch.put(2.26, 2.78); + expectedCentroidsOnlySearch.put(5.3, 5.6); + expectedCentroidsOnlySearch.put(8.8, 8.8); + final Map expectedMaxesOnlySearch = new HashMap<>(); + expectedMaxesOnlySearch.put(-0.4, 1.2); + expectedMaxesOnlySearch.put(2.26, 3.3); + expectedMaxesOnlySearch.put(5.3, 5.9); + expectedMaxesOnlySearch.put(8.8, 8.8); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedCentroidsOnlySearch.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCountOnlySearch.getOrDefault(bucket.min(), 0).longValue(), bucket.getDocCount(), doubleError); + assertEquals(expectedCentroidsOnlySearch.getOrDefault(bucket.min(), 0d).doubleValue(), bucket.centroid(), doubleError); + assertEquals(expectedMaxesOnlySearch.getOrDefault(bucket.min(), 0d).doubleValue(), bucket.max(), doubleError); + }); + }); + + // Search + Reduce + + // Before reducing we have one bucket per doc + // Expected clusters from search: [ (-0.4, 1.2), (2.26, 3.3), (5.3, 5.9) (8.8)] + // Corresponding keys (centroids): [ 0.4, 2.78, 5.6, 8.8] + final Map expectedDocCountSearchReduce = new HashMap<>(); + expectedDocCountSearchReduce.put(-0.4, 2); + expectedDocCountSearchReduce.put(2.26, 2); + expectedDocCountSearchReduce.put(5.3, 2); + expectedDocCountSearchReduce.put(8.8, 1); + + // Indexed by min + final Map expectedCentroidsSearchReduce = new HashMap<>(); + expectedCentroidsSearchReduce.put(-0.4, 0.4); + expectedCentroidsSearchReduce.put(2.26,2.78); + expectedCentroidsSearchReduce.put(5.3, 5.6); + expectedCentroidsSearchReduce.put(8.8, 8.8); + final Map expectedMaxesSearchReduce = new HashMap<>(); + expectedMaxesSearchReduce.put(-0.4, 1.2); + expectedMaxesSearchReduce.put(2.26,3.3); + expectedMaxesSearchReduce.put(5.3, 5.9); + expectedMaxesSearchReduce.put(8.8, 8.8); + + testSearchAndReduceCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCountSearchReduce.size(), buckets.size()); + buckets.forEach(bucket -> { + long expectedDocCount = expectedDocCountSearchReduce.getOrDefault(bucket.min(), 0).longValue(); + double expectedCentroid = expectedCentroidsSearchReduce.getOrDefault(bucket.min(), 0d).doubleValue(); + double expectedMax = expectedMaxesSearchReduce.getOrDefault(bucket.min(), 0d).doubleValue(); + assertEquals(expectedDocCount, bucket.getDocCount(), doubleError); + assertEquals(expectedCentroid, bucket.centroid(), doubleError); + assertEquals(expectedMax, bucket.max(), doubleError); + }); + }); + } + + // Once the cache limit is reached, cached documents are collected into (3/4 * shard_size) buckets + // A new bucket should be added when there is a document that is distant from all existing buckets + public void testNewBucketCreation() throws Exception { + final List dataset = Arrays.asList(-1, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 40, 30, 25, 32, 38, 80, 50, 75); + double doubleError = 1d / 10000d; + + // Search (no reduce) + + // Expected clusters: [ (-1), (1), (3), (5), (7), (9), (11), (13), (15), (17), + // (19), (25, 30, 32), (38, 40), (50), (75, 80) ] + // 
+
+    // Once the cache limit is reached, cached documents are collected into (3/4 * shard_size) buckets
+    // A new bucket should be added when there is a document that is distant from all existing buckets
+    public void testNewBucketCreation() throws Exception {
+        final List<Number> dataset = Arrays.asList(-1, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 40, 30, 25, 32, 38, 80, 50, 75);
+        double doubleError = 1d / 10000d;
+
+        // Search (no reduce)
+
+        // Expected clusters: [ (-1), (1), (3), (5), (7), (9), (11), (13), (15), (17),
+        //                      (19), (25, 30, 32), (38, 40), (50), (75, 80) ]
+        // Corresponding keys (centroids): [ -1, 1, 3, ..., 17, 19, 29, 39, 50, 77.5 ]
+        // Note: New buckets are created for 30, 50, and 80 because they are distant from the other buckets
+        final List<Double> keys = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 29d, 39d, 50d, 77.5d);
+        final List<Double> mins = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 25d, 38d, 50d, 75d);
+        final List<Double> maxes = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 32d, 40d, 50d, 80d);
+        final List<Integer> docCounts = Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 2, 1, 2);
+        assert keys.size() == docCounts.size() && keys.size() == mins.size() && keys.size() == maxes.size();
+
+        final Map<Double, Integer> expectedDocCountOnlySearch = new HashMap<>();
+        final Map<Double, Double> expectedMinsOnlySearch = new HashMap<>();
+        final Map<Double, Double> expectedMaxesOnlySearch = new HashMap<>();
+        for (int i = 0; i < keys.size(); i++) {
+            expectedDocCountOnlySearch.put(keys.get(i), docCounts.get(i));
+            expectedMinsOnlySearch.put(keys.get(i), mins.get(i));
+            expectedMaxesOnlySearch.put(keys.get(i), maxes.get(i));
+        }
+
+        testSearchCase(DEFAULT_QUERY, dataset, false,
+            aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(16).setInitialBuffer(12),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                assertEquals(expectedDocCountOnlySearch.size(), buckets.size());
+                buckets.forEach(bucket -> {
+                    long expectedDocCount = expectedDocCountOnlySearch.getOrDefault(bucket.getKey(), 0).longValue();
+                    double expectedMin = expectedMinsOnlySearch.getOrDefault(bucket.getKey(), 0d).doubleValue();
+                    double expectedMax = expectedMaxesOnlySearch.getOrDefault(bucket.getKey(), 0d).doubleValue();
+                    assertEquals(expectedDocCount, bucket.getDocCount());
+                    assertEquals(expectedMin, bucket.min(), doubleError);
+                    assertEquals(expectedMax, bucket.max(), doubleError);
+                });
+            });
+    }
+
+    // There should not be more than `shard_size` buckets on a shard, even when very distant documents appear
+    public void testNewBucketLimit() throws Exception {
+        final List<Number> dataset = Arrays.asList(1, 2, 3, 4, 5, 10, 20, 50, 100, 5400, -900);
+        double doubleError = 1d / 10000d;
+
+        // Expected clusters: [ (-900, 1, 2), (3, 4), (5), (10, 20, 50, 100, 5400) ]
+        // Corresponding keys (centroids): [ -299, 3.5, 5, 1116 ]
+        final Map<Double, Integer> expectedDocCount = new HashMap<>();
+        expectedDocCount.put(-299d, 3);
+        expectedDocCount.put(3.5d, 2);
+        expectedDocCount.put(5d, 1);
+        expectedDocCount.put(1116d, 5);
+
+        final Map<Double, Double> expectedMins = new HashMap<>();
+        expectedMins.put(-299d, -900d);
+        expectedMins.put(3.5d, 3d);
+        expectedMins.put(5d, 5d);
+        expectedMins.put(1116d, 10d);
+
+        final Map<Double, Double> expectedMaxes = new HashMap<>();
+        expectedMaxes.put(-299d, 2d);
+        expectedMaxes.put(3.5d, 4d);
+        expectedMaxes.put(5d, 5d);
+        expectedMaxes.put(1116d, 5400d);
+
+        testSearchCase(DEFAULT_QUERY, dataset, false,
+            aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(4).setInitialBuffer(5),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                assertEquals(expectedDocCount.size(), buckets.size());
+                buckets.forEach(bucket -> {
+                    assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount());
+                    assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError);
+                    assertEquals(expectedMaxes.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.max(), doubleError);
+                });
+            });
+    }
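+
+    // Illustrative sketch only: the two tests above exercise the collection rule where a value joins the
+    // bucket with the nearest centroid unless it is distant from all of them and the shard_size cap has
+    // not yet been reached. This helper is a simplified stand-in; the fixed `distanceThreshold` parameter
+    // is an assumption for the example, not the aggregator's actual distance criterion.
+    private static int chooseBucket(List<Double> centroids, double value, double distanceThreshold, int shardSize) {
+        if (centroids.isEmpty()) {
+            centroids.add(value);
+            return 0;
+        }
+        int nearest = 0;
+        for (int i = 1; i < centroids.size(); i++) {
+            if (Math.abs(centroids.get(i) - value) < Math.abs(centroids.get(nearest) - value)) {
+                nearest = i;
+            }
+        }
+        if (Math.abs(centroids.get(nearest) - value) > distanceThreshold && centroids.size() < shardSize) {
+            centroids.add(value);           // distant from every existing bucket: open a new one, capped at shard_size
+            return centroids.size() - 1;
+        }
+        return nearest;                     // otherwise collect the value into the nearest existing bucket
+    }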
+
+    public void testSimpleSubAggregations() throws IOException {
+        final List<Number> dataset = Arrays.asList(5, 1, 9, 2, 8);
+
+        testSearchAndReduceCase(DEFAULT_QUERY, dataset, false,
+            aggregation -> aggregation.field(NUMERIC_FIELD)
+                .setNumBuckets(3)
+                .setInitialBuffer(3)
+                .setShardSize(4)
+                .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                double deltaError = 1d / 10000d;
+
+                // Expected clusters: [ (1, 2), (5), (8, 9) ]
+
+                InternalStats stats = buckets.get(0).getAggregations().get("stats");
+                assertEquals(1, stats.getMin(), deltaError);
+                assertEquals(2, stats.getMax(), deltaError);
+                assertEquals(2, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+
+                stats = buckets.get(1).getAggregations().get("stats");
+                assertEquals(5, stats.getMin(), deltaError);
+                assertEquals(5, stats.getMax(), deltaError);
+                assertEquals(1, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+
+                stats = buckets.get(2).getAggregations().get("stats");
+                assertEquals(8, stats.getMin(), deltaError);
+                assertEquals(9, stats.getMax(), deltaError);
+                assertEquals(2, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+            });
+    }
+
+    public void testComplexSubAggregations() throws IOException {
+        final List<Number> dataset = Arrays.asList(5, 4, 3, 2, 1, 0, 6, 7, 8, 9, 10, 11);
+
+        testSearchCase(DEFAULT_QUERY, dataset, false,
+            aggregation -> aggregation.field(NUMERIC_FIELD)
+                .setNumBuckets(3)
+                .setInitialBuffer(12)
+                .setShardSize(4)
+                .subAggregation(new StatsAggregationBuilder("stats").field(NUMERIC_FIELD)),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                double deltaError = 1d / 10000d;
+
+                // Expected clusters: [ (0, 1, 2, 3), (4, 5, 6, 7), (8, 9, 10, 11) ]
+
+                InternalStats stats = buckets.get(0).getAggregations().get("stats");
+                assertEquals(0d, stats.getMin(), deltaError);
+                assertEquals(3L, stats.getMax(), deltaError);
+                assertEquals(4L, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+
+                stats = buckets.get(1).getAggregations().get("stats");
+                assertEquals(4d, stats.getMin(), deltaError);
+                assertEquals(7d, stats.getMax(), deltaError);
+                assertEquals(4L, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+
+                stats = buckets.get(2).getAggregations().get("stats");
+                assertEquals(8d, stats.getMin(), deltaError);
+                assertEquals(11d, stats.getMax(), deltaError);
+                assertEquals(4L, stats.getCount());
+                assertTrue(AggregationInspectionHelper.hasValue(stats));
+            });
+    }
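+
+    // Hedged usage sketch (hypothetical helper, not called by any test): the request shape exercised by
+    // the sub-aggregation tests above, written the way a caller would assemble it. Every method used here
+    // also appears in the tests in this file; the aggregation name "prices" is illustrative.
+    private static VariableWidthHistogramAggregationBuilder exampleRequest() {
+        VariableWidthHistogramAggregationBuilder builder = new VariableWidthHistogramAggregationBuilder("prices");
+        builder.field(NUMERIC_FIELD).setNumBuckets(3).setShardSize(4).setInitialBuffer(12);
+        builder.subAggregation(new StatsAggregationBuilder("stats").field(NUMERIC_FIELD));
+        return builder;
+    }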
+
+    public void testSubAggregationReduction() throws IOException {
+        final List<Number> dataset = Arrays.asList(1L, 1L, 1L, 2L, 2L);
+
+        testSearchCase(DEFAULT_QUERY, dataset, false,
+            aggregation -> aggregation.field(NUMERIC_FIELD)
+                .setNumBuckets(3)
+                .setInitialBuffer(12)
+                .setShardSize(4)
+                .subAggregation(new TermsAggregationBuilder("terms")
+                    .field(NUMERIC_FIELD)
+                    .shardSize(2)
+                    .size(1)),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                double deltaError = 1d / 10000d;
+
+                // This is a test to make sure that the sub aggregations get reduced.
+                // The terms sub aggregation has shardSize (2) != size (1), so we will get 1 bucket only if
+                // InternalVariableWidthHistogram reduces the sub aggregations.
+                InternalTerms<?, ?> terms = buckets.get(0).getAggregations().get("terms");
+                assertEquals(1L, terms.getBuckets().size(), deltaError);
+                assertEquals(1L, ((InternalTerms.Bucket) terms.getBuckets().get(0)).getKey());
+            });
+    }
+
+    public void testMultipleSegments() throws IOException {
+        final List<Number> dataset = Arrays.asList(1001, 1002, 1, 2, 1003, 3, 1004, 1005, 4, 5);
+
+        // There should be two clusters: (1, 2, 3, 4, 5) and (1001, 1002, 1003, 1004, 1005)
+        // We can't enable multiple segments per index for many of the tests above, because the clusters are too close.
+        // Slight randomization --> different caches in the aggregator --> different clusters
+        // However, these two clusters are so far apart that even if a doc from one ends up in the other,
+        // the centroids will not change much.
+        // To account for this case of a document switching clusters, we check that each cluster centroid is within
+        // a certain range, rather than asserting exact values.
+        testSearchAndReduceCase(DEFAULT_QUERY, dataset, true,
+            aggregation -> aggregation.field(NUMERIC_FIELD)
+                .setNumBuckets(2)
+                .setInitialBuffer(4)
+                .setShardSize(3)
+                .subAggregation(new StatsAggregationBuilder("stats").field(NUMERIC_FIELD)),
+            histogram -> {
+                final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
+                double deltaError = 1d / 10000d;
+
+                assertEquals(2, buckets.size());
+
+                // The smaller cluster
+                assertTrue(4 <= buckets.get(0).getDocCount() && buckets.get(0).getDocCount() <= 6);
+                assertTrue(0 <= buckets.get(0).centroid() && buckets.get(0).centroid() <= 200d);
+                assertEquals(1, buckets.get(0).min(), deltaError);
+
+                // The bigger cluster
+                assertTrue(4 <= buckets.get(1).getDocCount() && buckets.get(1).getDocCount() <= 6);
+                assertTrue(800d <= buckets.get(1).centroid() && buckets.get(1).centroid() <= 1005d);
+                assertEquals(1005, buckets.get(1).max(), deltaError);
+            });
+    }
+
+    private void testSearchCase(final Query query, final List<Number> dataset, boolean multipleSegments,
+                                final Consumer<VariableWidthHistogramAggregationBuilder> configure,
+                                final Consumer<InternalVariableWidthHistogram> verify) throws IOException {
+        executeTestCase(false, query, dataset, multipleSegments, configure, verify);
+    }
+
+    private void testSearchAndReduceCase(final Query query, final List<Number> dataset, boolean multipleSegments,
+                                         final Consumer<VariableWidthHistogramAggregationBuilder> configure,
+                                         final Consumer<InternalVariableWidthHistogram> verify) throws IOException {
+        executeTestCase(true, query, dataset, multipleSegments, configure, verify);
+    }
+
+    private void testBothCases(final Query query, final List<Number> dataset, boolean multipleSegments,
+                               final Consumer<VariableWidthHistogramAggregationBuilder> configure,
+                               final Consumer<InternalVariableWidthHistogram> verify) throws IOException {
+        executeTestCase(true, query, dataset, multipleSegments, configure, verify);
+        executeTestCase(false, query, dataset, multipleSegments, configure, verify);
+    }
+
+    @Override
+    protected IndexSettings createIndexSettings() {
+        final Settings nodeSettings = Settings.builder()
+            .put("search.max_buckets", 25000).build();
+        return new IndexSettings(
+            IndexMetadata.builder("_index")
+                .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                .numberOfShards(1)
+                .numberOfReplicas(0)
+                .creationDate(System.currentTimeMillis())
+                .build(),
+            nodeSettings
+        );
+    }
+
+    private void executeTestCase(final boolean reduced, final Query query,
+                                 final List<Number> dataset, boolean multipleSegments,
+                                 final Consumer<VariableWidthHistogramAggregationBuilder> configure,
+                                 final Consumer<InternalVariableWidthHistogram> verify) throws IOException {
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                indexSampleData(dataset, indexWriter, multipleSegments);
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                final IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                final VariableWidthHistogramAggregationBuilder aggregationBuilder =
+                    new VariableWidthHistogramAggregationBuilder("_name");
+                if (configure != null) {
+                    configure.accept(aggregationBuilder);
+                }
+
+                final MappedFieldType fieldType;
+                if (dataset.size() == 0 || dataset.get(0) instanceof Double) {
+                    fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.DOUBLE);
+                } else if (dataset.get(0) instanceof Long) {
+                    fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.LONG);
+                } else if (dataset.get(0) instanceof Integer) {
+                    fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.INTEGER);
+                } else {
+                    throw new IOException("Test data has an invalid type");
+                }
+
+                final InternalVariableWidthHistogram histogram;
+                if (reduced) {
+                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
+                } else {
+                    histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
+                }
+                verify.accept(histogram);
+            }
+        }
+    }
+
+    private void indexSampleData(List<Number> dataset, RandomIndexWriter indexWriter, boolean multipleSegments) throws IOException {
+        if (!multipleSegments) {
+            // Put all of the documents into one segment
+            List<Document> documents = new ArrayList<>();
+            for (final Number doc : dataset) {
+                final Document document = new Document();
+                long fieldVal = convertDocumentToSortableValue(doc);
+                document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, fieldVal));
+                documents.add(document);
+            }
+            indexWriter.addDocuments(documents);
+        } else {
+            // Create multiple segments in the index
+            final Document document = new Document();
+            for (final Number doc : dataset) {
+                if (frequently()) {
+                    indexWriter.commit();
+                }
+
+                long fieldVal = convertDocumentToSortableValue(doc);
+                document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, fieldVal));
+                indexWriter.addDocument(document);
+                document.clear();
+            }
+        }
+    }
+
+    long convertDocumentToSortableValue(Number doc) throws IOException {
+        if (doc instanceof Double) {
+            return NumericUtils.doubleToSortableLong(doc.doubleValue());
+        } else if (doc instanceof Integer) {
+            return doc.intValue();
+        } else if (doc instanceof Long) {
+            return doc.longValue();
+        }
+        throw new IOException("Document has an invalid type");
+    }
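+
+    // Hedged note on the encoding used above: SortedNumericDocValuesField stores raw longs, so double test
+    // values are encoded with NumericUtils.doubleToSortableLong. A hypothetical decode step (unused by these
+    // tests) would apply the inverse Lucene helper:
+    static double decodeSortableDouble(long sortableBits) {
+        return NumericUtils.sortableLongToDouble(sortableBits);   // inverse of doubleToSortableLong
+    }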
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
index f53f40d89ef2a..87b38e80ff9ea 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
@@ -67,6 +67,8 @@
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
@@ -246,6 +248,7 @@ public ReduceContext forFinalReduction() {
         map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
         map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
         map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
+        map.put(VariableWidthHistogramAggregationBuilder.NAME, (p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c));
         map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
         map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
         map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
index a011931a57a78..9251941ee0b95 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
@@ -78,7 +78,8 @@ public final class Aggregations {
         "string_stats", // https://github.com/elastic/elasticsearch/issues/51925
         "top_hits",
         "top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
-        "t_test" // https://github.com/elastic/elasticsearch/issues/54503
+        "t_test", // https://github.com/elastic/elasticsearch/issues/54503
+        "variable_width_histogram" // https://github.com/elastic/elasticsearch/issues/58140
     );
 
     private Aggregations() {}
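For context on the NamedXContent registration added above: once `variable_width_histogram` is registered, a high-level REST client caller can read the aggregation back like the other histogram responses. A minimal sketch, assuming a `searchResponse` variable and an aggregation named "prices" (both illustrative), and assuming ParsedVariableWidthHistogram follows the same Histogram contract as the other parsed histograms registered alongside it:

    // Hypothetical consumer-side read; names and the Histogram cast are assumptions, not part of this change.
    Histogram prices = searchResponse.getAggregations().get("prices");
    for (Histogram.Bucket bucket : prices.getBuckets()) {
        double centroid = (double) bucket.getKey();   // the bucket key is the cluster centroid
        long docCount = bucket.getDocCount();
        // the REST response also reports per-bucket min and max bounds
    }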