From ff670fe6262a8cdced9a4573564b23171f7bf363 Mon Sep 17 00:00:00 2001
From: James Dorfman
Date: Tue, 23 Jun 2020 09:26:54 -0400
Subject: [PATCH] Add Variable Width Histogram Aggregation (backport of #42035)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implements a new histogram aggregation called `variable_width_histogram` which dynamically determines bucket intervals based on document groupings. These groups are determined by running a one-pass clustering algorithm on each shard and then reducing each shard's clusters using an agglomerative clustering algorithm. This PR addresses #9572.

The shard-level clustering is done in one pass to minimize memory overhead. The algorithm was lightly inspired by [this paper](https://ieeexplore.ieee.org/abstract/document/1198387). It fetches a small number of documents to sample the data and determine initial clusters. Subsequent documents are then placed into one of these clusters, or into a new one if they are outliers. This algorithm is described in more detail in the aggregation's docs.

At reduce time, a [hierarchical agglomerative clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering) algorithm inspired by [this paper](https://arxiv.org/abs/1802.00304) continually merges the closest buckets from all shards (based on their centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Each bucket's min value is used as its key in the histogram. Furthermore, buckets are merged based on their centroids and not their bounds. So it is possible that adjacent buckets will overlap after reduction. Because each bucket's key is its min, this overlap is not shown in the final histogram. However, when such overlap occurs, we set the key of the bucket with the larger centroid to the midpoint between its minimum and the smaller bucket's maximum: `min[large] = (min[large] + max[small]) / 2`. This heuristic is expected to increase the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In the future, resolving https://github.com/elastic/elasticsearch/issues/50863 would let us address this limitation.

It doesn't make sense for this aggregation to support the `min_doc_count` parameter, since clusters are determined dynamically. The `order` parameter is not supported here to keep this large PR from becoming too complex.
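To make the reduce-time behaviour easier to follow, here is a minimal, self-contained sketch of the idea described above. It is illustrative only: the class and field names below are simplified stand-ins, not the PR's actual types. The real implementation operates on `InternalVariableWidthHistogram.Bucket`, reduces sub-aggregations as buckets merge, and accounts for merges through the reduce context.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sketch of centroid-based agglomerative merging plus the midpoint overlap adjustment. */
public class AgglomerativeMergeSketch {

    static final class Bucket {
        double min, max, centroid;
        long docCount; // real buckets always hold at least one document
        Bucket(double min, double max, double centroid, long docCount) {
            this.min = min; this.max = max; this.centroid = centroid; this.docCount = docCount;
        }
    }

    static List<Bucket> reduce(List<Bucket> shardBuckets, int targetBuckets) {
        List<Bucket> buckets = new ArrayList<>(shardBuckets);
        buckets.sort(Comparator.comparingDouble(b -> b.centroid));

        // Repeatedly merge the two adjacent buckets with the closest centroids.
        // Because the list is sorted by centroid, only adjacent pairs need to be compared.
        while (buckets.size() > targetBuckets) {
            int closest = 0;
            double smallest = Double.POSITIVE_INFINITY;
            for (int i = 0; i < buckets.size() - 1; i++) {
                double distance = buckets.get(i + 1).centroid - buckets.get(i).centroid;
                if (distance < smallest) {
                    smallest = distance;
                    closest = i;
                }
            }
            Bucket a = buckets.get(closest);
            Bucket b = buckets.get(closest + 1);
            long docCount = a.docCount + b.docCount;
            double centroid = (a.centroid * a.docCount + b.centroid * b.docCount) / docCount;
            buckets.set(closest, new Bucket(Math.min(a.min, b.min), Math.max(a.max, b.max), centroid, docCount));
            buckets.remove(closest + 1);
        }

        // Overlap heuristic: if adjacent buckets still overlap, move the shared boundary to the
        // midpoint between the smaller bucket's max and the larger bucket's min.
        for (int i = 1; i < buckets.size(); i++) {
            Bucket prev = buckets.get(i - 1);
            Bucket cur = buckets.get(i);
            if (cur.min < prev.max) {
                cur.min = (prev.max + cur.min) / 2;
                prev.max = cur.min;
            }
        }
        return buckets;
    }
}
```

The same centroid-weighted merging and midpoint adjustment are implemented by `InternalVariableWidthHistogram#mergeBucketsIfNeeded` and `#adjustBoundsForOverlappingBuckets` later in this patch.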
--- .../client/RestHighLevelClient.java | 4 + ...ariablewidthhistogram-aggregation.asciidoc | 91 +++ .../350_variable_width_histogram.yml | 50 ++ .../elasticsearch/common/util/BigArrays.java | 30 + .../common/util/BinarySearcher.java | 117 ++++ .../elasticsearch/search/SearchModule.java | 7 + .../search/aggregations/Aggregation.java | 4 + .../bucket/BucketsAggregator.java | 6 + .../InternalVariableWidthHistogram.java | 608 ++++++++++++++++++ .../ParsedVariableWidthHistogram.java | 206 ++++++ ...iableWidthHistogramAggregationBuilder.java | 187 ++++++ .../VariableWidthHistogramAggregator.java | 584 +++++++++++++++++ ...riableWidthHistogramAggregatorFactory.java | 85 +++ ...iableWidthHistogramAggregatorSupplier.java | 45 ++ .../support/AggregationInspectionHelper.java | 5 + .../common/util/BinarySearcherTests.java | 141 ++++ .../aggregations/AggregationsTests.java | 3 +- .../InternalVariableWidthHistogramTests.java | 394 ++++++++++++ ...VariableWidthHistogramAggregatorTests.java | 544 ++++++++++++++++ .../test/InternalAggregationTestCase.java | 3 + .../transforms/pivot/Aggregations.java | 3 +- 21 files changed, 3115 insertions(+), 2 deletions(-) create mode 100644 docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml create mode 100644 server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java create mode 100644 server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index d1b7c45b90576..4c21067d9519a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -111,6 +111,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder; import 
org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; @@ -1929,6 +1931,8 @@ static List getDefaultNamedXContents() { map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c)); map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c)); map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c)); + map.put(VariableWidthHistogramAggregationBuilder.NAME, + (p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c)); map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c)); map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c)); map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c)); diff --git a/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc new file mode 100644 index 0000000000000..535d1f24c0272 --- /dev/null +++ b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc @@ -0,0 +1,91 @@
+[[search-aggregations-bucket-variablewidthhistogram-aggregation]]
+=== Variable Width Histogram Aggregation
+
+This is a multi-bucket aggregation similar to <>.
+However, the width of each bucket is not specified. Rather, a target number of buckets is provided and bucket intervals
+are dynamically determined based on the document distribution. This is done using a simple one-pass document clustering algorithm
+that aims to obtain low distances between bucket centroids. Unlike other multi-bucket aggregations, the intervals will not
+necessarily have a uniform width.
+
+TIP: The number of buckets returned will always be less than or equal to the target number.
+
+Requesting a target of 2 buckets.
+
+[source,console]
+--------------------------------------------------
+POST /sales/_search?size=0
+{
+  "aggs" : {
+    "prices" : {
+      "variable_width_histogram" : {
+        "field" : "price",
+        "buckets" : 2
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+  "aggregations": {
+    "prices" : {
+      "buckets": [
+        {
+          "min": 10.0,
+          "key": 30.0,
+          "max": 50.0,
+          "doc_count": 2
+        },
+        {
+          "min": 150.0,
+          "key": 185.0,
+          "max": 200.0,
+          "doc_count": 5
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+==== Clustering Algorithm
+Each shard fetches the first `initial_buffer` documents and stores them in memory. Once the buffer is full, these documents
+are sorted and linearly separated into `3/4 * shard_size` buckets.
+Next, each remaining document is either collected into the nearest bucket, or placed into a new bucket if it is distant
+from all the existing ones. At most `shard_size` total buckets are created.
+
+In the reduce step, the coordinating node sorts the buckets from all shards by their centroids. Then, the two buckets
+with the nearest centroids are repeatedly merged until the target number of buckets is achieved.
+This merging procedure is a form of https://en.wikipedia.org/wiki/Hierarchical_clustering[agglomerative hierarchical clustering].
+
+TIP: A shard can return fewer than `shard_size` buckets, but it cannot return more.
+
+==== Shard size
+The `shard_size` parameter specifies the number of buckets that the coordinating node will request from each shard.
+A higher `shard_size` leads each shard to produce smaller buckets. This reduces the likelihood of buckets overlapping
+after the reduction step. Increasing the `shard_size` will improve the accuracy of the histogram, but it will
+also make it more expensive to compute the final result because bigger priority queues will have to be managed on a
+shard level, and the data transfers between the nodes and the client will be larger.
+
+TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(10 * shard_size, 50000)`.
+
+==== Initial Buffer
+The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory
+on a shard before the initial bucketing algorithm is run. Bucket distribution is determined using this sample
+of `initial_buffer` documents. So, although a higher `initial_buffer` will use more memory, it will lead to more representative
+clusters.
+
+==== Bucket bounds are approximate
+During the reduce step, the coordinating node continuously merges the two buckets with the nearest centroids. If two buckets have
+overlapping bounds but distant centroids, then it is possible that they will not be merged. Because of this, after
+reduction the maximum value in some interval (`max`) might be greater than the minimum value in the subsequent
+bucket (`min`). To reduce the impact of this error, when such an overlap occurs the bound between these intervals is adjusted to be `(max + min) / 2`.
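+
+For example (using illustrative numbers, not values from the response above): if after reduction one bucket ends at `max = 110` and the next bucket starts at `min = 90`, the two intervals overlap, so the shared bound becomes `(110 + 90) / 2 = 100` and both the first bucket's `max` and the second bucket's `min` are set to `100`.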
+ +TIP: Bucket bounds are very sensitive to outliers diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml new file mode 100644 index 0000000000000..071e543e8a25e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/350_variable_width_histogram.yml @@ -0,0 +1,50 @@ +setup: + - do: + indices.create: + index: test + body: + settings: + number_of_replicas: 0 + mappings: + properties: + number: + type: integer + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"number": -3}' + - '{"index": {}}' + - '{"number": -2}' + - '{"index": {}}' + - '{"number": 1}' + - '{"index": {}}' + - '{"number": 4}' + - '{"index": {}}' + - '{"number": 5}' + +--- +"basic": + - skip: + version: " - 7.99.99" + reason: added in 8.0.0 (to be backported to 7.9.0) + - do: + search: + body: + size: 0 + aggs: + histo: + variable_width_histogram: + field: number + buckets: 3 + - match: { hits.total.value: 5 } + - length: { aggregations.histo.buckets: 3 } + - match: { aggregations.histo.buckets.0.key: -2.5 } + - match: { aggregations.histo.buckets.0.doc_count: 2 } + - match: { aggregations.histo.buckets.1.key: 1.0 } + - match: { aggregations.histo.buckets.1.doc_count: 1 } + - match: { aggregations.histo.buckets.2.key: 4.5 } + - match: { aggregations.histo.buckets.2.doc_count: 2 } + diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index 46c83b5d9cc51..12be79179b1e1 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -691,6 +691,35 @@ public DoubleArray grow(DoubleArray array, long minSize) { return resize(array, newSize); } + public static class DoubleBinarySearcher extends BinarySearcher{ + + DoubleArray array; + double searchFor; + + public DoubleBinarySearcher(DoubleArray array){ + this.array = array; + this.searchFor = Integer.MIN_VALUE; + } + + @Override + protected int compare(int index) { + // Prevent use of BinarySearcher.search() and force the use of DoubleBinarySearcher.search() + assert this.searchFor != Integer.MIN_VALUE; + + return Double.compare(array.get(index), searchFor); + } + + @Override + protected double distance(int index) { + return Math.abs(array.get(index) - searchFor); + } + + public int search(int from, int to, double searchFor) { + this.searchFor = searchFor; + return super.search(from, to); + } + } + /** * Allocate a new {@link FloatArray}. * @param size the initial length of the array @@ -782,3 +811,4 @@ public ObjectArray grow(ObjectArray array, long minSize) { return resize(array, newSize); } } + diff --git a/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java b/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java new file mode 100644 index 0000000000000..dee1571fe6b82 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +/** + * Performs binary search on an arbitrary data structure. + * + * To do a search, create a subclass and implement custom {@link #compare(int)} and {@link #distance(int)} methods. + * + * {@link BinarySearcher} knows nothing about the value being searched for or the underlying data structure. + * These things should be determined by the subclass in its overridden methods. + * + * Refer to {@link BigArrays.DoubleBinarySearcher} for an example. + * + * NOTE: this class is not thread safe + */ +public abstract class BinarySearcher{ + + /** + * @return a negative integer, zero, or a positive integer if the array's value at index is less than, + * equal to, or greater than the value being searched for. + */ + protected abstract int compare(int index); + + /** + * @return the magnitude of the distance between the element at index and the value being searched for. + * It will usually be Math.abs(array[index] - searchValue). + */ + protected abstract double distance(int index); + + /** + * @return the index who's underlying value is closest to the value being searched for. + */ + private int getClosestIndex(int index1, int index2){ + if(distance(index1) < distance(index2)){ + return index1; + } else { + return index2; + } + } + + /** + * Uses a binary search to determine the index of the element within the index range {from, ... , to} that is + * closest to the search value. + * + * Unlike most binary search implementations, the value being searched for is not an argument to search method. + * Rather, this value should be stored by the subclass along with the underlying array. + * + * @return the index of the closest element. + * + * Requires: The underlying array should be sorted. 
+ **/ + public int search(int from, int to){ + while(from < to){ + int mid = (from + to) >>> 1; + int compareResult = compare(mid); + + if(compareResult == 0){ + // arr[mid] == value + return mid; + } else if(compareResult < 0){ + // arr[mid] < val + + if(mid < to) { + // Check if val is between (mid, mid + 1) before setting left = mid + 1 + // (mid < to) ensures that mid + 1 is not out of bounds + int compareValAfterMid = compare(mid + 1); + if (compareValAfterMid > 0) { + return getClosestIndex(mid, mid + 1); + } + } else if(mid == to){ + // val > arr[mid] and there are no more elements above mid, so mid is the closest + return mid; + } + + from = mid + 1; + } else{ + // arr[mid] > val + + if(mid > from) { + // Check if val is between (mid - 1, mid) + // (mid > from) ensures that mid - 1 is not out of bounds + int compareValBeforeMid = compare(mid - 1); + if (compareValBeforeMid < 0) { + // val is between indices (mid - 1), mid + return getClosestIndex(mid, mid - 1); + } + } else if(mid == 0){ + // val < arr[mid] and there are no more candidates below mid, so mid is the closest + return mid; + } + + to = mid - 1; + } + } + + return from; + } + +} diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index b6afe65b79ac5..2207d47ae4873 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -119,9 +119,11 @@ import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing; @@ -475,6 +477,11 @@ private ValuesSourceRegistry registerAggregations(List plugins) { AutoDateHistogramAggregationBuilder.PARSER) .addResultReader(InternalAutoDateHistogram::new) .setAggregatorRegistrar(AutoDateHistogramAggregationBuilder::registerAggregators), builder); + registerAggregation(new AggregationSpec(VariableWidthHistogramAggregationBuilder.NAME, + VariableWidthHistogramAggregationBuilder::new, + VariableWidthHistogramAggregationBuilder.PARSER) + .addResultReader(InternalVariableWidthHistogram::new) + .setAggregatorRegistrar(VariableWidthHistogramAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new, GeoDistanceAggregationBuilder::parse) .addResultReader(InternalGeoDistance::new) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java index 9fe13602896b6..e355185aa823a 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregation.java @@ -68,5 +68,9 @@ final class CommonFields extends ParseField.CommonFields { public static final ParseField FROM_AS_STRING = new ParseField("from_as_string"); public static final ParseField TO = new ParseField("to"); public static final ParseField TO_AS_STRING = new ParseField("to_as_string"); + public static final ParseField MIN = new ParseField("min"); + public static final ParseField MIN_AS_STRING = new ParseField("min_as_string"); + public static final ParseField MAX = new ParseField("max"); + public static final ParseField MAX_AS_STRING = new ParseField("max_as_string"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 3ed81bc4a8a0a..9e2ed84f6e3c0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -101,6 +101,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do subCollector.collect(doc, bucketOrd); } + /** + * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(long[])} to merge the actual + * ordinals and doc ID deltas. + * + * Refer to that method for documentation about the merge map. + */ public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { try (IntArray oldDocCounts = docCounts) { docCounts = bigArrays.newIntArray(newNumBuckets, true); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java new file mode 100644 index 0000000000000..ced1a4915bfdf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java @@ -0,0 +1,608 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InternalVariableWidthHistogram + extends InternalMultiBucketAggregation + implements Histogram, HistogramFactory{ + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket + implements Histogram.Bucket, KeyComparable { + + public static class BucketBounds { + public double min; + public double max; + + public BucketBounds(double min, double max) { + assert min <= max; + this.min = min; + this.max = max; + } + + public BucketBounds(StreamInput in) throws IOException { + this(in.readDouble(), in.readDouble()); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(min); + out.writeDouble(max); + } + + public boolean equals(Object obj){ + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + BucketBounds that = (BucketBounds) obj; + return min == that.min && max == that.max; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), min, max); + } + } + + private final BucketBounds bounds; + private long docCount; + private InternalAggregations aggregations; + protected final transient DocValueFormat format; + private double centroid; + + public Bucket(double centroid, + BucketBounds bounds, + long docCount, + DocValueFormat format, + InternalAggregations aggregations) { + this.format = format; + this.centroid = centroid; + this.bounds = bounds; + this.docCount = docCount; + this.aggregations = aggregations; + } + + /** + * Read from a stream. 
+ */ + public Bucket(StreamInput in, DocValueFormat format) throws IOException { + this.format = format; + centroid = in.readDouble(); + docCount = in.readVLong(); + bounds = new BucketBounds(in); + aggregations = new InternalAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(centroid); + out.writeVLong(docCount); + bounds.writeTo(out); + aggregations.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != InternalVariableWidthHistogram.Bucket.class) { + return false; + } + InternalVariableWidthHistogram.Bucket that = (InternalVariableWidthHistogram.Bucket) obj; + return centroid == that.centroid + && bounds.equals(that.bounds) + && docCount == that.docCount + && Objects.equals(aggregations, that.aggregations); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), centroid, bounds, docCount, aggregations); + } + + @Override + public String getKeyAsString() { + return format.format((double) getKey()).toString(); + } + + /** + * Buckets are compared using their centroids. But, in the final XContent returned by the aggregation, + * we want the bucket's key to be its min. Otherwise, it would look like the distances between centroids + * are buckets, which is incorrect. + */ + @Override + public Object getKey() { return centroid; } + + public double min() { return bounds.min; } + + public double max() { return bounds.max; } + + public double centroid() { return centroid; } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String keyAsString = format.format((double) getKey()).toString(); + builder.startObject(); + + builder.field(CommonFields.MIN.getPreferredName(), min()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.MIN_AS_STRING.getPreferredName(), format.format(min())); + } + + builder.field(CommonFields.KEY.getPreferredName(), getKey()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), keyAsString); + } + + builder.field(CommonFields.MAX.getPreferredName(), max()); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.MAX_AS_STRING.getPreferredName(), format.format(max())); + } + + builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public int compareKey(InternalVariableWidthHistogram.Bucket other) { + return Double.compare(centroid, other.centroid); // Use centroid for bucket ordering + } + + public DocValueFormat getFormatter() { + return format; + } + } + + static class EmptyBucketInfo { + + final InternalAggregations subAggregations; + + EmptyBucketInfo(InternalAggregations subAggregations) { + this.subAggregations = subAggregations; + } + + EmptyBucketInfo(StreamInput in) throws IOException { + this(new InternalAggregations(in)); + } + + public void writeTo(StreamOutput out) throws IOException { + subAggregations.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + EmptyBucketInfo that = (EmptyBucketInfo) obj; + return Objects.equals(subAggregations, that.subAggregations); + } + + @Override + public int hashCode() { + return 
Objects.hash(getClass(), subAggregations); + } + } + + private List buckets; + private final DocValueFormat format; + private final int targetNumBuckets; + final EmptyBucketInfo emptyBucketInfo; + + InternalVariableWidthHistogram(String name, List buckets, EmptyBucketInfo emptyBucketInfo, int targetNumBuckets, + DocValueFormat formatter, Map metaData){ + super(name, metaData); + this.buckets = buckets; + this.emptyBucketInfo = emptyBucketInfo; + this.format = formatter; + this.targetNumBuckets = targetNumBuckets; + } + + /** + * Stream from a stream. + */ + public InternalVariableWidthHistogram(StreamInput in) throws IOException{ + super(in); + emptyBucketInfo = new EmptyBucketInfo(in); + format = in.readNamedWriteable(DocValueFormat.class); + buckets = in.readList(stream -> new Bucket(stream, format)); + targetNumBuckets = in.readVInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + emptyBucketInfo.writeTo(out); + out.writeNamedWriteable(format); + out.writeList(buckets); + out.writeVInt(targetNumBuckets); + } + + @Override + public String getWriteableName() { + return VariableWidthHistogramAggregationBuilder.NAME; + } + + @Override + public List getBuckets() { + return Collections.unmodifiableList(buckets); + } + + DocValueFormat getFormatter() { + return format; + } + + public int getTargetBuckets() { + return targetNumBuckets; + } + + public EmptyBucketInfo getEmptyBucketInfo() { + return emptyBucketInfo; + } + + @Override + public InternalVariableWidthHistogram create(List buckets) { + return new InternalVariableWidthHistogram(name, buckets, emptyBucketInfo, targetNumBuckets, + format, metadata); + } + + @Override + public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { + return new Bucket(prototype.centroid, prototype.bounds, prototype.docCount, prototype.format, aggregations); + } + + @Override + public Bucket createBucket(Number key, long docCount, InternalAggregations aggregations) { + return new Bucket(key.doubleValue(), new Bucket.BucketBounds(key.doubleValue(), key.doubleValue()), + docCount, format, aggregations); + } + + @Override + public Number getKey(MultiBucketsAggregation.Bucket bucket) { + return ((Bucket) bucket).centroid; + } + + @Override + public Number nextKey(Number key) { + return nextKey(key.doubleValue()); + } + + /** + * This method should not be called for this specific subclass of InternalHistogram, since there should not be + * empty buckets when clustering. 
+= */ + private double nextKey(double key){ return key + 1; } + + private static class IteratorAndCurrent { + + private final Iterator iterator; + private Bucket current; + + IteratorAndCurrent(Iterator iterator) { + this.iterator = iterator; + current = iterator.next(); + } + + } + + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sum = 0; + for (InternalVariableWidthHistogram.Bucket bucket : buckets) { + docCount += bucket.docCount; + min = Math.min(min, bucket.bounds.min); + max = Math.max(max, bucket.bounds.max); + sum += bucket.docCount * bucket.centroid; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + double centroid = sum / docCount; + Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max); + return new Bucket(centroid, bounds, docCount, format, aggs); + } + + public List reduceBuckets(List aggregations, ReduceContext reduceContext) { + PriorityQueue pq = new PriorityQueue(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return Double.compare(a.current.centroid, b.current.centroid) < 0; + } + }; + for (InternalAggregation aggregation : aggregations) { + InternalVariableWidthHistogram histogram = (InternalVariableWidthHistogram) aggregation; + if (histogram.buckets.isEmpty() == false) { + pq.add(new IteratorAndCurrent(histogram.buckets.iterator())); + } + } + + List reducedBuckets = new ArrayList<>(); + if(pq.size() > 0) { + double key = pq.top().current.centroid(); + // list of buckets coming from different shards that have the same key + List currentBuckets = new ArrayList<>(); + do { + IteratorAndCurrent top = pq.top(); + + if (Double.compare(top.current.centroid(), key) != 0) { + // The key changes, reduce what we already buffered and reset the buffer for current buckets. + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + currentBuckets.clear(); + key = top.current.centroid(); + } + + currentBuckets.add(top.current); + + if (top.iterator.hasNext()) { + Bucket next = top.iterator.next(); + assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid"; + top.current = next; + pq.updateTop(); + } else { + pq.pop(); + } + } while(pq.size() > 0); + + if (currentBuckets.isEmpty() == false) { + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + } + } + + mergeBucketsIfNeeded(reducedBuckets, targetNumBuckets, reduceContext); + + return reducedBuckets; + } + + + class BucketRange{ + int startIdx; + int endIdx; + + /** + * These are optional utility fields + * They're useful for determining whether buckets should be merged + */ + double min; + double max; + double centroid; + long docCount; + + public void mergeWith(BucketRange other){ + startIdx = Math.min(startIdx, other.startIdx); + endIdx = Math.max(endIdx, other.endIdx); + + if(docCount + other.docCount > 0) { + // Avoids div by 0 error. 
This condition could be false if the optional docCount field was not set + centroid = ((centroid * docCount) + (other.centroid * other.docCount)) / (docCount + other.docCount); + docCount += other.docCount; + } + min = Math.min(min, other.min); + max = Math.max(max, other.max); + } + } + + /** + * For each range {startIdx, endIdx} in ranges, all the buckets in that index range + * from buckets are merged, and this merged bucket replaces the entire range. + */ + private void mergeBucketsWithPlan(List buckets, List plan, ReduceContext reduceContext){ + for(int i = plan.size() - 1; i >= 0; i--) { + BucketRange range = plan.get(i); + int endIdx = range.endIdx; + int startIdx = range.startIdx; + + if(startIdx == endIdx) continue; + + List toMerge = new ArrayList<>(); + for(int idx = endIdx; idx > startIdx; idx--){ + toMerge.add(buckets.get(idx)); + buckets.remove(idx); + } + toMerge.add(buckets.get(startIdx)); // Don't remove the startIdx bucket because it will be replaced by the merged bucket + + reduceContext.consumeBucketsAndMaybeBreak(- (toMerge.size() - 1)); + Bucket merged_bucket = reduceBucket(toMerge, reduceContext); + + buckets.set(startIdx, merged_bucket); + } + } + + /** + * Makes a merge plan by simulating the merging of the two closest buckets, until the target number of buckets is reached. + * Distance is determined by centroid comparison. + * Then, this plan is actually executed and the underlying buckets are merged. + * + * Requires: buckets is sorted by centroid. + */ + private void mergeBucketsIfNeeded(List buckets, int targetNumBuckets, ReduceContext reduceContext) { + // Make a plan for getting the target number of buckets + // Each range represents a set of adjacent bucket indices of buckets that will be merged together + List ranges = new ArrayList<>(); + + // Initialize each range to represent an individual bucket + for (int i = 0; i < buckets.size(); i++) { + // Since buckets is sorted by centroid, ranges will be as well + BucketRange range = new BucketRange(); + range.centroid = buckets.get(i).centroid; + range.docCount = buckets.get(i).getDocCount(); + range.startIdx = i; + range.endIdx = i; + ranges.add(range); + } + + // Continually merge the two closest ranges until the target is reached + while (ranges.size() > targetNumBuckets) { + + // Find two closest ranges (i.e. 
the two closest buckets after the previous merges are completed) + // We only have to make one pass through the list because it is sorted by centroid + int closestIdx = 0; // After this loop, (closestIdx, closestIdx + 1) will be the 2 closest buckets + double smallest_distance = Double.POSITIVE_INFINITY; + for (int i = 0; i < ranges.size() - 1; i++) { + double new_distance = ranges.get(i + 1).centroid - ranges.get(i).centroid; // Positive because buckets is sorted + if (new_distance < smallest_distance) { + closestIdx = i; + smallest_distance = new_distance; + } + } + // Merge the two closest ranges + ranges.get(closestIdx).mergeWith(ranges.get(closestIdx + 1)); + ranges.remove(closestIdx + 1); + } + + // Execute the plan (merge the underlying buckets) + mergeBucketsWithPlan(buckets, ranges, reduceContext); + } + + private void mergeBucketsWithSameMin(List buckets, ReduceContext reduceContext){ + // Create a merge plan + List ranges = new ArrayList<>(); + + // Initialize each range to represent an individual bucket + for (int i = 0; i < buckets.size(); i++) { + BucketRange range = new BucketRange(); + range.min = buckets.get(i).min(); + range.startIdx = i; + range.endIdx = i; + ranges.add(range); + } + + // Merge ranges with same min value + int i = 0; + while(i < ranges.size() - 1){ + BucketRange range = ranges.get(i); + BucketRange nextRange = ranges.get(i+1); + + if(range.min == nextRange.min){ + range.mergeWith(nextRange); + ranges.remove(i+1); + } else{ + i++; + } + } + + // Execute the plan (merge the underlying buckets) + mergeBucketsWithPlan(buckets, ranges, reduceContext); + } + + /** + * When two adjacent buckets A, B overlap (A.max > B.min) then their boundary is set to + * the midpoint: (A.max + B.min) / 2. + * + * After this adjustment, A will contain more values than indicated and B will have less. + */ + private void adjustBoundsForOverlappingBuckets(List buckets, ReduceContext reduceContext){ + for(int i = 1; i < buckets.size(); i++){ + Bucket curBucket = buckets.get(i); + Bucket prevBucket = buckets.get(i-1); + if(curBucket.bounds.min < prevBucket.bounds.max){ + // We don't want overlapping buckets --> Adjust their bounds + // TODO: Think of a fairer way to do this. Should prev.max = cur.min? 
+ curBucket.bounds.min = (prevBucket.bounds.max + curBucket.bounds.min) / 2; + prevBucket.bounds.max = curBucket.bounds.min; + } + } + } + + @Override + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + List reducedBuckets = reduceBuckets(aggregations, reduceContext); + + if(reduceContext.isFinalReduce()) { + buckets.sort(Comparator.comparing(Bucket::min)); + mergeBucketsWithSameMin(reducedBuckets, reduceContext); + adjustBoundsForOverlappingBuckets(reducedBuckets, reduceContext); + } + return new InternalVariableWidthHistogram(getName(), reducedBuckets, emptyBucketInfo, targetNumBuckets, + format, metadata); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startArray(CommonFields.BUCKETS.getPreferredName()); + for (Bucket bucket : buckets) { + bucket.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public InternalAggregation createAggregation(List buckets) { + // convert buckets to the right type + List buckets2 = new ArrayList<>(buckets.size()); + for (Object b : buckets) { + buckets2.add((Bucket) b); + } + buckets2 = Collections.unmodifiableList(buckets2); + return new InternalVariableWidthHistogram(name, buckets2, emptyBucketInfo, targetNumBuckets, + format, getMetadata()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + + InternalVariableWidthHistogram that = (InternalVariableWidthHistogram) obj; + return Objects.equals(buckets, that.buckets) + && Objects.equals(format, that.format) + && Objects.equals(emptyBucketInfo, that.emptyBucketInfo); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), buckets, format, emptyBucketInfo); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java new file mode 100644 index 0000000000000..2fa188980164e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedVariableWidthHistogram.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +public class ParsedVariableWidthHistogram extends ParsedMultiBucketAggregation + implements Histogram{ + + @Override + public String getType() { return VariableWidthHistogramAggregationBuilder.NAME; } + + @Override + public List getBuckets() { return buckets; } + + private static ObjectParser PARSER = + new ObjectParser<>( + ParsedVariableWidthHistogram.class.getSimpleName(), + true, + ParsedVariableWidthHistogram::new + ) ; + static { + declareMultiBucketAggregationFields(PARSER, + parser -> ParsedBucket.fromXContent(parser, false), + parser -> ParsedBucket.fromXContent(parser, true)); + } + + public static ParsedVariableWidthHistogram fromXContent(XContentParser parser, String name) throws IOException { + ParsedVariableWidthHistogram aggregation = PARSER.parse(parser, null); + aggregation.setName(name); + return aggregation; + } + + + public static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements Histogram.Bucket{ + private Double key; + + private Double min; + private Double max; + + private String minAsString; + private String maxAsString; + + @Override + public Object getKey() { + return key; + } + + @Override + public String getKeyAsString() { + String keyAsString = super.getKeyAsString(); + if (keyAsString != null) { + return keyAsString; + } + if (key != null) { + return Double.toString(key); + } + return null; + } + + public void setMin(Double min) { + this.min = min; + } + + public void setMinAsString(String minAsString){ + this.minAsString = minAsString; + } + + public double getMin() { + return min; + } + + public String getMinAsString() { + if (minAsString != null) { + return minAsString; + } + if (min != null) { + return Double.toString(min); + } + return null; + } + + public void setMax(Double max){ + this.max = max; + } + + public void setMaxAsString(String maxAsString){ + this.maxAsString = maxAsString; + } + + public double getMax() { + return max; + } + + public String getMaxAsString() { + if (maxAsString != null) { + return maxAsString; + } + if (max != null) { + return Double.toString(max); + } + return null; + } + + static ParsedBucket fromXContent(XContentParser parser, boolean keyed) throws IOException { + final ParsedBucket bucket = new ParsedBucket(); + bucket.setKeyed(keyed); + XContentParser.Token token = parser.currentToken(); + String currentFieldName = parser.currentName(); + if (keyed) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + } + + List aggregations = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if 
(CommonFields.KEY_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setKeyAsString(parser.text()); + } else if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) { + bucket.key = parser.doubleValue(); + } else if (CommonFields.MIN_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setMinAsString(parser.text()); + } else if (CommonFields.MIN.getPreferredName().equals(currentFieldName)) { + bucket.setMin(parser.doubleValue()); + } else if (CommonFields.MAX_AS_STRING.getPreferredName().equals(currentFieldName)) { + bucket.setMaxAsString(parser.text()); + } else if (CommonFields.MAX.getPreferredName().equals(currentFieldName)) { + bucket.setMax(parser.doubleValue()); + } else if (CommonFields.DOC_COUNT.getPreferredName().equals(currentFieldName)) { + bucket.setDocCount(parser.longValue()); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) { + bucket.key = parser.doubleValue(); + } else { + XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class, + aggregations::add); + } + } + } + bucket.setAggregations(new Aggregations(aggregations)); + return bucket; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (isKeyed()) { + builder.startObject(getKeyAsString()); + } else { + builder.startObject(); + } + + if (minAsString != null) { + builder.field(CommonFields.MIN_AS_STRING.getPreferredName(), minAsString); + } + builder.field(CommonFields.MIN.getPreferredName(), getMin()); + + if (super.getKeyAsString() != null) { + builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), getKeyAsString()); + } + keyToXContent(builder); + + if (maxAsString != null) { + builder.field(CommonFields.MAX_AS_STRING.getPreferredName(), maxAsString); + } + builder.field(CommonFields.MAX.getPreferredName(), getMax()); + + builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount()); + getAggregations().toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java new file mode 100644 index 0000000000000..5b7b42e62535e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java @@ -0,0 +1,187 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class VariableWidthHistogramAggregationBuilder + extends ValuesSourceAggregationBuilder { + + public static final String NAME = "variable_width_histogram"; + + private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets"); + + private static ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer"); + + private static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size"); + + public static final ObjectParser PARSER = + ObjectParser.fromBuilder(NAME, VariableWidthHistogramAggregationBuilder::new); + static{ + ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, true); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setShardSize, SHARD_SIZE_FIELD); + PARSER.declareInt(VariableWidthHistogramAggregationBuilder::setInitialBuffer, INITIAL_BUFFER_FIELD); + } + + private int numBuckets = 10; + private int shardSize = numBuckets * 50; + private int initialBuffer = Math.min(10 * this.shardSize, 50000); + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + VariableWidthHistogramAggregatorFactory.registerAggregators(builder); + } + /** Create a new builder with the given name. */ + public VariableWidthHistogramAggregationBuilder(String name) { + super(name); + } + + /** Read in object data from a stream, for internal use only. 
*/ + public VariableWidthHistogramAggregationBuilder(StreamInput in) throws IOException { + super(in); + numBuckets = in.readVInt(); + } + + protected VariableWidthHistogramAggregationBuilder(VariableWidthHistogramAggregationBuilder clone, + AggregatorFactories.Builder factoriesBuilder, + Map metaData) { + super(clone, factoriesBuilder, metaData); + this.numBuckets = clone.numBuckets; + } + + @Override + protected ValuesSourceType defaultValueSourceType() { + return CoreValuesSourceType.NUMERIC; + } + + public VariableWidthHistogramAggregationBuilder setNumBuckets(int numBuckets){ + if (numBuckets <= 0) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for [" + + name + "]"); + } else if (numBuckets > 50000){ + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must not be greater than 50,000 for [" + + name + "]"); + } + this.numBuckets = numBuckets; + return this; + } + + public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize){ + if (shardSize < numBuckets) { + throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must not be less than " + + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]"); + } + this.shardSize = shardSize; + return this; + } + + public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer){ + if (initialBuffer < numBuckets) { + // If numBuckets buckets are being returned, then at least that many must be stored in memory + throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than numBuckets " + + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]"); + + } + this.initialBuffer = initialBuffer; + return this; + } + + public int getNumBuckets(){ return numBuckets; } + + public int getShardSize(){ return shardSize; } + + public int getInitialBuffer(){ return initialBuffer; } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.MANY; + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) { + return new VariableWidthHistogramAggregationBuilder(this, factoriesBuilder, metaData); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeVInt(numBuckets); + } + + @Override + protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext, + ValuesSourceConfig config, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + + Settings settings = queryShardContext.getIndexSettings().getNodeSettings(); + int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings); + if (numBuckets > maxBuckets) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+ + " must be less than " + maxBuckets); + } + return new VariableWidthHistogramAggregatorFactory(name, config, numBuckets, shardSize, initialBuffer, + queryShardContext, parent, subFactoriesBuilder, metadata); + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(NUM_BUCKETS_FIELD.getPreferredName(), numBuckets); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), numBuckets, shardSize, initialBuffer); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if 
(super.equals(obj) == false) return false; + VariableWidthHistogramAggregationBuilder other = (VariableWidthHistogramAggregationBuilder) obj; + return Objects.equals(numBuckets, other.numBuckets) + && Objects.equals(shardSize, other.shardSize) + && Objects.equals(initialBuffer, other.initialBuffer); + } + + @Override + public String getType() { return NAME; } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java new file mode 100644 index 0000000000000..3e32da0709eb5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -0,0 +1,584 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.CollectionUtil; +import org.apache.lucene.util.InPlaceMergeSorter; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { + + /** + * This aggregator goes through multiple phases of 
collection. Each phase has a different CollectionPhase::collectValue + * implementation + * + * Running a clustering algorithm like K-Means is infeasible because large indices don't fit into memory. + * But having multiple collection phases lets us accurately bucket the docs in one pass. + */ + private abstract class CollectionPhase implements Releasable { + + /** + * This method will collect the doc and then either return itself or a new CollectionPhase. + * It is responsible for determining when a phase is over and what phase will run next + */ + abstract CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException; + + + /** + * @return the final number of buckets that will be used + * If this is not the final phase, then an instance of the next phase is created and it is asked for this answer. + */ + abstract int finalNumBuckets(); + + /** + * If this CollectionPhase is the final phase then this method will build and return the i'th bucket + * Otherwise, it will create an instance of the next phase and ask it for the i'th bucket (naturally, if that phase + * is not the last phase then it will do the same and so on...) + */ + abstract InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations) throws IOException; + + } + + /** + * Phase 1: Build up a buffer of docs (i.e. give each new doc its own bucket). No clustering decisions are made here. + * Building this buffer lets us analyze the distribution of the data before we begin clustering. + */ + private class BufferValuesPhase extends CollectionPhase{ + + private DoubleArray buffer; + private int bufferSize; + private int bufferLimit; + private MergeBucketsPhase mergeBucketsPhase; + + BufferValuesPhase(int bufferLimit){ + this.buffer = bigArrays.newDoubleArray(1); + this.bufferSize = 0; + this.bufferLimit = bufferLimit; + this.mergeBucketsPhase = null; + } + + @Override + public CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException{ + if (bufferSize < bufferLimit) { + // Add to the buffer, i.e. store the doc in a new bucket + buffer = bigArrays.grow(buffer, bufferSize + 1); + buffer.set((long) bufferSize, val); + collectBucket(sub, doc, bufferSize); + bufferSize += 1; + } + + if(bufferSize == bufferLimit) { + // We have hit the buffer limit. Switch to merge mode + CollectionPhase mergeBuckets = new MergeBucketsPhase(buffer, bufferSize); + Releasables.close(this); + return mergeBuckets; + } else { + // There is still room in the buffer + return this; + } + } + + int finalNumBuckets(){ + return getMergeBucketPhase().finalNumBuckets(); + } + + @Override + InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations) throws IOException{ + InternalVariableWidthHistogram.Bucket bucket = getMergeBucketPhase().buildBucket(bucketOrd, subAggregations); + return bucket; + } + + MergeBucketsPhase getMergeBucketPhase(){ + if(mergeBucketsPhase == null){ + mergeBucketsPhase = new MergeBucketsPhase(buffer, bufferSize); + } + return mergeBucketsPhase; + } + + @Override + public void close() { + if(mergeBucketsPhase != null){ + Releasables.close(mergeBucketsPhase); + } + Releasables.close(buffer); + } + } + + /** + * Phase 2: This phase is initialized with the buffer created in Phase 1. + * It is responsible for merging the buffered docs into a smaller number of buckets and then determining which existing + * bucket all subsequent docs belong to.
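As an aside, the phase hand-off described above boils down to a small state-machine pattern: every collect call returns the phase that should handle the next value, and the aggregator simply keeps reassigning its single collector reference. The following standalone sketch only mirrors that idea; none of these type names (`Phase`, `BufferPhase`, `ClusterPhase`) exist in the patch.

```java
// Minimal sketch of the "collect returns the next phase" pattern (illustrative only).
interface Phase {
    Phase collect(double value);
}

final class BufferPhase implements Phase {
    private final java.util.List<Double> buffer = new java.util.ArrayList<>();
    private final int limit;

    BufferPhase(int limit) { this.limit = limit; }

    @Override
    public Phase collect(double value) {
        buffer.add(value);
        // Once the buffer is full, hand control over to the clustering phase.
        return buffer.size() >= limit ? new ClusterPhase(buffer) : this;
    }
}

final class ClusterPhase implements Phase {
    ClusterPhase(java.util.List<Double> buffered) {
        // ... build initial clusters from the buffered values ...
    }

    @Override
    public Phase collect(double value) {
        // ... add to the nearest cluster, or open a new one for outliers ...
        return this; // terminal phase
    }
}
```

The caller would write `collector = collector.collect(value)` for every value, which is exactly how the aggregator below switches from buffering to merging without any explicit mode flag.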
New buckets will be created for docs that are distant from all existing ones + */ + private class MergeBucketsPhase extends CollectionPhase{ + /** + * "Cluster" refers to intermediate buckets during collection + * They are kept sorted by centroid. The i'th index in all these arrays always refers to the i'th cluster + */ + public DoubleArray clusterMaxes; + public DoubleArray clusterMins; + public DoubleArray clusterCentroids; + public DoubleArray clusterSizes; // clusterSizes != bucketDocCounts when clusters are in the middle of a merge + public int numClusters; + + private int avgBucketDistance; + + MergeBucketsPhase(DoubleArray buffer, int bufferSize) { + // Cluster the documents to reduce the number of buckets + // Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection + bucketBufferedDocs(buffer, bufferSize, shardSize * 3 / 4); + + if(bufferSize > 1) { + // Calculate the average distance between buckets + // Subsequent documents will be compared with this value to determine if they should be collected into + // an existing bucket or into a new bucket + // This can be done in a single linear scan because buckets are sorted by centroid + int sum = 0; + for (int i = 0; i < numClusters - 1; i++) { + sum += clusterCentroids.get(i + 1) - clusterCentroids.get(i); + } + avgBucketDistance = (sum / (numClusters - 1)); + } + } + + /** + * Sorts the indices of values by their underlying value + * This will produce a merge map whose application will sort values + */ + private class ClusterSorter extends InPlaceMergeSorter { + + final DoubleArray values; + final long[] indexes; + int length; + + ClusterSorter(DoubleArray values, int length){ + this.values = values; + this.length = length; + + this.indexes = new long[length]; + for(int i = 0; i < indexes.length; i++){ + indexes[i] = i; + } + } + + @Override + protected int compare(int i, int j) { + double iVal = values.get(indexes[i]); + double jVal = values.get(indexes[j]); + return Double.compare(iVal, jVal); + } + + @Override + protected void swap(int i, int j) { + long hold = indexes[i]; + indexes[i] = indexes[j]; + indexes[j] = hold; + } + + /** + * Produces a merge map where `mergeMap[i]` represents the index that values[i] + * would be moved to if values were sorted + * In other words, this method produces a merge map that will sort values + * + * See BucketsAggregator::mergeBuckets to learn more about the merge map + */ + public long[] generateMergeMap(){ + sort(0, indexes.length); + return indexes; + } + } + + /** + * Sorting the documents by key lets us bucket the documents into groups with a single linear scan + * + * But we can't do this by just sorting buffer, because we also need to generate a merge map + * for every change we make to the list, so that we can apply the changes to the underlying buckets as well. + * + * By just creating a merge map, we eliminate the need to actually sort buffer. We can just + * use the merge map to find any doc's sorted index. 
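To make the merge-map idea above concrete, here is a small standalone sketch (not Elasticsearch code) that computes the sorted position of every element without physically moving the elements. It assumes the plain "old index maps to sorted position" convention; the exact merge-map semantics used by the aggregator are the ones documented on `BucketsAggregator::mergeBuckets`.

```java
import java.util.Arrays;
import java.util.Comparator;

final class MergeMapSketch {
    /** Returns rank[oldIndex] = position that values[oldIndex] would occupy after sorting. */
    static long[] sortedPositions(double[] values) {
        Integer[] order = new Integer[values.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // order[r] = old index of the r-th smallest value
        Arrays.sort(order, Comparator.comparingDouble(i -> values[i]));

        long[] rank = new long[values.length];
        for (int r = 0; r < order.length; r++) {
            rank[order[r]] = r; // invert the permutation: old index -> sorted position
        }
        return rank;
    }

    public static void main(String[] args) {
        double[] buffer = {5.0, -3.0, 1.0, -2.0, 4.0};
        // Prints [4, 0, 2, 1, 3]: the value at old index 0 (5.0) is the largest, etc.
        System.out.println(Arrays.toString(sortedPositions(buffer)));
    }
}
```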
+ */ + private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets){ + // Allocate space for the clusters about to be created + clusterMins = bigArrays.newDoubleArray(1); + clusterMaxes = bigArrays.newDoubleArray(1); + clusterCentroids = bigArrays.newDoubleArray(1); + clusterSizes = bigArrays.newDoubleArray(1); + numClusters = 0; + + ClusterSorter sorter = new ClusterSorter(buffer, bufferSize); + long[] mergeMap = sorter.generateMergeMap(); + + // Naively use basic linear separation to group the first bufferSize docs into initialNumBuckets buckets + // This will require modifying the merge map, which currently represents a sorted list of buckets with 1 doc / bucket + int docsPerBucket = (int) Math.ceil((double) bufferSize / (double) numBuckets); + int bucketOrd = 0; + for(int i = 0; i < mergeMap.length; i++){ + // mergeMap[i] is the index of the i'th smallest doc + double val = buffer.get(mergeMap[i]); + + // Put the i'th smallest doc into the bucket at bucketOrd + mergeMap[i] = (int)(mergeMap[i]/docsPerBucket); + if(bucketOrd == numClusters){ + createAndAppendNewCluster(val); + } else { + addToCluster(bucketOrd, val); + } + + if((i + 1) % docsPerBucket == 0){ + // This bucket is full. Make a new one + bucketOrd += 1; + } + } + + mergeBuckets(mergeMap, numBuckets); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + } + + @Override + public CollectionPhase collectValue(LeafBucketCollector sub, int doc, double val) throws IOException{ + int bucketOrd = getNearestBucket(val); + double distance = Math.abs(clusterCentroids.get(bucketOrd)- val); + if(bucketOrd == -1 || distance > (2 * avgBucketDistance) && numClusters < shardSize) { + // Make a new bucket since the document is distant from all existing buckets + // TODO: (maybe) Create a new bucket for all distant docs and merge down to shardSize buckets at end + + createAndAppendNewCluster(val); + collectBucket(sub, doc, numClusters - 1); + + if(val > clusterCentroids.get(bucketOrd)){ + // Insert just ahead of bucketOrd so that the array remains sorted + bucketOrd += 1; + } + moveLastCluster(bucketOrd); + } else { + addToCluster(bucketOrd, val); + collectExistingBucket(sub, doc, bucketOrd); + } + return this; + } + + /** + * Creates a new cluster with value and appends it to the cluster arrays + */ + private void createAndAppendNewCluster(double value){ + // Ensure there is space for the cluster + clusterMaxes = bigArrays.grow(clusterMaxes, numClusters + 1); // + 1 because indexing starts at 0 + clusterMins = bigArrays.grow(clusterMins, numClusters + 1); + clusterCentroids = bigArrays.grow(clusterCentroids, numClusters + 1); + clusterSizes = bigArrays.grow(clusterSizes, numClusters + 1); + + // Initialize the cluster at the end of the array + clusterMaxes.set(numClusters, value); + clusterMins.set(numClusters, value); + clusterCentroids.set(numClusters, value); + clusterSizes.set(numClusters, 1); + + numClusters += 1; + } + + /** + * Move the last cluster to position idx + * This is expensive because a merge map of size numClusters is created, so don't call this method too often + * + * TODO: Make this more efficient + */ + private void moveLastCluster(int index){ + if(index != numClusters - 1) { + + // Move the cluster metadata + double holdMax = clusterMaxes.get(numClusters-1); + double holdMin = clusterMins.get(numClusters-1); + double holdCentroid = clusterCentroids.get(numClusters-1); + double holdSize = clusterSizes.get(numClusters-1); + for (int i = numClusters 
- 1; i > index; i--) { + // The clusters in range {index ... numClusters - 1} move up 1 index to make room for the new cluster + clusterMaxes.set(i, clusterMaxes.get(i-1)); + clusterMins.set(i, clusterMins.get(i-1)); + clusterCentroids.set(i, clusterCentroids.get(i-1)); + clusterSizes.set(i, clusterSizes.get(i-1)); + } + clusterMaxes.set(index, holdMax); + clusterMins.set(index, holdMin); + clusterCentroids.set(index, holdCentroid); + clusterSizes.set(index, holdSize); + + // Move the underlying buckets + long[] mergeMap = new long[numClusters]; + for (int i = 0; i < index; i++) { + // The clusters in range {0 ... idx - 1} don't move + mergeMap[i] = i; + } + for (int i = index; i < numClusters - 1; i++) { + // The clusters in range {index ... numClusters - 1} shift up + mergeMap[i] = i + 1; + } + // Finally, the new cluster moves to index + mergeMap[numClusters - 1] = index; + + // TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets, + // except it doesn't require a merge map. This would be more efficient as there would be no need to create a + // merge map on every call. + mergeBuckets(mergeMap, numClusters); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + } + } + + /** + * Adds val to the cluster at index bucketOrd. + * The cluster's centroid, min, max, and size are recalculated. + */ + private void addToCluster(int bucketOrd, double val){ + assert bucketOrd < numClusters; + + double max = Math.max(clusterMaxes.get(bucketOrd), val); + double min = Math.min(clusterMins.get(bucketOrd), val); + + // Recalculate the centroid + double oldCentroid = clusterCentroids.get(bucketOrd); + double size = clusterSizes.get(bucketOrd); + double newCentroid = ((oldCentroid * size) + val) / (size + 1); + + clusterMaxes.set(bucketOrd, max); + clusterMins.set(bucketOrd, min); + clusterCentroids.set(bucketOrd, newCentroid); + clusterSizes.increment(bucketOrd, 1); + } + + /** + * Returns the ordinal of the bucket whose centroid is closest to val, or -1 if there are no buckets. 
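For intuition, finding the nearest centroid in a sorted array is a binary search that rounds to the closest element instead of reporting the insertion point. The plain-array sketch below is independent of the `BigArrays.DoubleBinarySearcher` used in the patch; the helper name and signature are illustrative only.

```java
import java.util.Arrays;

final class NearestCentroidSketch {
    /** Index of the centroid closest to value, or -1 if there are no centroids. */
    static int nearest(double[] sortedCentroids, int size, double value) {
        if (size == 0) {
            return -1;
        }
        int i = Arrays.binarySearch(sortedCentroids, 0, size, value);
        if (i >= 0) {
            return i; // exact match
        }
        int insertion = -(i + 1); // first index holding a larger centroid
        if (insertion == 0) {
            return 0;
        }
        if (insertion == size) {
            return size - 1;
        }
        // Pick whichever neighbour is closer to value.
        double below = value - sortedCentroids[insertion - 1];
        double above = sortedCentroids[insertion] - value;
        return below <= above ? insertion - 1 : insertion;
    }

    public static void main(String[] args) {
        double[] centroids = {-2.5, 1.0, 4.5};
        System.out.println(nearest(centroids, centroids.length, 3.2)); // 2
        System.out.println(nearest(centroids, centroids.length, 2.0)); // 1
    }
}
```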
+ **/ + private int getNearestBucket(double value){ + if (numClusters == 0){ + return -1; + } + BigArrays.DoubleBinarySearcher binarySearcher = new BigArrays.DoubleBinarySearcher(clusterCentroids); + return binarySearcher.search(0, numClusters - 1, value); + } + + @Override + int finalNumBuckets(){ + return numClusters; + } + + @Override + InternalVariableWidthHistogram.Bucket buildBucket(int bucketOrd, InternalAggregations subAggregations){ + return new InternalVariableWidthHistogram.Bucket( + clusterCentroids.get(bucketOrd), + new InternalVariableWidthHistogram.Bucket.BucketBounds(clusterMins.get(bucketOrd), clusterMaxes.get(bucketOrd)), + bucketDocCount(bucketOrd), + formatter, + subAggregations); + } + + @Override + public void close() { + Releasables.close(clusterMaxes, clusterMins, clusterCentroids, clusterSizes); + } + } + + private final ValuesSource.Numeric valuesSource; + private final DocValueFormat formatter; + + // Aggregation parameters + private final int numBuckets; + private final int shardSize; + private final int bufferLimit; + + final BigArrays bigArrays; + private CollectionPhase collector; + + private MergingBucketsDeferringCollector deferringCollector; + + VariableWidthHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, int shardSize, + int initialBuffer, @Nullable ValuesSourceConfig valuesSourceConfig, + SearchContext context, Aggregator parent, + Map metadata) throws IOException{ + super(name, factories, context, parent, metadata); + + this.numBuckets = numBuckets; + this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); + this.formatter = valuesSourceConfig.format(); + this.shardSize = shardSize; + this.bufferLimit = initialBuffer; + + bigArrays = context.bigArrays(); + collector = new BufferValuesPhase(this.bufferLimit); + + String scoringAgg = subAggsNeedScore(); + String nestedAgg = descendsFromNestedAggregator(parent); + if (scoringAgg != null && nestedAgg != null) { + /* + * Terms agg would force the collect mode to depth_first here, because + * we need to access the score of nested documents in a sub-aggregation + * and we are not able to generate this score while replaying deferred documents. + * + * But the VariableWidthHistogram agg _must_ execute in breadth first since it relies on + * deferring execution, so we just have to throw up our hands and refuse + */ + throw new IllegalStateException("VariableWidthHistogram agg [" + name() + "] is the child of the nested agg [" + nestedAgg + + "], and also has a scoring child agg [" + scoringAgg + "]. 
This combination is not supported because " + + "it requires executing in [depth_first] mode, which the VariableWidthHistogram agg cannot do."); + } + } + + private String subAggsNeedScore() { + for (Aggregator subAgg : subAggregators) { + if (subAgg.scoreMode().needsScores()) { + return subAgg.name(); + } + } + return null; + } + + private String descendsFromNestedAggregator(Aggregator parent) { + while (parent != null) { + if (parent.getClass() == NestedAggregator.class) { + return parent.name(); + } + parent = parent.parent(); + } + return null; + } + + @Override + public ScoreMode scoreMode() { + if (valuesSource != null && valuesSource.needsScores()) { + return ScoreMode.COMPLETE; + } + return super.scoreMode(); + } + + @Override + protected boolean shouldDefer(Aggregator aggregator) { + return true; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent())); + return deferringCollector; + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); + return new LeafBucketCollectorBase(sub, values){ + @Override + public void collect(int doc, long bucket) throws IOException { + assert bucket == 0; + if(values.advanceExact(doc)){ + final int valuesCount = values.docValueCount(); + double prevVal = Double.NEGATIVE_INFINITY; + for (int i = 0; i < valuesCount; ++i) { + double val = values.nextValue(); + assert val >= prevVal; + if (val == prevVal){ + continue; + } + + collector = collector.collectValue(sub, doc, val); + } + } + } + }; + } + + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + int numClusters = collector.finalNumBuckets(); + + long[] bucketOrdsToCollect = new long[numClusters]; + for (int i = 0; i < numClusters; i++) { + bucketOrdsToCollect[i] = i; + } + + InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + + List buckets = new ArrayList<>(numClusters); + for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) { + buckets.add(collector.buildBucket(bucketOrd, subAggregationResults[bucketOrd])); + } + + Function, InternalAggregation> resultBuilder = bucketsToFormat -> { + // The contract of the histogram aggregation is that shards must return + // buckets ordered by centroid in ascending order + CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator()); + + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( + buildEmptySubAggregations()); + + return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata()); + }; + + return new InternalAggregation[] { resultBuilder.apply(buckets) }; + + } + + @Override + public InternalAggregation buildEmptyAggregation() { + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( + buildEmptySubAggregations() + ); + return new InternalVariableWidthHistogram(name(), Collections.emptyList(), emptyBucketInfo, numBuckets, formatter, metadata()); + } + + @Override + public void doClose() { + Releasables.close(collector); + } + +} + diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java new file mode 100644 index 0000000000000..2b60fac8c686e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.AggregatorSupplier; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; + +public class VariableWidthHistogramAggregatorFactory extends ValuesSourceAggregatorFactory { + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + builder.register(VariableWidthHistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC, + (VariableWidthHistogramAggregatorSupplier) VariableWidthHistogramAggregator::new); + } + + private final int numBuckets; + private final int shardSize; + private final int initialBuffer; + + VariableWidthHistogramAggregatorFactory(String name, + ValuesSourceConfig config, + int numBuckets, + int shardSize, + int initialBuffer, + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata) throws IOException{ + super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); + this.numBuckets = numBuckets; + this.shardSize = shardSize; + this.initialBuffer = initialBuffer; + } + + @Override + protected Aggregator doCreateInternal(SearchContext searchContext, + Aggregator parent, + boolean collectsFromSingleBucket, + Map metadata) throws IOException{ + AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config, + VariableWidthHistogramAggregationBuilder.NAME); + if (aggregatorSupplier instanceof VariableWidthHistogramAggregatorSupplier == false) { + throw new 
AggregationExecutionException("Registry miss-match - expected HistogramAggregatorSupplier, found [" + + aggregatorSupplier.getClass().toString() + "]"); + } + return ((VariableWidthHistogramAggregatorSupplier) aggregatorSupplier).build(name, factories, numBuckets, shardSize, initialBuffer, + config, searchContext, parent, metadata); + } + + @Override + protected Aggregator createUnmapped(SearchContext searchContext, + Aggregator parent, + Map metadata) throws IOException { + return new VariableWidthHistogramAggregator(name, factories, numBuckets, shardSize, initialBuffer, config, + searchContext, parent, metadata); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java new file mode 100644 index 0000000000000..9e0abdeec9633 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.support.AggregatorSupplier; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; + +@FunctionalInterface +public interface VariableWidthHistogramAggregatorSupplier extends AggregatorSupplier { + Aggregator build( + String name, + AggregatorFactories factories, + int numBuckets, + int shardSize, + int initialBuffer, + @Nullable ValuesSourceConfig valuesSourceConfig, + SearchContext aggregationContext, + Aggregator parent, + Map metadata + ) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java index 9785ff971e83b..6864d958dbdc3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInspectionHelper.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoGrid; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; @@ -124,6 +125,10 @@ public static boolean hasValue(InternalAutoDateHistogram agg) { return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); } + public static boolean hasValue(InternalVariableWidthHistogram agg) { + return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); + } + public static boolean hasValue(InternalComposite agg) { return agg.getBuckets().stream().anyMatch(bucket -> bucket.getDocCount() > 0); } diff --git a/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java b/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java new file mode 100644 index 0000000000000..bbdb8077aec58 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/BinarySearcherTests.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.util; + +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Arrays; + +public class BinarySearcherTests extends ESTestCase { + + private BigArrays randombigArrays() { + return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + } + + private BigArrays bigArrays; + + @Before + public void init() { + bigArrays = randombigArrays(); + } + + public void testDoubleBinarySearch() throws Exception { + final int size = randomIntBetween(50, 10000); + DoubleArray bigArray = new BigDoubleArray(size, bigArrays, false); + double[] array = new double[size]; + + // Fill array with sorted values + double currentValue = randomDoubleBetween(-100, 100, true); + for (int i = 0; i < size; ++i) { + bigArray.set(i, currentValue); + array[i] = currentValue; + currentValue += randomDoubleBetween(0, 30, false); + } + + // Pick a number to search for + int index = randomIntBetween(0, size - 1); + double searchFor = bigArray.get(index); + if (randomBoolean()) { + // Pick a number where there is no exact match, but that is closest to array.get(index) + if (randomBoolean()) { + // Pick a number above array.get(index) + if (index < size - 1) { + // Divide by 3 so that it's closer to array.get(index) than to array.get(index + 1) + searchFor += (bigArray.get(index + 1) - bigArray.get(index)) / 3; + } else { + // There is nothing about index + searchFor += 0.1; + } + } else { + // Pick one below array.get(index) + if (index > 0) { + searchFor -= (bigArray.get(index) - bigArray.get(index - 1)) / 3; + } else { + // There is nothing below index + searchFor -= 0.1; + } + } + } + + BigArrays.DoubleBinarySearcher searcher = new BigArrays.DoubleBinarySearcher(bigArray); + assertEquals(index, searcher.search(0, size - 1, searchFor)); + + // Sanity check: confirm that ArrayUtils.binarySearch() returns the same index + int arraysIndex = Arrays.binarySearch(array, searchFor); + if(arraysIndex < 0){ + // Arrays.binarySearch didn't find an exact match + arraysIndex = -(arraysIndex + 1); + } + + // Arrays.binarySearch always rounds down whereas BinarySearcher rounds to the closest index + // So sometimes they will be off by 1 + assertEquals(Math.abs(index - arraysIndex) <= 1, true); + + Releasables.close(bigArray); + } + + class IntBinarySearcher extends BinarySearcher { + + int[] array; + int searchFor; + + IntBinarySearcher(int[] array, int searchFor) { + this.array = array; + this.searchFor = searchFor; + } + + @Override + protected int compare(int index) { + return Integer.compare(array[index], searchFor); + } + + @Override + protected double distance(int index) { + return Math.abs(array[index] - searchFor); + } + } + + public void testCompareWithArraysBinarySearch() throws Exception { + int size = randomIntBetween(30, 10000); + int[] array = new int[size]; + for (int i = 0; i < size; i++) { + array[i] = randomInt(); + } + Arrays.sort(array); + int searchFor = randomInt(); + BinarySearcher searcher = new IntBinarySearcher(array, searchFor); + + int searcherIndex = searcher.search(0, size-1); + int arraysIndex = Arrays.binarySearch(array, searchFor); + + if(arraysIndex < 0){ + // Arrays.binarySearch didn't find an exact match + arraysIndex = -(arraysIndex + 1); + } + + // Arrays.binarySearch always rounds down whereas BinarySearcher rounds to 
the closest index + // So sometimes they will be off by 1 + assertEquals(Math.abs(searcherIndex - arraysIndex) <= 1, true); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java index 51c77a430c87b..d574e254c5469 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogramTests; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogramTests; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissingTests; import org.elasticsearch.search.aggregations.bucket.nested.InternalNestedTests; import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNestedTests; @@ -102,7 +103,6 @@ * */ public class AggregationsTests extends ESTestCase { - private static final List> aggsTests = getAggsTests(); private static List> getAggsTests() { @@ -131,6 +131,7 @@ private static List> getAggsTests() { aggsTests.add(new InternalHistogramTests()); aggsTests.add(new InternalDateHistogramTests()); aggsTests.add(new InternalAutoDateHistogramTests()); + aggsTests.add(new InternalVariableWidthHistogramTests()); aggsTests.add(new LongTermsTests()); aggsTests.add(new DoubleTermsTests()); aggsTests.add(new StringTermsTests()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java new file mode 100644 index 0000000000000..232dc24dd95d9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogramTests.java @@ -0,0 +1,394 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InternalVariableWidthHistogramTests extends + InternalMultiBucketAggregationTestCase{ + + private DocValueFormat format; + private InternalVariableWidthHistogram.EmptyBucketInfo emptyBucktInfo; + private int numBuckets; + + @Override + public void setUp() throws Exception { + super.setUp(); + format = randomNumericDocValueFormat(); + emptyBucktInfo = new InternalVariableWidthHistogram.EmptyBucketInfo(InternalAggregations.EMPTY); + this.numBuckets = 3; + } + + private InternalVariableWidthHistogram createEmptyTestInstance(){ + String name = randomAlphaOfLength(5); + Map metadata = null; + if (randomBoolean()) { + metadata = new HashMap<>(); + int metadataCount = between(0, 10); + while (metadata.size() < metadataCount) { + metadata.put(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + } + List buckets = new ArrayList<>(); + return new InternalVariableWidthHistogram(name, buckets, emptyBucktInfo, numBuckets, format, metadata); + } + + @Override + protected InternalVariableWidthHistogram createTestInstance(String name, + Map metaData, + InternalAggregations aggregations) { + final double base = randomIntBetween(-50, 50); + final int numBuckets = randomIntBetween(1, 3); + List buckets = new ArrayList<>(); + double curKey = base; + for (int i = 0; i < numBuckets; ++i) { + final int docCount = TestUtil.nextInt(random(), 1, 50); + double add = randomDoubleBetween(1, 10, true); + curKey += add; + buckets.add(new InternalVariableWidthHistogram.Bucket( + curKey, + new InternalVariableWidthHistogram.Bucket.BucketBounds(curKey - (add / 3), curKey + (add / 3)), + docCount, + format, + InternalAggregations.EMPTY + )); + } + return new InternalVariableWidthHistogram(name, buckets, emptyBucktInfo, numBuckets, format, metaData); + } + + @Override + protected Class implementationClass() { + return ParsedVariableWidthHistogram.class; + } + @Override + protected InternalVariableWidthHistogram mutateInstance(InternalVariableWidthHistogram instance) { + String name = instance.getName(); + List buckets = instance.getBuckets(); + int targetBuckets = instance.getTargetBuckets(); + InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = instance.getEmptyBucketInfo(); + Map metadata = instance.getMetadata(); + switch (between(0, 2)) { + case 0: + name += randomAlphaOfLength(5); + break; + case 1: + buckets = new ArrayList<>(buckets); + double boundMin = randomDouble(); + double boundMax = Math.abs(boundMin) * 2; + buckets.add(new 
InternalVariableWidthHistogram.Bucket( + randomDouble(), + new InternalVariableWidthHistogram.Bucket.BucketBounds(boundMin, boundMax), + randomIntBetween(1, 100), + format, + InternalAggregations.EMPTY + )); + break; + case 2: + emptyBucketInfo = null; + if (metadata == null) { + metadata = new HashMap<>(1); + } else { + metadata = new HashMap<>(instance.getMetadata()); + } + metadata.put(randomAlphaOfLength(15), randomInt()); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + return new InternalVariableWidthHistogram(name, buckets, emptyBucketInfo, targetBuckets, format, metadata); + } + + public void testSingleShardReduceLong() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 2, 5, 10, 12, 200}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 3, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (1,2,5), (10,12), 200) ] + // Final centroids should be [ 3, 11, 200 ] + // Final keys should be [ 1, 5, 200 ] + double double_error = 1d / 10000d; + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals((8d/3d), (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(9, reduced_buckets.get(0).getDocCount()); + assertEquals(10d, reduced_buckets.get(1).min(), double_error); + assertEquals(11d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(6, reduced_buckets.get(1).getDocCount()); + assertEquals(200d, reduced_buckets.get(2).min(), double_error); + assertEquals(200d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(3, reduced_buckets.get(2).getDocCount()); + } + + public void testSingleShardReduceDouble() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (double value : new double[]{-1.3, -1.3, 12.0, 13.0, 20.0, 21.5, 23.0, 24.5}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value - 0.7, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new 
NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (-1.3,-1.3), (12.0,13.0), (20.0, 21.5, 23.0, 24.5) ] + // Final centroids should be [ -1.3, 12.5, 22.25 ] + // Final keys should be [ -1.3, 11.7, 19.7 ] + double double_error = 1d / 10000d; + assertEquals(-2.0, reduced_buckets.get(0).min(), double_error); + assertEquals(-1.3, (double)reduced_buckets.get(0).getKey(), double_error); + assertEquals(2, reduced_buckets.get(0).getDocCount()); + assertEquals(11.3, reduced_buckets.get(1).min(), double_error); + assertEquals(12.5, (double)reduced_buckets.get(1).getKey(), double_error); + assertEquals(2, reduced_buckets.get(1).getDocCount()); + assertEquals(19.3, reduced_buckets.get(2).min(), double_error); + assertEquals(22.25, (double)reduced_buckets.get(2).getKey(), double_error); + assertEquals(4, reduced_buckets.get(2).getDocCount()); + } + + public void testMultipleShardsReduce() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + + List buckets1 = new ArrayList<>(); + for (long value : new long[]{1, 5, 6, 10}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets1.add(bucket); + } + + List buckets2 = new ArrayList<>(); + for (long value : new long[]{2, 3, 6, 7}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets2.add(bucket); + } + + List buckets3 = new ArrayList<>(); + for (long value : new long[]{0, 2, 12}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 1); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets3.add(bucket); + } + + InternalVariableWidthHistogram histogram1 = dummy_histogram.create(buckets1); + InternalVariableWidthHistogram histogram2 = dummy_histogram.create(buckets2); + InternalVariableWidthHistogram histogram3 = dummy_histogram.create(buckets3); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = 
InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram1); + aggs.add(histogram2); + aggs.add(histogram3); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram1.reduce(aggs, context)).getBuckets(); + + // Final clusters should be [ (0, 1, 2, 2, 3), (5, 6, 6, 7), (10, 12) ] + // Final centroids should be [ 2, 6, 11 ] + // Final keys should be [ 1, 5, 10 ] + double double_error = 1d / 10000d; + assertEquals(0d, reduced_buckets.get(0).min(), double_error); + assertEquals(1.6d, (double)reduced_buckets.get(0).getKey(), double_error); + assertEquals(5, reduced_buckets.get(0).getDocCount()); + assertEquals(5d, reduced_buckets.get(1).min(), double_error); + assertEquals(6d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(4, reduced_buckets.get(1).getDocCount()); + assertEquals(10d, reduced_buckets.get(2).min(), double_error); + assertEquals(11d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(2, reduced_buckets.get(2).getDocCount()); + } + + public void testOverlappingReduceResult() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 2, 4, 10}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds = + new InternalVariableWidthHistogram.Bucket.BucketBounds(value, value + 3); + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 4, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Expected clusters: [ (1, 2), (4), 10) ] + // Expected centroids: [ 1.5, 4, 10 ] + // Expected cluster (min, max): [ (1, 5), (4, 7), (10, 13) ] + // Expected keys: [ 1, 4.5, 10 ] + // Expected doc counts: [8, 4, 4] + double double_error = 1d / 10000d; + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals(1.5, (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(8, reduced_buckets.get(0).getDocCount()); + assertEquals(4.5, reduced_buckets.get(1).min(), double_error); + assertEquals(4d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(4, reduced_buckets.get(1).getDocCount()); + assertEquals(10d, reduced_buckets.get(2).min(), double_error); + assertEquals(10d, (double) reduced_buckets.get(2).getKey(), double_error); + assertEquals(4, reduced_buckets.get(2).getDocCount()); + } + + /** + * When buckets have the same min after the reduce phase, they should be merged. 
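The same-min merge exercised by this test amounts to collapsing adjacent buckets whose keys collide after reduction: doc counts add up and the centroid becomes a count-weighted average. The sketch below works under exactly those assumptions; the `Bucket` record is illustrative and not the internal bucket class.

```java
import java.util.ArrayList;
import java.util.List;

final class SameMinMergeSketch {
    record Bucket(double min, double max, double centroid, long docCount) {}

    /** Merges adjacent buckets (sorted by centroid) that share the same min. */
    static List<Bucket> mergeSameMin(List<Bucket> sorted) {
        List<Bucket> out = new ArrayList<>();
        for (Bucket b : sorted) {
            Bucket last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && last.min() == b.min()) {
                long count = last.docCount() + b.docCount();
                double centroid =
                    (last.centroid() * last.docCount() + b.centroid() * b.docCount()) / count;
                out.set(out.size() - 1,
                    new Bucket(last.min(), Math.max(last.max(), b.max()), centroid, count));
            } else {
                out.add(b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Bucket> buckets = List.of(
            new Bucket(1, 1, 1, 1),
            new Bucket(1, 100, 100, 1),
            new Bucket(700, 701, 700, 1));
        // Expect two buckets: centroid 50.5 with doc count 2, then 700 with doc count 1,
        // matching the expectations spelled out in the test below.
        mergeSameMin(buckets).forEach(System.out::println);
    }
}
```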
+ */ + public void testSameMinMerge() { + InternalVariableWidthHistogram dummy_histogram = createEmptyTestInstance(); + List buckets = new ArrayList<>(); + for (long value : new long[]{1, 100, 700}) { + InternalVariableWidthHistogram.Bucket.BucketBounds bounds; + if(value == 1 || value == 100) { + bounds = new InternalVariableWidthHistogram.Bucket.BucketBounds( + 1, value + ); + } else{ + bounds = new InternalVariableWidthHistogram.Bucket.BucketBounds( + value, value + 1 + ); + } + InternalVariableWidthHistogram.Bucket bucket = new InternalVariableWidthHistogram.Bucket( + value, bounds, 1, format, InternalAggregations.EMPTY + ); + buckets.add(bucket); + } + InternalVariableWidthHistogram histogram = dummy_histogram.create(buckets); + + MockBigArrays bigArrays = + new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + ScriptService mockScriptService = mockScriptService(); + + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + bigArrays, mockScriptService, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); + + ArrayList aggs = new ArrayList<>(); + aggs.add(histogram); + List reduced_buckets = + ((InternalVariableWidthHistogram) histogram.reduce(aggs, context)).getBuckets(); + + // Expected clusters: [ (1), (100), (700) ] + // Expected clusters after same min merge: [ (1, 100), (700) ] + // Expected centroids: [ 101/2, 700 ] + // Expected keys: [ 1, 700 ] + // Expected doc counts: [2, 1] + double double_error = 1d / 10000d; + assertEquals(2, reduced_buckets.size()); + assertEquals(1d, reduced_buckets.get(0).min(), double_error); + assertEquals((101d/2d), (double) reduced_buckets.get(0).getKey(), double_error); + assertEquals(2, reduced_buckets.get(0).getDocCount()); + assertEquals(700d, reduced_buckets.get(1).min(), double_error); + assertEquals(700d, (double) reduced_buckets.get(1).getKey(), double_error); + assertEquals(1, reduced_buckets.get(1).getDocCount()); + } + + @Override + protected void assertReduced(InternalVariableWidthHistogram reduced, List inputs) { + // It's very difficult to determine what the buckets should be without running the clustering algorithm. + // For now, randomized tests are avoided. Refer to the hardcoded written tests above. + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java new file mode 100644 index 0000000000000..761e5abaac3b1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java @@ -0,0 +1,544 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase { + + private static final String NUMERIC_FIELD = "numeric"; + + private static final Query DEFAULT_QUERY = new MatchAllDocsQuery(); + private VariableWidthHistogramAggregationBuilder aggregationBuilder; + + public void testNoDocs() throws Exception{ + final List dataset = Arrays.asList(); + testBothCases(DEFAULT_QUERY, dataset, true, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }); + } + + public void testMoreClustersThanDocs() throws Exception { + final List dataset = Arrays.asList(-3L, 10L, -200L); + double doubleError = 1d / 10000d; + + // Each document should have its own cluster, since shard_size > # docs + final Map expectedDocCount = new HashMap<>(); + expectedDocCount.put(-200d, 1); + expectedDocCount.put(-3d, 1); + expectedDocCount.put(10d, 1); + final Map expectedMins = new HashMap<>(); + expectedMins.put(-200d, -200d); + expectedMins.put(-3d, -3d); + expectedMins.put(10d, 10d); + + testBothCases(DEFAULT_QUERY, dataset, true, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCount.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()); + 
assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError); + }); + }); + } + + public void testLongs() throws Exception { + final List dataset = Arrays.asList(5L, -3L, 1L, -2L, 4L); + double doubleError = 1d / 10000d; + + // Expected clusters: [ (-3, -2), (1), (4, 5) ] + // Corresponding keys (centroids): [ -2.5, 1, 4.5 ] + + final Map expectedDocCount = new HashMap<>(); + expectedDocCount.put(-2.5, 2); + expectedDocCount.put(1d, 1); + expectedDocCount.put(4.5, 2); + + final Map expectedMins = new HashMap<>(); + expectedMins.put(-2.5, -3d); + expectedMins.put(1d, 1d); + expectedMins.put(4.5, 4d); + + final Map expectedMaxes = new HashMap<>(); + expectedMaxes.put(-2.5, -2d); + expectedMaxes.put(1d, 1d); + expectedMaxes.put(4.5, 5d); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(3).setShardSize(6).setInitialBuffer(3), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCount.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()); + assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError); + assertEquals(expectedMaxes.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.max(), doubleError); + }); + }); + } + + public void testDoubles() throws Exception { + final List dataset = Arrays.asList(3.3, 8.8, 1.2, 5.3, 2.26, -0.4, 5.9); + double doubleError = 1d / 10000d; + + // Search (no reduce) + + // (Cache limit < shard size) is illogical, but it simplifies testing + // Once the cache is full, the algorithm creates (3/4 * shard_size = 4) initial buckets. + // So, there will initially be a bucket for each of the first 4 values + // Expected clusters from search: [ (-0.4, 1.2), (2.26, 3.3), (5.3, 5.9) (8.8)] + // Corresponding keys (centroids): [ 0.4, 2.78, 5.6, 8.8] + final Map expectedDocCountOnlySearch = new HashMap<>(); + expectedDocCountOnlySearch.put(-0.4, 2); + expectedDocCountOnlySearch.put(2.26, 2); + expectedDocCountOnlySearch.put(5.3, 2); + expectedDocCountOnlySearch.put(8.8, 1); + + // Index these by min, rather than centroid, to avoid slight precision errors (Ex. 
3.999999 vs 4.0) that arise from double division + final Map expectedCentroidsOnlySearch = new HashMap<>(); + expectedCentroidsOnlySearch.put(-0.4, 0.4); + expectedCentroidsOnlySearch.put(2.26, 2.78); + expectedCentroidsOnlySearch.put(5.3, 5.6); + expectedCentroidsOnlySearch.put(8.8, 8.8); + final Map expectedMaxesOnlySearch = new HashMap<>(); + expectedMaxesOnlySearch.put(-0.4, 1.2); + expectedMaxesOnlySearch.put(2.26, 3.3); + expectedMaxesOnlySearch.put(5.3, 5.9); + expectedMaxesOnlySearch.put(8.8, 8.8); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedCentroidsOnlySearch.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCountOnlySearch.getOrDefault(bucket.min(), 0).longValue(), bucket.getDocCount(), doubleError); + assertEquals(expectedCentroidsOnlySearch.getOrDefault(bucket.min(), 0d).doubleValue(), bucket.centroid(), doubleError); + assertEquals(expectedMaxesOnlySearch.getOrDefault(bucket.min(), 0d).doubleValue(), bucket.max(), doubleError); + }); + }); + + // Search + Reduce + + // Before reducing we have one bucket per doc + // Expected clusters from search: [ (-0.4, 1.2), (2.26, 3.3), (5.3, 5.9) (8.8)] + // Corresponding keys (centroids): [ 0.4, 2.78, 5.6, 8.8] + final Map expectedDocCountSearchReduce = new HashMap<>(); + expectedDocCountSearchReduce.put(-0.4, 2); + expectedDocCountSearchReduce.put(2.26, 2); + expectedDocCountSearchReduce.put(5.3, 2); + expectedDocCountSearchReduce.put(8.8, 1); + + // Indexed by min + final Map expectedCentroidsSearchReduce = new HashMap<>(); + expectedCentroidsSearchReduce.put(-0.4, 0.4); + expectedCentroidsSearchReduce.put(2.26,2.78); + expectedCentroidsSearchReduce.put(5.3, 5.6); + expectedCentroidsSearchReduce.put(8.8, 8.8); + final Map expectedMaxesSearchReduce = new HashMap<>(); + expectedMaxesSearchReduce.put(-0.4, 1.2); + expectedMaxesSearchReduce.put(2.26,3.3); + expectedMaxesSearchReduce.put(5.3, 5.9); + expectedMaxesSearchReduce.put(8.8, 8.8); + + testSearchAndReduceCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(6).setInitialBuffer(4), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCountSearchReduce.size(), buckets.size()); + buckets.forEach(bucket -> { + long expectedDocCount = expectedDocCountSearchReduce.getOrDefault(bucket.min(), 0).longValue(); + double expectedCentroid = expectedCentroidsSearchReduce.getOrDefault(bucket.min(), 0d).doubleValue(); + double expectedMax = expectedMaxesSearchReduce.getOrDefault(bucket.min(), 0d).doubleValue(); + assertEquals(expectedDocCount, bucket.getDocCount(), doubleError); + assertEquals(expectedCentroid, bucket.centroid(), doubleError); + assertEquals(expectedMax, bucket.max(), doubleError); + }); + }); + } + + // Once the cache limit is reached, cached documents are collected into (3/4 * shard_size) buckets + // A new bucket should be added when there is a document that is distant from all existing buckets + public void testNewBucketCreation() throws Exception { + final List dataset = Arrays.asList(-1, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 40, 30, 25, 32, 38, 80, 50, 75); + double doubleError = 1d / 10000d; + + // Search (no reduce) + + // Expected clusters: [ (-1), (1), (3), (5), (7), (9), (11), (13), (15), (17), + // (19), (25, 30, 32), (38, 40), (50), (75, 80) ] + // 
Corresponding keys (centroids): [ -1, 1, 3, ..., 17, 19, 29, 39, 50, 77.5] + // Note: New buckets are created for 30, 50, and 80 because they are distant from the other buckets + final List keys = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 29d, 39d, 50d, 77.5d); + final List mins = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 25d, 38d, 50d, 75d); + final List maxes = Arrays.asList(-1d, 1d, 3d, 5d, 7d, 9d, 11d, 13d, 15d, 17d, 19d, 32d, 40d, 50d, 80d); + final List docCounts = Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 2, 1, 2); + assert keys.size() == docCounts.size() && keys.size() == mins.size() && keys.size() == maxes.size(); + + final Map expectedDocCountOnlySearch = new HashMap<>(); + final Map expectedMinsOnlySearch = new HashMap<>(); + final Map expectedMaxesOnlySearch = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + expectedDocCountOnlySearch.put(keys.get(i), docCounts.get(i)); + expectedMinsOnlySearch.put(keys.get(i), mins.get(i)); + expectedMaxesOnlySearch.put(keys.get(i), maxes.get(i)); + } + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(16).setInitialBuffer(12), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCountOnlySearch.size(), buckets.size()); + buckets.forEach(bucket -> { + long expectedDocCount = expectedDocCountOnlySearch.getOrDefault(bucket.getKey(), 0).longValue(); + double expectedMin = expectedMinsOnlySearch.getOrDefault(bucket.getKey(), 0d).doubleValue(); + double expectedMax = expectedMaxesOnlySearch.getOrDefault(bucket.getKey(), 0d).doubleValue(); + assertEquals(expectedDocCount, bucket.getDocCount()); + assertEquals(expectedMin, bucket.min(), doubleError); + assertEquals(expectedMax, bucket.max(), doubleError); + }); + }); + } +
+ // There should not be more than `shard_size` buckets on a shard, even when very distant documents appear + public void testNewBucketLimit() throws Exception { + final List dataset = Arrays.asList(1, 2, 3, 4, 5, 10, 20, 50, 100, 5400, -900); + double doubleError = 1d / 10000d; + + // Expected clusters: [ (-900, 1, 2), (3, 4), (5), (10, 20, 50, 100, 5400)] + // Corresponding keys (centroids): [ -299, 3.5, 5, 1116] + final Map expectedDocCount = new HashMap<>(); + expectedDocCount.put(-299d, 3); + expectedDocCount.put(3.5d, 2); + expectedDocCount.put(5d, 1); + expectedDocCount.put(1116d, 5); + + final Map expectedMins = new HashMap<>(); + expectedMins.put(-299d, -900d); + expectedMins.put(3.5d, 3d); + expectedMins.put(5d, 5d); + expectedMins.put(1116d, 10d); + + final Map expectedMaxes = new HashMap<>(); + expectedMaxes.put(-299d, 2d); + expectedMaxes.put(3.5d, 4d); + expectedMaxes.put(5d, 5d); + expectedMaxes.put(1116d, 5400d); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(4).setInitialBuffer(5), + histogram -> { + final List buckets = histogram.getBuckets(); + assertEquals(expectedDocCount.size(), buckets.size()); + buckets.forEach(bucket -> { + assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()); + assertEquals(expectedMins.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.min(), doubleError); + assertEquals(expectedMaxes.getOrDefault(bucket.getKey(), 0d).doubleValue(), bucket.max(), doubleError); + }); + }); + } +
+ public void testSimpleSubAggregations() throws IOException { + final List dataset = Arrays.asList(5, 1, 9, 2, 8); + + testSearchAndReduceCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD) + .setNumBuckets(3) + .setInitialBuffer(3) + .setShardSize(4) + .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)), + histogram -> { + final List buckets =
histogram.getBuckets(); + double deltaError = 1d/10000d; + + // Expected clusters: [ (1, 2), (5), (8,9) ] + + InternalStats stats = histogram.getBuckets().get(0).getAggregations().get("stats"); + assertEquals(1, stats.getMin(), deltaError); + assertEquals(2, stats.getMax(), deltaError); + assertEquals(2, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + + stats = histogram.getBuckets().get(1).getAggregations().get("stats"); + assertEquals(5, stats.getMin(), deltaError); + assertEquals(5, stats.getMax(), deltaError); + assertEquals(1, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + + stats = histogram.getBuckets().get(2).getAggregations().get("stats"); + assertEquals(8, stats.getMin(), deltaError); + assertEquals(9, stats.getMax(), deltaError); + assertEquals(2, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + }); + } + + public void testComplexSubAggregations() throws IOException{ + final List dataset = Arrays.asList(5, 4, 3, 2, 1, 0, 6, 7, 8, 9, 10, 11); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD) + .setNumBuckets(3) + .setInitialBuffer(12) + .setShardSize(4) + .subAggregation(new StatsAggregationBuilder("stats").field(NUMERIC_FIELD)), + histogram -> { + final List buckets = histogram.getBuckets(); + double deltaError = 1d / 10000d; + + // Expected clusters: [ (0, 1, 2, 3), (4, 5, 6, 7), (8, 9, 10, 11) ] + + InternalStats stats = histogram.getBuckets().get(0).getAggregations().get("stats"); + assertEquals(0d, stats.getMin(), deltaError); + assertEquals(3L, stats.getMax(), deltaError); + assertEquals(4L, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + + stats = histogram.getBuckets().get(1).getAggregations().get("stats"); + assertEquals(4d, stats.getMin(), deltaError); + assertEquals(7d, stats.getMax(), deltaError); + assertEquals(4L, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + + stats = histogram.getBuckets().get(2).getAggregations().get("stats"); + assertEquals(8d, stats.getMin(), deltaError); + assertEquals(11d, stats.getMax(), deltaError); + assertEquals(4L, stats.getCount()); + assertTrue(AggregationInspectionHelper.hasValue(stats)); + }); + } + + public void testSubAggregationReduction() throws IOException{ + final List dataset = Arrays.asList(1L, 1L, 1L, 2L, 2L); + + testSearchCase(DEFAULT_QUERY, dataset, false, + aggregation -> aggregation.field(NUMERIC_FIELD) + .setNumBuckets(3) + .setInitialBuffer(12) + .setShardSize(4) + .subAggregation(new TermsAggregationBuilder("terms") + .field(NUMERIC_FIELD) + .shardSize(2) + .size(1)), + histogram -> { + final List buckets = histogram.getBuckets(); + double deltaError = 1d / 10000d; + + // This is a test to make sure that the sub aggregations get reduced + // This terms sub aggregation has shardSize (2) != size (1), so we will get 1 bucket only if + // InternalVariableWidthHistogram reduces the sub aggregations. 
+ + InternalTerms terms = histogram.getBuckets().get(0).getAggregations().get("terms"); + assertEquals(1L, terms.getBuckets().size(), deltaError); + assertEquals(1L, ((InternalTerms.Bucket) terms.getBuckets().get(0)).getKey()); + }); + } +
+ public void testMultipleSegments() throws IOException { + final List dataset = Arrays.asList(1001, 1002, 1, 2, 1003, 3, 1004, 1005, 4, 5); + + // There should be two clusters: (1, 2, 3, 4, 5) and (1001, 1002, 1003, 1004, 1005) + // We can't enable multiple segments per index for many of the tests above, because the clusters are too close. + // Slight randomization --> different caches in the aggregator --> different clusters + // However, these two clusters are so far apart that even if a doc from one ends up in the other, + // the centroids will not change much. + // To account for this case of a document switching clusters, we check that each cluster centroid is within + // a certain range, rather than asserting exact values. + + testSearchAndReduceCase(DEFAULT_QUERY, dataset, true, + aggregation -> aggregation.field(NUMERIC_FIELD) + .setNumBuckets(2) + .setInitialBuffer(4) + .setShardSize(3) + .subAggregation(new StatsAggregationBuilder("stats").field(NUMERIC_FIELD)), + histogram -> { + final List buckets = histogram.getBuckets(); + double deltaError = 1d / 10000d; + + assertEquals(2, buckets.size()); + + // The smaller cluster + assertTrue(4 <= buckets.get(0).getDocCount() && buckets.get(0).getDocCount() <= 6); + assertTrue(0 <= buckets.get(0).centroid() && buckets.get(0).centroid() <= 200d); + assertEquals(1, buckets.get(0).min(), deltaError); + + // The bigger cluster + assertTrue(4 <= buckets.get(1).getDocCount() && buckets.get(1).getDocCount() <= 6); + assertTrue(800d <= buckets.get(1).centroid() && buckets.get(1).centroid() <= 1005d); + assertEquals(1005, buckets.get(1).max(), deltaError); + }); + } +
+ private void testSearchCase(final Query query, final List dataset, boolean multipleSegments, + final Consumer configure, + final Consumer verify) throws IOException { + executeTestCase(false, query, dataset, multipleSegments, configure, verify); + } + + private void testSearchAndReduceCase(final Query query, final List dataset, boolean multipleSegments, + final Consumer configure, + final Consumer verify) throws IOException { + executeTestCase(true, query, dataset, multipleSegments, configure, verify); + } + + private void testBothCases(final Query query, final List dataset, boolean multipleSegments, + final Consumer configure, + final Consumer verify) throws IOException { + executeTestCase(true, query, dataset, multipleSegments, configure, verify); + executeTestCase(false, query, dataset, multipleSegments, configure, verify); + } +
+ @Override + protected IndexSettings createIndexSettings() { + final Settings nodeSettings = Settings.builder() + .put("search.max_buckets", 25000).build(); + return new IndexSettings( + IndexMetadata.builder("_index") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + nodeSettings + ); + } +
+ private void executeTestCase(final boolean reduced, final Query query, + final List dataset, boolean multipleSegments, + final Consumer configure, + final Consumer verify) throws IOException { + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { +
indexSampleData(dataset, indexWriter, multipleSegments); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + final VariableWidthHistogramAggregationBuilder aggregationBuilder = + new VariableWidthHistogramAggregationBuilder("_name"); + if (configure != null) { + configure.accept(aggregationBuilder); + } + + final MappedFieldType fieldType; + if(dataset.size() == 0 || dataset.get(0) instanceof Double){ + fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.DOUBLE); + } else if(dataset.get(0) instanceof Long){ + fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.LONG); + } else if (dataset.get(0) instanceof Integer){ + fieldType = new NumberFieldMapper.NumberFieldType(aggregationBuilder.field(), NumberFieldMapper.NumberType.INTEGER); + } else { + throw new IOException("Test data has an invalid type"); + } + + + + final InternalVariableWidthHistogram histogram; + if (reduced) { + histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType); + } else { + histogram = search(indexSearcher, query, aggregationBuilder, fieldType); + } + verify.accept(histogram); + } + } + } + + private void indexSampleData(List dataset, RandomIndexWriter indexWriter, boolean multipleSegments) throws IOException { + if(!multipleSegments) { + // Put all of the documents into one segment + List documents = new ArrayList<>(); + for (final Number doc : dataset) { + final Document document = new Document(); + long fieldVal = convertDocumentToSortableValue(doc); + document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, fieldVal)); + documents.add(document); + } + indexWriter.addDocuments(documents); + } else { + // Create multiple segments in the index + final Document document = new Document(); + for (final Number doc : dataset) { + if (frequently()) { + indexWriter.commit(); + } + + long fieldVal = convertDocumentToSortableValue(doc); + document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, fieldVal)); + indexWriter.addDocument(document); + document.clear(); + } + } + } + + long convertDocumentToSortableValue(Number doc) throws IOException{ + if (doc instanceof Double) { + return NumericUtils.doubleToSortableLong(doc.doubleValue()); + } else if (doc instanceof Integer) { + return doc.intValue(); + } else if (doc instanceof Long) { + return doc.longValue(); + } + throw new IOException("Document has an invalid type"); + } + + +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index f53f40d89ef2a..87b38e80ff9ea 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -67,6 +67,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; import 
org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; @@ -246,6 +248,7 @@ public ReduceContext forFinalReduction() { map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c)); map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c)); map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c)); + map.put(VariableWidthHistogramAggregationBuilder.NAME, (p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c)); map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c)); map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c)); map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java index a011931a57a78..9251941ee0b95 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java @@ -78,7 +78,8 @@ public final class Aggregations { "string_stats", // https://github.com/elastic/elasticsearch/issues/51925 "top_hits", "top_metrics", // https://github.com/elastic/elasticsearch/issues/52236 - "t_test" // https://github.com/elastic/elasticsearch/issues/54503 + "t_test", // https://github.com/elastic/elasticsearch/issues/54503 + "variable_width_histogram" // https://github.com/elastic/elasticsearch/issues/58140 ); private Aggregations() {}
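
For reviewers who want to exercise the new aggregation outside the test harness, here is a minimal usage sketch built around the builder this patch adds. It is illustrative only: the index name ("sales"), the field name ("price"), the parameter values, and the example class are assumptions and not part of the patch; only VariableWidthHistogramAggregationBuilder and its field/setNumBuckets/setShardSize/setInitialBuffer setters come from the change itself.

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class VariableWidthHistogramUsageSketch {

        // Builds a search request that clusters the (hypothetical) "price" field into ~10 buckets.
        public static SearchRequest buildRequest() {
            VariableWidthHistogramAggregationBuilder histogram =
                new VariableWidthHistogramAggregationBuilder("price_clusters") // aggregation name
                    .field("price")         // numeric field to cluster (assumed to exist in the index)
                    .setNumBuckets(10)      // target bucket count after the final reduction
                    .setShardSize(100)      // per-shard cluster budget used during collection
                    .setInitialBuffer(500); // documents buffered per shard before the initial clusters are formed

            SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0)                    // hits are not needed, only the aggregation
                .aggregation(histogram);

            return new SearchRequest("sales").source(source);
        }
    }

Whether these particular values are sensible depends on the data; the tests above deliberately use much smaller shard sizes and buffers so the shard-level clustering is deterministic.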