Add Variable Width Histogram Aggregation (#42035)
Implements a new histogram aggregation called `variable_width_histogram` which
dynamically determines bucket intervals based on document groupings. These
groups are determined by running a one-pass clustering algorithm on each shard
and then reducing each shard's clusters using an agglomerative
clustering algorithm.

This PR addresses #9572.

The shard-level clustering is done in one pass to minimize memory overhead. The
algorithm was lightly inspired by
[this paper](https://ieeexplore.ieee.org/abstract/document/1198387). It fetches
a small number of documents to sample the data and determine initial clusters.
Subsequent documents are then placed into one of these clusters, or into a new
one if they are outliers. This algorithm is described in more detail in the
aggregation's docs.

At reduce time, a
[hierarchical agglomerative clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering)
algorithm inspired by [this paper](https://arxiv.org/abs/1802.00304)
continually merges the closest buckets from all shards (based on their
centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Each bucket's
min value is used as its key in the histogram. Furthermore, buckets are merged
based on their centroids and not their bounds. So it is possible that adjacent
buckets will overlap after reduction. Because each bucket's key is its min,
this overlap is not shown in the final histogram. However, when such overlap
occurs, we set the key of the bucket with the larger centroid to the midpoint
between its minimum and the smaller bucket’s maximum:
`min[large] = (min[large] + max[small]) / 2`. This heuristic is expected to
increase the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In
the future, resolving #50863
would let us solve this issue. 

It doesn’t make sense for this aggregation to support the `min_doc_count`
parameter, since clusters are determined dynamically. The `order` parameter is
not supported here to keep this large PR from becoming too complex.
jamesdorfman authored Jun 23, 2020
1 parent 48f4a8d commit e99d287
Showing 21 changed files with 3,115 additions and 1 deletion.
@@ -111,6 +111,8 @@
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
@@ -1929,6 +1931,8 @@ static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
        map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
        map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
        map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
        map.put(VariableWidthHistogramAggregationBuilder.NAME,
            (p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c));
        map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
        map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
        map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
@@ -0,0 +1,91 @@
[[search-aggregations-bucket-variablewidthhistogram-aggregation]]
=== Variable Width Histogram Aggregation

This is a multi-bucket aggregation similar to <<search-aggregations-bucket-histogram-aggregation>>.
However, the width of each bucket is not specified. Rather, a target number of buckets is provided and bucket intervals
are dynamically determined based on the document distribution. This is done using a simple one-pass document clustering algorithm
that aims to obtain low distances between bucket centroids. Unlike other multi-bucket aggregations, the intervals will not
necessarily have a uniform width.

TIP: The number of buckets returned will always be less than or equal to the target number.

Requesting a target of 2 buckets.

[source,console]
--------------------------------------------------
POST /sales/_search?size=0
{
    "aggs" : {
        "prices" : {
            "variable_width_histogram" : {
                "field" : "price",
                "buckets" : 2
            }
        }
    }
}
--------------------------------------------------
// TEST[setup:sales]

Response:

[source,console-result]
--------------------------------------------------
{
    ...
    "aggregations": {
        "prices" : {
            "buckets": [
                {
                    "min": 10.0,
                    "key": 30.0,
                    "max": 50.0,
                    "doc_count": 2
                },
                {
                    "min": 150.0,
                    "key": 185.0,
                    "max": 200.0,
                    "doc_count": 5
                }
            ]
        }
    }
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

==== Clustering Algorithm
Each shard fetches the first `initial_buffer` documents and stores them in memory. Once the buffer is full, these documents
are sorted and linearly separated into `3/4 * shard_size` buckets.
Next, each remaining document is either collected into the nearest bucket, or placed into a new bucket if it is distant
from all the existing ones. At most `shard_size` total buckets are created.
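
Below is a rough sketch of this pass under simplified assumptions (plain `double` values, and a made-up outlier threshold); it is illustrative only, not the aggregator's actual code:

[source,java]
----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified one-pass clustering: buffer a sample, bucket it, then stream the
// remaining values into the nearest bucket (or a new one for outliers).
class OnePassClusterer {
    static class Bucket {
        double centroid;
        long docCount;

        Bucket(double value) {
            centroid = value;
            docCount = 1;
        }

        void add(double value) {
            // Incremental mean: updates the centroid in O(1) per document.
            docCount++;
            centroid += (value - centroid) / docCount;
        }
    }

    private final int shardSize;
    private final double[] buffer;   // holds the first initial_buffer values
    private int buffered = 0;
    private double distantThreshold; // set once the sample is bucketed
    private final List<Bucket> buckets = new ArrayList<>();

    OnePassClusterer(int initialBuffer, int shardSize) {
        buffer = new double[initialBuffer];
        this.shardSize = shardSize;
    }

    void collect(double value) {
        if (buffered < buffer.length) {
            buffer[buffered++] = value;
            if (buffered == buffer.length) {
                initialBucketing();
            }
            return;
        }
        // Find the nearest existing bucket by centroid distance.
        Bucket nearest = buckets.get(0);
        for (Bucket b : buckets) {
            if (Math.abs(b.centroid - value) < Math.abs(nearest.centroid - value)) {
                nearest = b;
            }
        }
        if (Math.abs(nearest.centroid - value) > distantThreshold && buckets.size() < shardSize) {
            buckets.add(new Bucket(value)); // outlier: open a new bucket, capped at shard_size
        } else {
            nearest.add(value);
        }
    }

    private void initialBucketing() {
        // Sort the sample and split it linearly into 3/4 * shard_size buckets.
        Arrays.sort(buffer);
        int numBuckets = Math.max(1, 3 * shardSize / 4);
        int perBucket = Math.max(1, buffer.length / numBuckets);
        for (int start = 0; start < buffer.length; start += perBucket) {
            Bucket bucket = new Bucket(buffer[start]);
            for (int i = start + 1; i < Math.min(start + perBucket, buffer.length); i++) {
                bucket.add(buffer[i]);
            }
            buckets.add(bucket);
        }
        // Made-up heuristic: "distant" means farther from every centroid than
        // the average width of an initial bucket.
        distantThreshold = (buffer[buffer.length - 1] - buffer[0]) / numBuckets;
    }
}
----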

In the reduce step, the coordinating node sorts the buckets from all shards by their centroids. Then, the two buckets
with the nearest centroids are repeatedly merged until the target number of buckets is achieved.
This merging procedure is a form of https://en.wikipedia.org/wiki/Hierarchical_clustering[agglomerative hierarchical clustering].
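
As an illustration (simplified; not the commit's actual reduce code, which also tracks per-bucket min/max bounds), the merge loop can be sketched in Java:

[source,java]
----
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified agglomerative reduction: repeatedly merge the adjacent pair of
// buckets whose centroids are closest, until only targetBuckets remain.
class CentroidReducer {
    static class Bucket {
        double centroid;
        long docCount;

        Bucket(double centroid, long docCount) {
            this.centroid = centroid;
            this.docCount = docCount;
        }
    }

    static List<Bucket> reduce(List<Bucket> shardBuckets, int targetBuckets) {
        List<Bucket> sorted = new ArrayList<>(shardBuckets);
        sorted.sort(Comparator.comparingDouble(b -> b.centroid));
        while (sorted.size() > targetBuckets) {
            // After sorting, the globally closest pair of centroids is adjacent.
            int closest = 0;
            for (int i = 1; i < sorted.size() - 1; i++) {
                double gap = sorted.get(i + 1).centroid - sorted.get(i).centroid;
                if (gap < sorted.get(closest + 1).centroid - sorted.get(closest).centroid) {
                    closest = i;
                }
            }
            // Merge the pair, weighting the combined centroid by doc counts.
            Bucket left = sorted.get(closest);
            Bucket right = sorted.remove(closest + 1);
            long total = left.docCount + right.docCount;
            left.centroid = (left.centroid * left.docCount + right.centroid * right.docCount) / total;
            left.docCount = total;
        }
        return sorted;
    }
}
----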

TIP: A shard can return fewer than `shard_size` buckets, but it cannot return more.

==== Shard size
The `shard_size` parameter specifies the number of buckets that the coordinating node will request from each shard.
A higher `shard_size` leads each shard to produce smaller buckets. This reduces the likelihood of buckets overlapping
after the reduction step. Increasing the `shard_size` will improve the accuracy of the histogram, but it will
also make it more expensive to compute the final result because bigger priority queues will have to be managed on a
shard level, and the data transfers between the nodes and the client will be larger.

TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(50 * shard_size, 50000)`.
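
For example, a request that sets all three parameters explicitly (the values here are arbitrary, chosen only for illustration):

[source,console]
--------------------------------------------------
POST /sales/_search?size=0
{
    "aggs" : {
        "prices" : {
            "variable_width_histogram" : {
                "field" : "price",
                "buckets" : 2,
                "shard_size" : 200,
                "initial_buffer" : 10000
            }
        }
    }
}
--------------------------------------------------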

==== Initial Buffer
The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory
on a shard before the initial bucketing algorithm is run. Bucket distribution is determined using this sample
of `initial_buffer` documents. So, although a higher `initial_buffer` will use more memory, it will lead to more representative
clusters.

==== Bucket bounds are approximate
During the reduce step, the coordinating node repeatedly merges the two buckets with the nearest centroids. If two buckets have
overlapping bounds but distant centroids, then it is possible that they will not be merged. Because of this, after
reduction the maximum value in some interval (`max`) might be greater than the minimum value in the subsequent
bucket (`min`). To reduce the impact of this error, when such an overlap occurs the bound between these intervals is adjusted to be `(max + min) / 2`.
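
For example, if one bucket ends at `max = 50` and the next begins at `min = 40`, the boundary reported between them becomes `(50 + 40) / 2 = 45`.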

TIP: Bucket bounds are very sensitive to outliers.
@@ -0,0 +1,50 @@
setup:
  - do:
      indices.create:
        index: test
        body:
          settings:
            number_of_replicas: 0
          mappings:
            properties:
              number:
                type: integer
  - do:
      bulk:
        refresh: true
        index: test
        body:
          - '{"index": {}}'
          - '{"number": -3}'
          - '{"index": {}}'
          - '{"number": -2}'
          - '{"index": {}}'
          - '{"number": 1}'
          - '{"index": {}}'
          - '{"number": 4}'
          - '{"index": {}}'
          - '{"number": 5}'

---
"basic":
- skip:
version: " - 7.99.99"
reason: added in 8.0.0 (to be backported to 7.9.0)
- do:
search:
body:
size: 0
aggs:
histo:
variable_width_histogram:
field: number
buckets: 3
- match: { hits.total.value: 5 }
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key: -2.5 }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { aggregations.histo.buckets.1.key: 1.0 }
- match: { aggregations.histo.buckets.1.doc_count: 1 }
- match: { aggregations.histo.buckets.2.key: 4.5 }
- match: { aggregations.histo.buckets.2.doc_count: 2 }

30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/BigArrays.java
@@ -691,6 +691,35 @@ public DoubleArray grow(DoubleArray array, long minSize) {
return resize(array, newSize);
}

    public static class DoubleBinarySearcher extends BinarySearcher {

        DoubleArray array;
        double searchFor;

        public DoubleBinarySearcher(DoubleArray array) {
            this.array = array;
            this.searchFor = Integer.MIN_VALUE;
        }

        @Override
        protected int compare(int index) {
            // Prevent use of BinarySearcher.search() and force the use of DoubleBinarySearcher.search()
            assert this.searchFor != Integer.MIN_VALUE;

            return Double.compare(array.get(index), searchFor);
        }

        @Override
        protected double distance(int index) {
            return Math.abs(array.get(index) - searchFor);
        }

        public int search(int from, int to, double searchFor) {
            this.searchFor = searchFor;
            return super.search(from, to);
        }
    }

/**
* Allocate a new {@link FloatArray}.
* @param size the initial length of the array
@@ -782,3 +811,4 @@ public <T> ObjectArray<T> grow(ObjectArray<T> array, long minSize) {
return resize(array, newSize);
}
}

117 changes: 117 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java
@@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util;

/**
* Performs binary search on an arbitrary data structure.
*
* To do a search, create a subclass and implement custom {@link #compare(int)} and {@link #distance(int)} methods.
*
* {@link BinarySearcher} knows nothing about the value being searched for or the underlying data structure.
* These things should be determined by the subclass in its overridden methods.
*
* Refer to {@link BigArrays.DoubleBinarySearcher} for an example.
*
* NOTE: this class is not thread safe
*/
public abstract class BinarySearcher {

    /**
     * @return a negative integer, zero, or a positive integer if the array's value at <code>index</code> is less than,
     * equal to, or greater than the value being searched for.
     */
    protected abstract int compare(int index);

    /**
     * @return the magnitude of the distance between the element at <code>index</code> and the value being searched for.
     * It will usually be <code>Math.abs(array[index] - searchValue)</code>.
     */
    protected abstract double distance(int index);

    /**
     * @return the index whose underlying value is closest to the value being searched for.
     */
    private int getClosestIndex(int index1, int index2) {
        if (distance(index1) < distance(index2)) {
            return index1;
        } else {
            return index2;
        }
    }

    /**
     * Uses a binary search to determine the index of the element within the index range {from, ... , to} that is
     * closest to the search value.
     *
     * Unlike most binary search implementations, the value being searched for is not an argument to the search method.
     * Rather, this value should be stored by the subclass along with the underlying array.
     *
     * @return the index of the closest element.
     *
     * Requires: The underlying array should be sorted.
     **/
    public int search(int from, int to) {
        while (from < to) {
            int mid = (from + to) >>> 1;
            int compareResult = compare(mid);

            if (compareResult == 0) {
                // arr[mid] == value
                return mid;
            } else if (compareResult < 0) {
                // arr[mid] < val

                if (mid < to) {
                    // Check if val is between (mid, mid + 1) before setting left = mid + 1
                    // (mid < to) ensures that mid + 1 is not out of bounds
                    int compareValAfterMid = compare(mid + 1);
                    if (compareValAfterMid > 0) {
                        return getClosestIndex(mid, mid + 1);
                    }
                } else if (mid == to) {
                    // val > arr[mid] and there are no more elements above mid, so mid is the closest
                    return mid;
                }

                from = mid + 1;
            } else {
                // arr[mid] > val

                if (mid > from) {
                    // Check if val is between (mid - 1, mid)
                    // (mid > from) ensures that mid - 1 is not out of bounds
                    int compareValBeforeMid = compare(mid - 1);
                    if (compareValBeforeMid < 0) {
                        // val is between indices (mid - 1), mid
                        return getClosestIndex(mid, mid - 1);
                    }
                } else if (mid == 0) {
                    // val < arr[mid] and there are no more candidates below mid, so mid is the closest
                    return mid;
                }

                to = mid - 1;
            }
        }

        return from;
    }

}
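
As a usage illustration (not part of this commit), a minimal subclass backed by a plain sorted `double[]` shows the contract: `compare` and `distance` close over the search value, and `search(from, to)` returns the index of the closest element. `BigArrays.DoubleBinarySearcher` above plays the same role over a `DoubleArray`.

```java
import org.elasticsearch.common.util.BinarySearcher;

// Hypothetical example subclass, for illustration only.
public class SortedDoublesSearcher extends BinarySearcher {
    private final double[] values; // must be sorted ascending
    private double searchFor;

    public SortedDoublesSearcher(double[] values) {
        this.values = values;
    }

    @Override
    protected int compare(int index) {
        return Double.compare(values[index], searchFor);
    }

    @Override
    protected double distance(int index) {
        return Math.abs(values[index] - searchFor);
    }

    public int search(int from, int to, double searchFor) {
        this.searchFor = searchFor;
        return super.search(from, to);
    }

    public static void main(String[] args) {
        double[] sorted = {1.0, 4.0, 9.0, 16.0};
        SortedDoublesSearcher searcher = new SortedDoublesSearcher(sorted);
        // 10.0 falls between 9.0 and 16.0; 9.0 is closer, so index 2 is returned.
        System.out.println(searcher.search(0, sorted.length - 1, 10.0)); // prints 2
    }
}
```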
@@ -114,9 +114,11 @@
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
@@ -432,6 +434,11 @@ private ValuesSourceRegistry registerAggregations(List<SearchPlugin> plugins) {
            AutoDateHistogramAggregationBuilder.PARSER)
                .addResultReader(InternalAutoDateHistogram::new)
                .setAggregatorRegistrar(AutoDateHistogramAggregationBuilder::registerAggregators), builder);
        registerAggregation(new AggregationSpec(VariableWidthHistogramAggregationBuilder.NAME,
            VariableWidthHistogramAggregationBuilder::new,
            VariableWidthHistogramAggregationBuilder.PARSER)
                .addResultReader(InternalVariableWidthHistogram::new)
                .setAggregatorRegistrar(VariableWidthHistogramAggregationBuilder::registerAggregators), builder);
        registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new,
            GeoDistanceAggregationBuilder::parse)
                .addResultReader(InternalGeoDistance::new)
@@ -68,5 +68,9 @@ final class CommonFields extends ParseField.CommonFields {
        public static final ParseField FROM_AS_STRING = new ParseField("from_as_string");
        public static final ParseField TO = new ParseField("to");
        public static final ParseField TO_AS_STRING = new ParseField("to_as_string");
        public static final ParseField MIN = new ParseField("min");
        public static final ParseField MIN_AS_STRING = new ParseField("min_as_string");
        public static final ParseField MAX = new ParseField("max");
        public static final ParseField MAX_AS_STRING = new ParseField("max_as_string");
    }
}
@@ -101,6 +101,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
        subCollector.collect(doc, bucketOrd);
    }

    /**
     * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(long[])} to merge the actual
     * ordinals and doc ID deltas.
     *
     * Refer to that method for documentation about the merge map.
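     * For example (illustrative, not from the original javadoc): with {@code mergeMap = {0, 0, 1}},
     * the doc counts of old ordinals 0 and 1 are summed into new bucket 0, while old ordinal 2
     * becomes new bucket 1.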
     */
    public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
        try (IntArray oldDocCounts = docCounts) {
            docCounts = bigArrays.newIntArray(newNumBuckets, true);