Skip to content

Commit

Permalink
Add topk processor plugin (influxdata#4096)
Browse files Browse the repository at this point in the history
  • Loading branch information
mirath authored and arkady-emelyanov committed May 18, 2018
1 parent a2c05d2 commit aadef7d
Show file tree
Hide file tree
Showing 5 changed files with 1,202 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/processors/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/topk"
)
74 changes: 74 additions & 0 deletions plugins/processors/topk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# TopK Processor Plugin

The TopK processor plugin is a filter designed to get the top series over a period of time. It can be tweaked to do its top k computation over a period of time, so spikes can be smoothed out.

This processor goes through these steps when processing a batch of metrics:

1. Groups metrics in buckets using their tags and name as key
2. Aggregates each of the selected fields for each bucket by the selected aggregation function (sum, mean, etc)
3. Orders the buckets by one of the generated aggregations, returns all metrics in the top `K` buckets, then reorders the buckets by the next of the generated aggregations, returns all metrics in the top `K` buckets, etc, etc, etc, until it runs out of fields.

The plugin makes sure not to duplicate metrics

Note that depending on the amount of metrics on each computed bucket, more than `K` metrics may be returned

### Configuration:

```toml
[[processors.topk]]
## How many seconds between aggregations
# period = 10

## How many top metrics to return
# k = 10

## Over which tags should the aggregation be done. Globs can be specified, in
## which case any tag matching the glob will aggregated over. If set to an
## empty list is no aggregation over tags is done
# group_by = ['*']

## Over which fields are the top k are calculated
# fields = ["value"]

## What aggregation to use. Options: sum, mean, min, max
# aggregation = "mean"

## Instead of the top k largest metrics, return the bottom k lowest metrics
# bottomk = false

## The plugin assigns each metric a GroupBy tag generated from its name and
## tags. If this setting is different than "" the plugin will add a
## tag (which name will be the value of this setting) to each metric with
## the value of the calculated GroupBy tag. Useful for debugging
# add_groupby_tag = ""

## These settings provide a way to know the position of each metric in
## the top k. The 'add_rank_field' setting allows to specify for which
## fields the position is required. If the list is non empty, then a field
## will be added to each and every metric for each string present in this
## setting. This field will contain the ranking of the group that
## the metric belonged to when aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_rank'
# add_rank_fields = []

## These settings provide a way to know what values the plugin is generating
## when aggregating metrics. The 'add_agregate_field' setting allows to
## specify for which fields the final aggregation value is required. If the
## list is non empty, then a field will be added to each every metric for
## each field present in this setting. This field will contain
## the computed aggregation for the group that the metric belonged to when
## aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_aggregate'
# add_aggregate_fields = []
```

### Tags:

This processor does not add tags by default. But the setting `add_groupby_tag` will add a tag if set to anything other than ""


### Fields:

This processor does not add fields by default. But the settings `add_rank_fields` and `add_aggregation_fields` will add one or several fields if set to anything other than ""
163 changes: 163 additions & 0 deletions plugins/processors/topk/test_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package topk

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"time"
)

///// Test set 1 /////
var metric11, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(15.3),
"b": float64(40),
},
time.Now(),
)

var metric12, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(50),
},
time.Now(),
)

var metric13, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(0.3),
"c": float64(400),
},
time.Now(),
)

var metric14, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(24.12),
"b": float64(40),
},
time.Now(),
)

var metric15, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(50.5),
"h": float64(1),
"u": float64(2.4),
},
time.Now(),
)

var MetricsSet1 = []telegraf.Metric{metric11, metric12, metric13, metric14, metric15}

///// Test set 2 /////
var metric21, _ = metric.New(
"metric1",
map[string]string{
"id": "1",
"tag1": "ONE",
"tag2": "FIVE",
"tag3": "SIX",
"tag4": "EIGHT",
},
map[string]interface{}{
"value": float64(31.31),
"A": float64(95.36),
"C": float64(72.41),
},
time.Now(),
)

var metric22, _ = metric.New(
"metric1",
map[string]string{
"id": "2",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "THREE",
"tag4": "EIGHT",
},
map[string]interface{}{
"value": float64(59.43),
"A": float64(0.6),
},
time.Now(),
)

var metric23, _ = metric.New(
"metric1",
map[string]string{
"id": "3",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "SIX",
"tag5": "TEN",
},
map[string]interface{}{
"value": float64(74.18),
"A": float64(77.42),
"B": float64(60.96),
},
time.Now(),
)

var metric24, _ = metric.New(
"metric2",
map[string]string{
"id": "4",
"tag1": "ONE",
"tag2": "FIVE",
"tag3": "THREE",
},
map[string]interface{}{
"value": float64(72),
"B": float64(22.1),
"C": float64(30.8),
},
time.Now(),
)

var metric25, _ = metric.New(
"metric2",
map[string]string{
"id": "5",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "SEVEN",
"tag4": "NINE",
},
map[string]interface{}{
"value": float64(87.92),
"B": float64(81.55),
"C": float64(45.1),
},
time.Now(),
)

var metric26, _ = metric.New(
"metric2",
map[string]string{
"id": "6",
"tag1": "TWO",
"tag2": "FIVE",
"tag3": "SEVEN",
"tag4": "NINE",
},
map[string]interface{}{
"value": float64(75.3),
"A": float64(29.45),
"C": float64(4.86),
},
time.Now(),
)

var MetricsSet2 = []telegraf.Metric{metric21, metric22, metric23, metric24, metric25, metric26}
Loading

0 comments on commit aadef7d

Please sign in to comment.