Skip to content

Commit

Permalink
feat(inputs.statsd): add median timing calculation to statsd input pl…
Browse files Browse the repository at this point in the history
…ugin (#11518)
  • Loading branch information
amarinderca authored Jul 22, 2022
1 parent 79235cb commit d84bf9a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 4 deletions.
2 changes: 2 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ metric type:
for that stat during that interval.
- `statsd_<name>_mean`: The mean is the average of all values statsd saw
for that stat during that interval.
- `statsd_<name>_median`: The median is the middle of all values statsd saw
for that stat during that interval.
- `statsd_<name>_stddev`: The stddev is the sample standard deviation
of all values statsd saw for that stat during that interval.
- `statsd_<name>_sum`: The sum is the sample sum of all values statsd saw
Expand Down
44 changes: 40 additions & 4 deletions plugins/inputs/statsd/running_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

const defaultPercentileLimit = 1000
const defaultMedianLimit = 1000

// RunningStats calculates a running mean, variance, standard deviation,
// lower bound, upper bound, count, and can calculate estimated percentiles.
Expand All @@ -31,12 +32,19 @@ type RunningStats struct {

// cache if we have sorted the list so that we never re-sort a sorted list,
// which can have very bad performance.
sorted bool
SortedPerc bool

// Array used to calculate estimated median values
// We will store a maximum of MedLimit values, at which point we will start
// slicing old values
med []float64
MedLimit int
MedInsertIndex int
}

func (rs *RunningStats) AddValue(v float64) {
// Whenever a value is added, the list is no longer sorted.
rs.sorted = false
rs.SortedPerc = false

if rs.n == 0 {
rs.k = v
Expand All @@ -45,7 +53,12 @@ func (rs *RunningStats) AddValue(v float64) {
if rs.PercLimit == 0 {
rs.PercLimit = defaultPercentileLimit
}
if rs.MedLimit == 0 {
rs.MedLimit = defaultMedianLimit
rs.MedInsertIndex = 0
}
rs.perc = make([]float64, 0, rs.PercLimit)
rs.med = make([]float64, 0, rs.MedLimit)
}

// These are used for the running mean and variance
Expand All @@ -69,12 +82,35 @@ func (rs *RunningStats) AddValue(v float64) {
// Reached limit, choose random index to overwrite in the percentile array
rs.perc[rand.Intn(len(rs.perc))] = v
}

if len(rs.med) < rs.MedLimit {
rs.med = append(rs.med, v)
} else {
// Reached limit, start over
rs.med[rs.MedInsertIndex] = v
}
rs.MedInsertIndex = (rs.MedInsertIndex + 1) % rs.MedLimit
}

func (rs *RunningStats) Mean() float64 {
return rs.k + rs.ex/float64(rs.n)
}

func (rs *RunningStats) Median() float64 {
// Need to sort for median, but keep temporal order
var values []float64
values = append(values, rs.med...)
sort.Float64s(values)
count := len(values)
if count == 0 {
return 0
} else if count%2 == 0 {
return (values[count/2-1] + values[count/2]) / 2
} else {
return values[count/2]
}
}

func (rs *RunningStats) Variance() float64 {
return (rs.ex2 - (rs.ex*rs.ex)/float64(rs.n)) / float64(rs.n)
}
Expand Down Expand Up @@ -104,9 +140,9 @@ func (rs *RunningStats) Percentile(n float64) float64 {
n = 100
}

if !rs.sorted {
if !rs.SortedPerc {
sort.Float64s(rs.perc)
rs.sorted = true
rs.SortedPerc = true
}

i := float64(len(rs.perc)) * n / float64(100)
Expand Down
30 changes: 30 additions & 0 deletions plugins/inputs/statsd/running_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestRunningStats_Single(t *testing.T) {
if rs.Mean() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Mean())
}
if rs.Median() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Median())
}
if rs.Upper() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Upper())
}
Expand Down Expand Up @@ -61,6 +64,9 @@ func TestRunningStats_Duplicate(t *testing.T) {
if rs.Mean() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Mean())
}
if rs.Median() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Median())
}
if rs.Upper() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Upper())
}
Expand Down Expand Up @@ -105,6 +111,9 @@ func TestRunningStats(t *testing.T) {
if rs.Mean() != 15.9375 {
t.Errorf("Expected %v, got %v", 15.9375, rs.Mean())
}
if rs.Median() != 10.5 {
t.Errorf("Expected %v, got %v", 10.5, rs.Median())
}
if rs.Upper() != 45 {
t.Errorf("Expected %v, got %v", 45, rs.Upper())
}
Expand Down Expand Up @@ -164,3 +173,24 @@ func TestRunningStats_PercentileLimit(t *testing.T) {
func fuzzyEqual(a, b, epsilon float64) bool {
return math.Abs(a-b) <= epsilon
}

// Test that the median limit is respected and MedInsertIndex is properly incrementing index.
func TestRunningStats_MedianLimitIndex(t *testing.T) {
rs := RunningStats{}
rs.MedLimit = 10
values := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}

for _, v := range values {
rs.AddValue(v)
}

if rs.Count() != 11 {
t.Errorf("Expected %v, got %v", 11, rs.Count())
}
if len(rs.med) != 10 {
t.Errorf("Expected %v, got %v", 10, len(rs.med))
}
if rs.MedInsertIndex != 1 {
t.Errorf("Expected %v, got %v", 0, rs.MedInsertIndex)
}
}
1 change: 1 addition & 0 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
prefix = fieldName + "_"
}
fields[prefix+"mean"] = stats.Mean()
fields[prefix+"median"] = stats.Median()
fields[prefix+"stddev"] = stats.Stddev()
fields[prefix+"sum"] = stats.Sum()
fields[prefix+"upper"] = stats.Upper()
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func TestParse_Timings(t *testing.T) {
"count": int64(5),
"lower": float64(1),
"mean": float64(3),
"median": float64(1),
"stddev": float64(4),
"sum": float64(15),
"upper": float64(11),
Expand Down Expand Up @@ -913,6 +914,7 @@ func TestParse_DataDogTags(t *testing.T) {
"count": 10,
"lower": float64(3),
"mean": float64(3),
"median": float64(3),
"stddev": float64(0),
"sum": float64(30),
"upper": float64(3),
Expand Down Expand Up @@ -1211,6 +1213,7 @@ func TestParse_TimingsMultipleFieldsWithTemplate(t *testing.T) {
"success_count": int64(5),
"success_lower": float64(1),
"success_mean": float64(3),
"success_median": float64(1),
"success_stddev": float64(4),
"success_sum": float64(15),
"success_upper": float64(11),
Expand All @@ -1219,6 +1222,7 @@ func TestParse_TimingsMultipleFieldsWithTemplate(t *testing.T) {
"error_count": int64(5),
"error_lower": float64(2),
"error_mean": float64(6),
"error_median": float64(2),
"error_stddev": float64(8),
"error_sum": float64(30),
"error_upper": float64(22),
Expand Down Expand Up @@ -1259,6 +1263,7 @@ func TestParse_TimingsMultipleFieldsWithoutTemplate(t *testing.T) {
"count": int64(5),
"lower": float64(1),
"mean": float64(3),
"median": float64(1),
"stddev": float64(4),
"sum": float64(15),
"upper": float64(11),
Expand All @@ -1268,6 +1273,7 @@ func TestParse_TimingsMultipleFieldsWithoutTemplate(t *testing.T) {
"count": int64(5),
"lower": float64(2),
"mean": float64(6),
"median": float64(2),
"stddev": float64(8),
"sum": float64(30),
"upper": float64(22),
Expand Down

0 comments on commit d84bf9a

Please sign in to comment.