Skip to content

Commit

Permalink
Fix prometheus passthrough for existing value types (#3351)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Oct 18, 2017
1 parent 9b59cdd commit 6e5915c
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 22 deletions.
13 changes: 12 additions & 1 deletion plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
} else {
t = time.Now()
}
metric, err := metric.New(metricName, tags, fields, t)
metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
Expand All @@ -97,6 +97,17 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
return metrics, err
}

func valueType(mt dto.MetricType) telegraf.ValueType {
switch mt {
case dto.MetricType_COUNTER:
return telegraf.Counter
case dto.MetricType_GAUGE:
return telegraf.Gauge
default:
return telegraf.Untyped
}
}

// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
Expand Down
10 changes: 9 additions & 1 deletion plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,15 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
if url.Address != "" {
tags["address"] = url.Address
}
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())

switch metric.Type() {
case telegraf.Counter:
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Gauge:
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
default:
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
}

return nil
Expand Down
42 changes: 37 additions & 5 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net/http"
"os"
"regexp"
"sort"
"strings"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
Expand Down Expand Up @@ -46,6 +48,7 @@ type PrometheusClient struct {
Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"`
Path string `toml:"path"`
CollectorsExclude []string `toml:"collectors_exclude"`

server *http.Server

Expand All @@ -62,11 +65,26 @@ var sampleConfig = `
## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
# expiration_interval = "60s"
## Collectors to enable, valid entries are "gocollector" and "process".
## If unset, both are enabled.
collectors_exclude = ["gocollector", "process"]
`

func (p *PrometheusClient) Start() error {
prometheus.Register(p)

for _, collector := range p.CollectorsExclude {
switch collector {
case "gocollector":
prometheus.Unregister(prometheus.NewGoCollector())
case "process":
prometheus.Unregister(prometheus.NewProcessCollector(os.Getpid(), ""))
default:
return fmt.Errorf("unrecognized collector %s", collector)
}
}

if p.Listen == "" {
p.Listen = "localhost:9273"
}
Expand All @@ -76,7 +94,9 @@ func (p *PrometheusClient) Start() error {
}

mux := http.NewServeMux()
mux.Handle(p.Path, prometheus.Handler())
mux.Handle(p.Path, promhttp.HandlerFor(
prometheus.DefaultGatherer,
promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))

p.server = &http.Server{
Addr: p.Listen,
Expand Down Expand Up @@ -243,10 +263,22 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
switch point.Type() {
case telegraf.Counter:
if fn == "counter" {
mname = sanitize(point.Name())
}
case telegraf.Gauge:
if fn == "gauge" {
mname = sanitize(point.Name())
}
}
if mname == "" {
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
}

var fam *MetricFamily
Expand Down
78 changes: 63 additions & 15 deletions plugins/outputs/prometheus_client/prometheus_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,69 @@ func TestWrite_SkipNonNumberField(t *testing.T) {
require.False(t, ok)
}

func TestWrite_Counter(t *testing.T) {
client := NewClient()

p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)

fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.CounterValue, fam.ValueType)
func TestWrite_Counters(t *testing.T) {
type args struct {
measurement string
tags map[string]string
fields map[string]interface{}
valueType telegraf.ValueType
}
var tests = []struct {
name string
args args
err error
metricName string
promType prometheus.ValueType
}{
{
name: "field named value is not added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"value": 42},
valueType: telegraf.Counter,
},
metricName: "foo",
promType: prometheus.CounterValue,
},
{
name: "field named counter is not added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"counter": 42},
valueType: telegraf.Counter,
},
metricName: "foo",
promType: prometheus.CounterValue,
},
{
name: "field with any other name is added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"other": 42},
valueType: telegraf.Counter,
},
metricName: "foo_other",
promType: prometheus.CounterValue,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(
tt.args.measurement,
tt.args.tags,
tt.args.fields,
time.Now(),
tt.args.valueType,
)
client := NewClient()
err = client.Write([]telegraf.Metric{m})
require.Equal(t, tt.err, err)

fam, ok := client.fam[tt.metricName]
require.True(t, ok)
require.Equal(t, tt.promType, fam.ValueType)
})
}
}

func TestWrite_Sanitize(t *testing.T) {
Expand Down

0 comments on commit 6e5915c

Please sign in to comment.