
Multi-pass aggregation support #50863

Open · polyfractal opened this issue Jan 10, 2020 · 9 comments
Labels: :Analytics/Aggregations · >feature · Meta · Team:Analytics

Comments

@polyfractal (Contributor) commented Jan 10, 2020

Aggregations perform a single pass over the data today, reducing the shard results at the coordinating node and then applying pipeline aggregations on the reduced results. This is ideal for aggregation latency and scalability, but it does limit the types of algorithms we can implement.

We would like to start investigating extending the agg framework to multiple passes over the data. In particular, we want to be able to run multiple map-reduce cycles, not just multiple shard-local passes. This is needed when global information (e.g. a global mean) is required for the second pass over the data.
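For concreteness, here is how a user approximates a global-mean-dependent question today with two manual round trips (the `price` field and the value 42.0 are illustrative); a multi-pass framework would fold this into one request. Pass 1 computes the global mean:

```json
{
  "size": 0,
  "aggs": { "mean_price": { "avg": { "field": "price" } } }
}
```

Pass 2 then plugs the returned mean (say 42.0) into a filter for the second scan:

```json
{
  "size": 0,
  "query": { "range": { "price": { "gte": 42.0 } } },
  "aggs": { "above_mean_count": { "value_count": { "field": "price" } } }
}
```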

Multi-pass should unblock a number of interesting aggs and enhancements.

Technicals and Open Questions

  • We should probably reuse the existing SearchPhase mechanisms. This will limit the amount of new code that we need to write, and should (hopefully) play nicer with mechanisms like CCS
    • One approach is adding a new phase that is executed after AggregationPhase, which can recursively keep scheduling itself as the next phase to perform multiple passes
    • Alternatively, we could implement a new phase after AggregationPhase which deals with the n+1 passes
  • Should we limit to two passes? Or allow n passes, capped at a low number?
  • How do we pass state back to the nth pass? A global map of state that all aggs can fetch from (keyed by a known ordinal or similar)? Or pass specific state down to the relevant sub-agg?
  • Are multi-pass aggs a new type of agg that encompasses logic for each stage? Or is a multi-pass agg a regular agg for the first pass, and a new type of agg for the second and later passes?
  • Do multi-pass aggs share the same syntax as existing aggs, in the same tree?

There are some implications for multi-pass and async search which need resolving. Perhaps multi-pass is implemented on a per-"feature" basis (e.g. a dedicated "cluster endpoint" that does k-means clustering, instead of trying to modify the agg framework more generically).

Probably a lot more points to consider, just wanted to get an initial braindump down :)

@elasticmachine (Collaborator)

Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)

@nik9000 self-assigned this Jan 10, 2020
@rjernst added the Team:Analytics label May 4, 2020
nik9000 pushed a commit that referenced this issue Jun 23, 2020
Implements a new histogram aggregation called `variable_width_histogram` which
dynamically determines bucket intervals based on document groupings. These
groups are determined by running a one-pass clustering algorithm on each shard
and then reducing each shard's clusters using an agglomerative
clustering algorithm.

This PR addresses #9572.

The shard-level clustering is done in one pass to minimize memory overhead. The
algorithm was lightly inspired by
[this paper](https://ieeexplore.ieee.org/abstract/document/1198387). It fetches
a small number of documents to sample the data and determine initial clusters.
Subsequent documents are then placed into one of these clusters, or into a new one
if they are outliers. This algorithm is described in more detail in the
aggregation's docs.

At reduce time, a
[hierarchical agglomerative clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering)
algorithm inspired by [this paper](https://arxiv.org/abs/1802.00304)
continually merges the closest buckets from all shards (based on their
centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Each bucket's
min value is used as its key in the histogram. Furthermore, buckets are merged
based on their centroids and not their bounds. So it is possible that adjacent
buckets will overlap after reduction. Because each bucket's key is its min,
this overlap is not shown in the final histogram. However, when such overlap
occurs, we set the key of the bucket with the larger centroid to the midpoint
between its minimum and the smaller bucket’s maximum:
`min[large] = (min[large] + max[small]) / 2`. This heuristic is expected to
increase the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In
the future, resolving #50863
would let us solve this issue. 

It doesn’t make sense for this aggregation to support the `min_doc_count`
parameter, since clusters are determined dynamically. The `order` parameter is
not supported here to keep this large PR from becoming too complex.
nik9000 pushed a commit to nik9000/elasticsearch that referenced this issue Jun 23, 2020
@polyfractal (Contributor, Author)

Did some thinking on this last week. The technically easiest route is a dedicated endpoint-per-functionality (_analytics/cluster, etc.), since we can build whatever needs to be built without regard to other aggs. This is also the least useful to users, since they'll have to construct "alternate universe" aggs with a different API.

The best user situation is a unified DSL where aggs "just work" regardless of how many passes are needed. This is difficult technically because all existing aggs would need to learn how to deal with intermediate state, interact with multiple passes, etc.

As a middle ground, I'd like to suggest the following:

  • MP aggs share the same DSL, and are specified in the "aggs" portion of a request as usual
  • MP aggs must be top-level
  • MP aggs may not have non-MP sub-aggs. E.g. they do not "interact" with regular aggs at all, cannot be used with pipelines, etc
    • For a first iteration, we might decide that MP aggs can't even have MP sub-aggs, since the intermediate state could explode in size/complexity
  • MP aggs derive from a new set of base classes that understand how to (de)-serialize intermediate state and perform a second pass over the data
  • MP agg and regular agg Internal results are mergeable. Since they do not interact, and do not overlap, we should be able to cleanly merge the results of both agg types during a final reduction. This also means that async search can largely "just work" because we can just continue to wait on pending MP shard results and then merge them in

From a technical point of view, the first pass over the data could work as normal. When the coordinator receives intermediate results, it would need to identify all MP shard results, merge them together, and then execute the secondary phase back on the data nodes. How this is handled on the data nodes is unclear: maybe a new phase? Or do we treat it like a regular agg phase, just with a different set of aggregators?
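To make the proposal concrete, here is a purely hypothetical sketch of what that unified DSL might look like. The `kmeans` agg and its `k` parameter are invented for illustration, not a real or planned API:

```json
{
  "size": 0,
  "aggs": {
    "avg_price": { "avg": { "field": "price" } },
    "price_clusters": {
      "kmeans": { "field": "price", "k": 5 }
    }
  }
}
```

Under the rules above, the regular `avg_price` agg reduces as usual after the first pass, while the MP `price_clusters` agg would serialize intermediate state (e.g. candidate centroids) back to the data nodes for a second pass, and the two results would merge cleanly during the final reduction.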

nik9000 added a commit that referenced this issue Jun 25, 2020

Co-authored-by: James Dorfman <jamesdorfman@users.noreply.github.com>
@rayafratkina

Kibana applications have a bunch of dependencies on this work. Is there an update on when this is planned?

cc @timductive @VijayDoshi

@nik9000 (Member) commented Mar 18, 2021

We have no plans to work on it any time soon, and the last time a few of us talked about this together we were not aware of anyone really looking forward to this work in anything but the most abstract, pie-in-the-sky sort of way.

I think it's worth talking with @giladgal about what you are waiting on. This is a huge project, none of us have time for it any time soon, and we don't have a concrete "first thing" folks are waiting on.

@VijayDoshi

@sajjadwahmed we believe there are quite a few dependencies on this for Solutions to adopt Lens/Datagrid more fully. Let's discuss priority/scope with the relevant engineering leadership in the next Kibana/ES sync. It will likely make sense to bring a few folks from the team to the meeting. @rayafratkina @mfinkle can you help get the appropriate people to the next sync and put it on the agenda?

@giladgal (Contributor)

To clarify, it is on the analytics team's roadmap. It is on hold due to lack of resources and prioritization.

@wylieconlon

Kibana already has a few multi-step queries, and we are trying to understand whether these would be implemented as part of a generic multi-pass aggregation API, or as separate features that get exposed publicly. Here is a collection of the high-level ideas that I've seen most often:

Kibana already uses:

  1. Other bucket for Terms

This is enabled by default in Lens, so it's frequently used. Kibana abstracts this by sending multiple requests to ES.
Each Other bucket is a set of filters that exclude the Top N values from its parent bucket.
The Other bucket can have nested buckets and metrics.

The simplest API here is exactly what the Filters aggregation already supports, an `other_bucket_key`:
https://www.elastic.co/guide/en/elasticsearch/reference/7.12/search-aggregations-bucket-filters-aggregation.html#other-bucket
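For reference, a minimal example of that existing API (the field and filter names are illustrative):

```json
{
  "size": 0,
  "aggs": {
    "messages": {
      "filters": {
        "other_bucket_key": "other_messages",
        "filters": {
          "errors":   { "match": { "body": "error" } },
          "warnings": { "match": { "body": "warning" } }
        }
      }
    }
  }
}
```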

  2. Using the min/max values of a field to determine the interval

When the user builds a number histogram, Kibana fetches the min/max values of the field and then sets the histogram interval based on the result. This lets users visualize their data without needing to know the distribution first. This can be expressed programmatically, like in this SQL query:

```sql
SELECT COUNT(*), HISTOGRAM(
  bytes,
  (SELECT MAX(bytes) - MIN(bytes) FROM index WHERE ...) / 100
) FROM index
```
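Spelled out as the two requests Kibana effectively sends today, pass 1 fetches the bounds:

```json
{
  "size": 0,
  "aggs": {
    "min_bytes": { "min": { "field": "bytes" } },
    "max_bytes": { "max": { "field": "bytes" } }
  }
}
```

and then, if pass 1 returned min 0 and max 15000, the client computes (15000 - 0) / 100 = 150 and sends:

```json
{
  "size": 0,
  "aggs": {
    "bytes_histo": { "histogram": { "field": "bytes", "interval": 150 } }
  }
}
```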

On the roadmap

  1. Time offsets

Time offsets can be thought of as a reference time period + a followup query for each individual time period. If the query involves a date histogram, this could produce hundreds or thousands of separate filters. It's definitely possible to build this inside Kibana, but we are worried about the performance implications.

We have already discussed this use case and made a proposal here: https://gist.github.com/flash1293/f56fb1fa437ce97abd3d0ce3c546d480
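For a single offset (no date histogram), the manual equivalent is already expressible with a filters agg; this is a rough sketch with illustrative field names, and the concern above is that a date histogram multiplies the two filters into hundreds:

```json
{
  "size": 0,
  "aggs": {
    "periods": {
      "filters": {
        "filters": {
          "current":  { "range": { "@timestamp": { "gte": "now-1d/d" } } },
          "previous": { "range": { "@timestamp": { "gte": "now-2d/d", "lt": "now-1d/d" } } }
        }
      },
      "aggs": {
        "total_bytes": { "sum": { "field": "bytes" } }
      }
    }
  }
}
```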

Long-term ideas

  1. Visualizing only documents above the 95th percentile (or similar threshold)

It takes two passes to figure out where the 95th percentile is and then make a new query based on the results of the first. It's not the same as significant terms because there is no "foreground" set.

  2. The field names are not known ahead of time

This is something I have seen users request in the forums, and the use case is usually that they are trying to find outliers in their data. There are two passes involved:

  • Phase 1: Collect the values of a field whose values are the names of other fields
  • Phase 2: Construct a query for each collected key to calculate your metric (sketched below)
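A sketch of those two phases, assuming a `metric_name` keyword field whose values are the names of other (numeric) fields; all names here are invented for illustration. Phase 1:

```json
{
  "size": 0,
  "aggs": {
    "field_names": { "terms": { "field": "metric_name", "size": 100 } }
  }
}
```

and then, for each key returned (say `cpu` and `mem`), a client-built second request:

```json
{
  "size": 0,
  "aggs": {
    "cpu_avg": { "avg": { "field": "cpu" } },
    "mem_avg": { "avg": { "field": "mem" } }
  }
}
```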

Probably out of scope:

  1. Most uses of transforms

I originally thought that transforms would overlap with multi-pass aggs, but I now think they are very separate. Transforms are useful when looking at both low- and high-cardinality datasets that need to be pivoted for better analytics, while multi-pass aggs might only be useful on low-cardinality queries like the ones I've described above.

cc @flash1293 @ppisljar @ghudgins

@flash1293 (Contributor)

Not sure whether it's related, and this might be a bit more vague/longer-term, but depending on how it's exposed this might allow us to do sub-selects in SQL as well, which would make the SQL interface much more powerful.

@ghudgins

use case: construct a query with a filter defined by the top_hits output of a separate query

Example syntax is a Kibana formula: `sum(apiCountAny, kql='buildId : last_value(buildId)')`
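Today that formula can only be emulated with two requests. A sketch using the field names from the formula, assuming `@timestamp` as the sort field (the `buildId` value in pass 2 is whatever pass 1 returns). Pass 1:

```json
{
  "size": 0,
  "aggs": {
    "last_build": {
      "top_hits": {
        "size": 1,
        "sort": [ { "@timestamp": { "order": "desc" } } ],
        "_source": ["buildId"]
      }
    }
  }
}
```

followed by:

```json
{
  "size": 0,
  "query": { "term": { "buildId": "1234" } },
  "aggs": { "api_count_sum": { "sum": { "field": "apiCountAny" } } }
}
```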
