Skip to content

Commit

Permalink
Add history and summary types to telegraf and prometheus plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
jdoupe committed Oct 24, 2017
1 parent 0e6a70b commit cad142c
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 75 deletions.
12 changes: 12 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// AddSummary is the same as AddFields, but will add the metric as a "Summary" type
AddSummary(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)

// AddHistogram is the same as AddFields, but will add the metric as a "Histogram" type
AddHistogram(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)

SetPrecision(precision, interval time.Duration)

AddError(err error)
Expand Down
22 changes: 22 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ func (ac *accumulator) AddCounter(
}
}

func (ac *accumulator) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}

func (ac *accumulator) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}

// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
Expand Down
2 changes: 2 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
Counter
Gauge
Untyped
Summary
Histogram
)

type Metric interface {
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func valueType(mt dto.MetricType) telegraf.ValueType {
return telegraf.Counter
case dto.MetricType_GAUGE:
return telegraf.Gauge
case dto.MetricType_SUMMARY:
return telegraf.Summary
case dto.MetricType_HISTOGRAM:
return telegraf.Histogram
default:
return telegraf.Untyped
}
Expand Down Expand Up @@ -145,11 +149,11 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Gauge:
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Summary:
acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Histogram:
acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
default:
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
Expand Down
214 changes: 152 additions & 62 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -28,8 +29,13 @@ type SampleID string
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Value is the value in the Prometheus output. Only one of these will populated.
Value float64
HistogramValue map[float64]uint64
SummaryValue map[float64]float64
// Histograms and Summaries need a count and a sum
Count uint64
Sum float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time
}
Expand All @@ -38,8 +44,9 @@ type Sample struct {
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// Need the telegraf ValueType because there isn't a Prometheus ValueType
// representing Histogram or Summary
TelegrafValueType telegraf.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}
Expand Down Expand Up @@ -189,7 +196,16 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
labels = append(labels, v)
}

metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
var metric prometheus.Metric
var err error
switch family.TelegrafValueType {
case telegraf.Summary:
metric, err = prometheus.NewConstSummary(desc, sample.Count, sample.Sum, sample.SummaryValue, labels...)
case telegraf.Histogram:
metric, err = prometheus.NewConstHistogram(desc, sample.Count, sample.Sum, sample.HistogramValue, labels...)
default:
metric, err = prometheus.NewConstMetric(desc, getPromValueType(family.TelegrafValueType), sample.Value, labels...)
}
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
Expand All @@ -205,7 +221,7 @@ func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}

func valueType(tt telegraf.ValueType) prometheus.ValueType {
func getPromValueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
Expand All @@ -226,6 +242,30 @@ func CreateSampleID(tags map[string]string) SampleID {
return SampleID(strings.Join(pairs, ","))
}

func addSample(fam *MetricFamily, sample *Sample, sampleID SampleID) {

for k, _ := range sample.Labels {
fam.LabelSet[k]++
}

fam.Samples[sampleID] = sample
}

func (p *PrometheusClient) addMetricFamily(point telegraf.Metric, sample *Sample, mname string, sampleID SampleID) {
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
TelegrafValueType: point.Type(),
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
}

addSample(fam, sample, sampleID)
}

func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
Expand All @@ -234,7 +274,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {

for _, point := range metrics {
tags := point.Tags()
vt := valueType(point.Type())
sampleID := CreateSampleID(tags)

labels := make(map[string]string)
Expand All @@ -251,77 +290,128 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
}
}

for fn, fv := range point.Fields() {
// Ignore string and bool fields.
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
switch point.Type() {
case telegraf.Summary:
var mname string
var sum float64
var count uint64
summaryvalue := make(map[float64]float64)
for fn, fv := range point.Fields() {
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}

switch fn {
case "sum":
sum = value
case "count":
count = uint64(value)
default:
limit, err := strconv.ParseFloat(fn, 64)
if err == nil {
summaryvalue[limit] = value
}
}
}
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
Labels: labels,
SummaryValue: summaryvalue,
Count: count,
Sum: sum,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
mname = sanitize(point.Name())

// Special handling of value field; supports passthrough from
// the prometheus input.
p.addMetricFamily(point, sample, mname, sampleID)

case telegraf.Histogram:
var mname string
switch point.Type() {
case telegraf.Counter:
if fn == "counter" {
mname = sanitize(point.Name())
var sum float64
var count uint64
histogramvalue := make(map[float64]uint64)
for fn, fv := range point.Fields() {
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
case telegraf.Gauge:
if fn == "gauge" {
mname = sanitize(point.Name())

switch fn {
case "sum":
sum = value
case "count":
count = uint64(value)
default:
limit, err := strconv.ParseFloat(fn, 64)
if err == nil {
histogramvalue[limit] = uint64(value)
}
}
}
if mname == "" {
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
sample := &Sample{
Labels: labels,
HistogramValue: histogramvalue,
Count: count,
Sum: sum,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
mname = sanitize(point.Name())

var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
p.addMetricFamily(point, sample, mname, sampleID)

default:
for fn, fv := range point.Fields() {
// Ignore string and bool fields.
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
p.fam[mname] = fam
} else {
// Metrics can be untyped even though the corresponding plugin
// creates them with a type. This happens when the metric was
// transferred over the network in a format that does not
// preserve value type and received using an input such as a
// queue consumer. To avoid issues we automatically upgrade
// value type from untyped to a typed metric.
if fam.ValueType == prometheus.UntypedValue {
fam.ValueType = vt

sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}

if vt != prometheus.UntypedValue && fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
switch point.Type() {
case telegraf.Counter:
if fn == "counter" {
mname = sanitize(point.Name())
}
case telegraf.Gauge:
if fn == "gauge" {
mname = sanitize(point.Name())
}
}
if mname == "" {
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
}
}

for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
p.addMetricFamily(point, sample, mname, sampleID)

fam.Samples[sampleID] = sample
}
}
}
return nil
Expand Down
Loading

0 comments on commit cad142c

Please sign in to comment.