From 5bf27b06a48bddeb3d441aebe124505dca14d1cc Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor
Date: Fri, 17 Apr 2020 12:03:54 +0200
Subject: [PATCH] Fix prometheus histogram rate overflows (#17753)

Fix some overflows on Prometheus histogram rate calculations. They could
be caused by:

* New buckets added to existing histograms at runtime; this happens at
  least with CockroachDB (see #17736).
* Buckets with bigger upper limits having lower counters. This is wrong
  and has only been reproduced in tests, but it is handled just in case,
  to avoid losing other data if it happens with some service.

Rate calculation methods now also return a boolean, so that a zero value
caused by the first call can be differentiated from a rate that is
actually zero.

(cherry picked from commit 0afffa8e1e267c745ea949fd2dfeb70bfc5f436f)
---
 CHANGELOG.next.asciidoc                       |   2 +
 .../module/prometheus/collector/counter.go    |  32 +-
 .../prometheus/collector/counter_test.go      |   4 +-
 .../module/prometheus/collector/data.go       |   4 +-
 .../module/prometheus/collector/histogram.go  |  25 +-
 .../prometheus/collector/histogram_test.go    | 406 ++++++++++++++++++
 6 files changed, 450 insertions(+), 23 deletions(-)
 create mode 100644 x-pack/metricbeat/module/prometheus/collector/histogram_test.go

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 6d6d8f2a534..7e73d867dfa 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -201,6 +201,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix "ID" event generator of Google Cloud module {issue}17160[17160] {pull}17608[17608]
 - Add privileged option for Auditbeat in Openshift {pull}17637[17637]
 - Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
+- Add a switch to the driver definition on SQL module to use pretty names. {pull}17378[17378]
+- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]
 
 *Packetbeat*
 
diff --git a/x-pack/metricbeat/module/prometheus/collector/counter.go b/x-pack/metricbeat/module/prometheus/collector/counter.go
index f39e6de1763..6f0f72d80eb 100644
--- a/x-pack/metricbeat/module/prometheus/collector/counter.go
+++ b/x-pack/metricbeat/module/prometheus/collector/counter.go
@@ -22,12 +22,14 @@ type CounterCache interface {
     Stop()
 
     // RateUint64 returns, for a given counter name, the difference between the given value
-    // and the value that was given in a previous call. It will return 0 on the first call
-    RateUint64(counterName string, value uint64) uint64
+    // and the value that was given in a previous call, and true if a previous value existed.
+    // It will return 0 and false on the first call.
+    RateUint64(counterName string, value uint64) (uint64, bool)
 
     // RateFloat64 returns, for a given counter name, the difference between the given value
-    // and the value that was given in a previous call. It will return 0.0 on the first call
-    RateFloat64(counterName string, value float64) float64
+    // and the value that was given in a previous call, and true if a previous value existed.
+    // It will return 0 and false on the first call.
+    RateFloat64(counterName string, value float64) (float64, bool)
 }
 
 type counterCache struct {
@@ -47,35 +49,37 @@ func NewCounterCache(timeout time.Duration) CounterCache {
 }
 
 // RateUint64 returns, for a given counter name, the difference between the given value
-// and the value that was given in a previous call. It will return 0 on the first call
-func (c *counterCache) RateUint64(counterName string, value uint64) uint64 {
+// and the value that was given in a previous call, and true if a previous value existed.
+// It will return 0 and false on the first call.
+func (c *counterCache) RateUint64(counterName string, value uint64) (uint64, bool) {
     prev := c.ints.PutWithTimeout(counterName, value, c.timeout)
     if prev != nil {
         if prev.(uint64) > value {
             // counter reset
-            return 0
+            return 0, true
         }
-        return value - prev.(uint64)
+        return value - prev.(uint64), true
     }
     // first put for this value, return rate of 0
-    return 0
+    return 0, false
 }
 
 // RateFloat64 returns, for a given counter name, the difference between the given value
-// and the value that was given in a previous call. It will return 0.0 on the first call
-func (c *counterCache) RateFloat64(counterName string, value float64) float64 {
+// and the value that was given in a previous call, and true if a previous value existed.
+// It will return 0 and false on the first call.
+func (c *counterCache) RateFloat64(counterName string, value float64) (float64, bool) {
     prev := c.floats.PutWithTimeout(counterName, value, c.timeout)
     if prev != nil {
         if prev.(float64) > value {
             // counter reset
-            return 0
+            return 0, true
         }
-        return value - prev.(float64)
+        return value - prev.(float64), true
     }
     // first put for this value, return rate of 0
-    return 0
+    return 0, false
 }
 
 // Start the cache cleanup worker. It mus be called once before start using
diff --git a/x-pack/metricbeat/module/prometheus/collector/counter_test.go b/x-pack/metricbeat/module/prometheus/collector/counter_test.go
index 3aed4a8d017..dc4e9cd6423 100644
--- a/x-pack/metricbeat/module/prometheus/collector/counter_test.go
+++ b/x-pack/metricbeat/module/prometheus/collector/counter_test.go
@@ -50,13 +50,13 @@ func Test_CounterCache(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             for i, val := range tt.valuesUint64 {
                 want := tt.expectedUin64[i]
-                if got := tt.counterCache.RateUint64(tt.counterName, val); got != want {
+                if got, _ := tt.counterCache.RateUint64(tt.counterName, val); got != want {
                     t.Errorf("counterCache.RateUint64() = %v, want %v", got, want)
                 }
             }
             for i, val := range tt.valuesFloat64 {
                 want := tt.expectedFloat64[i]
-                if got := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
+                if got, _ := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
                     t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want)
                 }
             }
diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go
index 8e205b66a2d..8f747ce16ff 100644
--- a/x-pack/metricbeat/module/prometheus/collector/data.go
+++ b/x-pack/metricbeat/module/prometheus/collector/data.go
@@ -171,7 +171,7 @@ func (g *typedGenerator) rateCounterUint64(name string, labels common.MapStr, va
     }
 
     if g.rateCounters {
-        d["rate"] = g.counterCache.RateUint64(name+labels.String(), value)
+        d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value)
     }
 
     return d
@@ -184,7 +184,7 @@ func (g *typedGenerator) rateCounterFloat64(name string, labels common.MapStr, v
     }
 
     if g.rateCounters {
-        d["rate"] = g.counterCache.RateFloat64(name+labels.String(), value)
+        d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value)
     }
 
     return d
diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram.go b/x-pack/metricbeat/module/prometheus/collector/histogram.go
index 8a62cbf8b97..63ed3bf69ce 100644
--- a/x-pack/metricbeat/module/prometheus/collector/histogram.go
+++ b/x-pack/metricbeat/module/prometheus/collector/histogram.go
@@ -33,7 +33,7 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo
 
     // calculate centroids and rated counts
     var lastUpper, prevUpper float64
-    var sumCount uint64
+    var sumCount, prevCount uint64
     for _, bucket := range histogram.GetBucket() {
         // Ignore non-numbers
         if bucket.GetCumulativeCount() == uint64(math.NaN()) || bucket.GetCumulativeCount() == uint64(math.Inf(0)) {
@@ -50,10 +50,25 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo
             lastUpper = bucket.GetUpperBound()
         }
 
-        // take count for this period (rate) + deacumulate
-        count := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount()) - sumCount
-        counts = append(counts, count)
-        sumCount += count
+        // Take count for this period (rate)
+        countRate, found := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount())
+
+        switch {
+        case !found:
+            // This is a new bucket; consider it zero for now, but still increase
+            // the sum so that buckets that are not new don't deviate.
+            counts = append(counts, 0)
+            sumCount += bucket.GetCumulativeCount() - prevCount
+        case countRate < sumCount:
+            // This should never happen: it means something is wrong in the
+            // Prometheus response. Handle it to avoid overflowing when deaccumulating.
+            counts = append(counts, 0)
+        default:
+            // Store the deaccumulated count.
+            counts = append(counts, countRate-sumCount)
+            sumCount = countRate
+        }
+        prevCount = bucket.GetCumulativeCount()
     }
 
     res := common.MapStr{
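For context, the overflow fixed above is an unsigned underflow in the old
deaccumulation: RateUint64 returns 0 the first time a bucket is seen, so when
a bucket appears at runtime while sumCount is already positive, the old
`rate - sumCount` wraps around. Below is a minimal, self-contained Go sketch
of that failure mode and of the guard introduced above; it is illustrative
only, not part of the patch, and the variable names are hypothetical:

    package main

    import "fmt"

    func main() {
        // A bucket appeared between scrapes: the cache has no previous value
        // for it, so its rate is 0, while earlier buckets in the same scrape
        // already contributed 2 to sumCount.
        var rate, sumCount uint64 = 0, 2

        // Old behavior: unsigned subtraction wraps around to 2^64 - 2.
        fmt.Println(rate - sumCount) // 18446744073709551614

        // Patched behavior: never subtract more than the rate itself.
        if rate < sumCount {
            fmt.Println(uint64(0)) // report 0 instead of overflowing
        } else {
            fmt.Println(rate - sumCount)
        }
    }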
diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go
new file mode 100644
index 00000000000..b0906068e76
--- /dev/null
+++ b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go
@@ -0,0 +1,406 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// +build !integration
+
+package collector
+
+import (
+    "testing"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+    dto "github.com/prometheus/client_model/go"
+    "github.com/stretchr/testify/assert"
+
+    "github.com/elastic/beats/v7/libbeat/common"
+)
+
+// TestPromHistogramToES tests that calling promHistogramToES multiple
+// times with the same cache produces the expected results each time.
+func TestPromHistogramToES(t *testing.T) {
+    type sample struct {
+        histogram dto.Histogram
+        expected  common.MapStr
+    }
+
+    cases := map[string]struct {
+        samples []sample
+    }{
+        "one histogram": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0},
+                        "values": []float64{0.495},
+                    },
+                },
+            },
+        },
+        "two histograms": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0},
+                        "values": []float64{0.495},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(12),
+                        SampleSum:   proto.Float64(10.123),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(12),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{2},
+                        "values": []float64{0.495},
+                    },
+                },
+            },
+        },
+        "new bucket on the go": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0},
+                        "values": []float64{0.495},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(13),
+                        SampleSum:   proto.Float64(15.23),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(12),
+                            },
+                            // New bucket on the go
+                            {
+                                UpperBound:      proto.Float64(9.99),
+                                CumulativeCount: proto.Uint64(13),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{2, 0},
+                        "values": []float64{0.495, 5.49},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(15),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(13),
+                            },
+                            {
+                                UpperBound:      proto.Float64(9.99),
+                                CumulativeCount: proto.Uint64(15),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 1},
+                        "values": []float64{0.495, 5.49},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(16),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(13),
+                            },
+                            {
+                                UpperBound:      proto.Float64(9.99),
+                                CumulativeCount: proto.Uint64(16),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0, 1},
+                        "values": []float64{0.495, 5.49},
+                    },
+                },
+            },
+        },
+        "new smaller bucket on the go": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0},
+                        "values": []float64{0.495},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(13),
+                        SampleSum:   proto.Float64(15.23),
+                        Bucket: []*dto.Bucket{
+                            // New bucket on the go
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(1),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(13),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0, 2},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(15),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(2),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(15),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 1},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(16),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(3),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(16),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 0},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+            },
+        },
+        "new bucket between two other buckets on the go": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(0),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0, 0},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(13),
+                        SampleSum:   proto.Float64(15.23),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(1),
+                            },
+                            // New bucket
+                            {
+                                UpperBound:      proto.Float64(0.49),
+                                CumulativeCount: proto.Uint64(2),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(13),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 0, 1},
+                        "values": []float64{0.045, 0.29000000000000004, 0.74},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(16),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(2),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.49),
+                                CumulativeCount: proto.Uint64(4),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(16),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 1, 1},
+                        "values": []float64{0.045, 0.29000000000000004, 0.74},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(18),
+                        SampleSum:   proto.Float64(16.33),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(3),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.49),
+                                CumulativeCount: proto.Uint64(5),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(18),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{1, 0, 1},
+                        "values": []float64{0.045, 0.29000000000000004, 0.74},
+                    },
+                },
+            },
+        },
+        "wrong buckets": {
+            samples: []sample{
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(10),
+                        SampleSum:   proto.Float64(10),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(10),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(8),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{0, 0},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+                {
+                    histogram: dto.Histogram{
+                        SampleCount: proto.Uint64(12),
+                        SampleSum:   proto.Float64(10.45),
+                        Bucket: []*dto.Bucket{
+                            {
+                                UpperBound:      proto.Float64(0.09),
+                                CumulativeCount: proto.Uint64(12),
+                            },
+                            {
+                                UpperBound:      proto.Float64(0.99),
+                                CumulativeCount: proto.Uint64(8),
+                            },
+                        },
+                    },
+                    expected: common.MapStr{
+                        "counts": []uint64{2, 0},
+                        "values": []float64{0.045, 0.54},
+                    },
+                },
+            },
+        },
+    }
+
+    metricName := "somemetric"
+    labels := common.MapStr{}
+
+    for title, c := range cases {
+        t.Run(title, func(t *testing.T) {
+            cache := NewCounterCache(120 * time.Minute)
+
+            for i, s := range c.samples {
+                t.Logf("#%d: %+v", i, s.histogram)
+                result := promHistogramToES(cache, metricName, labels, &s.histogram)
+                assert.EqualValues(t, s.expected, result)
+            }
+        })
+    }
+}
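The new contract of CounterCache is easiest to see in isolation. The
following sketch is not part of the patch: it is a hypothetical test that
would sit next to counter_test.go and exercises the patched RateUint64
semantics, where the boolean distinguishes a first observation (0, false)
from a counter that genuinely did not move (0, true):

    // +build !integration

    package collector

    import (
        "testing"
        "time"
    )

    func TestRateUint64FirstCallSemantics(t *testing.T) {
        cache := NewCounterCache(120 * time.Minute)

        // First call: no baseline yet, so the rate is 0 and found is false.
        if rate, found := cache.RateUint64("requests", 100); rate != 0 || found {
            t.Errorf("first call: got (%d, %t), want (0, false)", rate, found)
        }
        // Same value again: the counter really did not move.
        if rate, found := cache.RateUint64("requests", 100); rate != 0 || !found {
            t.Errorf("flat counter: got (%d, %t), want (0, true)", rate, found)
        }
        // Increase: the rate is the difference from the previous value.
        if rate, found := cache.RateUint64("requests", 110); rate != 10 || !found {
            t.Errorf("increase: got (%d, %t), want (10, true)", rate, found)
        }
        // Counter reset: a lower value yields 0, but it is a real observation.
        if rate, found := cache.RateUint64("requests", 5); rate != 0 || !found {
            t.Errorf("reset: got (%d, %t), want (0, true)", rate, found)
        }
    }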