diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go new file mode 100644 index 0000000000000..b2cd507880bb5 --- /dev/null +++ b/plugins/inputs/prometheus/parser.go @@ -0,0 +1,171 @@ +package prometheus + +// Parser inspired from +// https://github.com/prometheus/prom2json/blob/master/main.go + +import ( + "bufio" + "bytes" + "fmt" + "io" + "math" + "mime" + + "github.com/influxdata/telegraf" + + "github.com/matttproud/golang_protobuf_extensions/pbutil" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +// PrometheusParser is an object for Parsing incoming metrics. +type PrometheusParser struct { + // PromFormat + PromFormat map[string]string + // DefaultTags will be added to every parsed metric +// DefaultTags map[string]string +} + +// Parse returns a slice of Metrics from a text representation of a +// metrics +func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { + var metrics []telegraf.Metric + var parser expfmt.TextParser + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + // Read raw data + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + + // Get format + mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"]) + // Prepare output + metricFamilies := make(map[string]*dto.MetricFamily) + if err == nil && mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" { + for { + metricFamily := &dto.MetricFamily{} + if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) + } + metricFamilies[metricFamily.GetName()] = metricFamily + } + } else { + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %s", err) + } + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := makeLabels(m) +/* + for key, value := range p.DefaultTags { + tags[key] = value + } +*/ + // reading fields + fields := make(map[string]interface{}) + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + fields = makeQuantiles(m) + fields["count"] = float64(m.GetHistogram().GetSampleCount()) + fields["sum"] = float64(m.GetSummary().GetSampleSum()) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // historgram metric + fields = makeBuckets(m) + fields["count"] = float64(m.GetHistogram().GetSampleCount()) + fields["sum"] = float64(m.GetSummary().GetSampleSum()) + + } else { + // standard metric + fields = getNameAndValue(m) + } + // converting to telegraf metric + if len(fields) > 0 { + metric, err := telegraf.NewMetric(metricName, tags, fields) + if err == nil { + metrics = append(metrics, metric) + } + } + } + } + } + return metrics, err +} + +// Parse one line +func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line + "\n")) + + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf( + "Can not parse the line: %s, for data format: prometheus", line) + } + + return metrics[0], nil +} + +/* +// Set default tags +func (p *PrometheusParser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} +*/ + +// Get Quantiles from summary metric +func makeQuantiles(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + for _, q := range m.GetSummary().Quantile { + if !math.IsNaN(q.GetValue()) { + fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) + } + } + return fields +} + +// Get Buckets from histogram metric +func makeBuckets(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + for _, b := range m.GetHistogram().Bucket { + fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) + } + return fields +} + +// Get labels from metric +func makeLabels(m *dto.Metric) map[string]string { + result := map[string]string{} + for _, lp := range m.Label { + result[lp.GetName()] = lp.GetValue() + } + return result +} + +// Get name and value from metric +func getNameAndValue(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + if m.Gauge != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields["gauge"] = float64(m.GetGauge().GetValue()) + } + } else if m.Counter != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields["counter"] = float64(m.GetCounter().GetValue()) + } + } else if m.Untyped != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields["value"] = float64(m.GetUntyped().GetValue()) + } + } + return fields +} diff --git a/plugins/inputs/prometheus/parser_test.go b/plugins/inputs/prometheus/parser_test.go new file mode 100644 index 0000000000000..5c33260be87cd --- /dev/null +++ b/plugins/inputs/prometheus/parser_test.go @@ -0,0 +1,175 @@ +package prometheus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + +const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. +# TYPE cadvisor_version_info gauge +cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 +` + +const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +# TYPE get_token_fail_count counter +get_token_fail_count 0 +` + +const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +` + +const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. +# TYPE http_request_duration_microseconds summary +http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 +http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 +http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 +http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 +http_request_duration_microseconds_count{handler="prometheus"} 9 +` + +const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. +# TYPE apiserver_request_latencies histogram +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 +apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 +apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 +` + +const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. +# TYPE cadvisor_version_info gauge +cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 +# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.013534896000000001 +go_gc_duration_seconds{quantile="0.25"} 0.02469263 +go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005 +go_gc_duration_seconds{quantile="0.75"} 0.03840335 +go_gc_duration_seconds{quantile="1"} 0.049956604 +go_gc_duration_seconds_sum 1970.341293002 +go_gc_duration_seconds_count 65952 +# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. +# TYPE http_request_duration_microseconds summary +http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 +http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 +http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 +http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 +http_request_duration_microseconds_count{handler="prometheus"} 9 +# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +# TYPE get_token_fail_count counter +get_token_fail_count 0 +# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. +# TYPE apiserver_request_latencies histogram +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 +apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 +apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 +` + +const prometheusMulti = ` +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +` + +const prometheusMultiSomeInvalid = ` +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu4 , usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +` + +func TestParseValidPrometheus(t *testing.T) { + parser := PrometheusParser{} + + // Gauge value + metrics, err := parser.Parse([]byte(validUniqueGauge)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "gauge": float64(1), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "osVersion": "CentOS Linux 7 (Core)", + "dockerVersion": "1.8.2", + "kernelVersion": "3.10.0-229.20.1.el7.x86_64", + }, metrics[0].Tags()) + + // Counter value + //parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) + metrics, err = parser.Parse([]byte(validUniqueCounter)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "get_token_fail_count", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "counter": float64(0), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Summary data + //parser.SetDefaultTags(map[string]string{}) + metrics, err = parser.Parse([]byte(validUniqueSummary)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "0.5": 552048.506, + "0.9": 5.876804288e+06, + "0.99": 5.876804288e+06, + "count": 0.0, + "sum": 1.8909097205e+07, + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) + + // histogram data + metrics, err = parser.Parse([]byte(validUniqueHistogram)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "500000": 2000.0, + "count": 2025.0, + "sum": 0.0, + "250000": 1997.0, + "2e+06": 2012.0, + "4e+06": 2017.0, + "8e+06": 2024.0, + "+Inf": 2025.0, + "125000": 1994.0, + "1e+06": 2005.0, + }, metrics[0].Fields()) + assert.Equal(t, + map[string]string{"verb": "POST", "resource": "bindings"}, + metrics[0].Tags()) + +} + +func TestParseLineInvalidPrometheus(t *testing.T) { + parser := PrometheusParser{} + metric, err := parser.ParseLine(validUniqueLine) + assert.NotNil(t, err) + assert.Nil(t, metric) + +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index aea5c5f9512b4..1ecd998d9c27a 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -5,20 +5,29 @@ import ( "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/model" - "io" +// "github.com/influxdata/telegraf/plugins/inputs/prometheus/parser" + "io/ioutil" "net/http" + "regexp" "sync" + "time" ) type Prometheus struct { - Urls []string + Urls []string + Includes []string + Excludes []string } var sampleConfig = ` ### An array of urls to scrape metrics from. urls = ["http://localhost:9100/metrics"] + # List of regexp to include metric only if the + # metric name match with on of the following pattern + includes = [""] + # List of regexp to exclude metric only if the + # metric name match with on of the following pattern + excludes = ["containers_.*"] ` func (r *Prometheus) SampleConfig() string { @@ -34,6 +43,12 @@ var ErrProtocolError = errors.New("prometheus protocol error") // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). func (g *Prometheus) Gather(acc telegraf.Accumulator) error { + + if len(g.Includes) > 0 && len(g.Excludes) > 0 { + return errors.New(`You can not set includes and + excludes for the same input`) + } + var wg sync.WaitGroup var outerr error @@ -52,6 +67,7 @@ func (g *Prometheus) Gather(acc telegraf.Accumulator) error { } func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { + collectDate := time.Now() resp, err := http.Get(url) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) @@ -60,37 +76,61 @@ func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { if resp.StatusCode != http.StatusOK { return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) } - format := expfmt.ResponseFormat(resp.Header) - - decoder := expfmt.NewDecoder(resp.Body, format) - options := &expfmt.DecodeOptions{ - Timestamp: model.Now(), + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading body: %s", err) } - sampleDecoder := &expfmt.SampleDecoder{ - Dec: decoder, - Opts: options, + + // Headers + headers := make(map[string]string) + for key, value := range headers { + headers[key] = value } - for { - var samples model.Vector - err := sampleDecoder.Decode(&samples) - if err == io.EOF { - break - } else if err != nil { - return fmt.Errorf("error getting processing samples for %s: %s", - url, err) - } - for _, sample := range samples { - tags := make(map[string]string) - for key, value := range sample.Metric { - if key == model.MetricNameLabel { - continue + // Prepare Prometheus parser config + promparser := PrometheusParser{ + PromFormat: headers, + } + + metrics, err := promparser.Parse(body) + if err != nil { + return fmt.Errorf("error getting processing samples for %s: %s", + url, err) + } + // Add (or not) collected metrics + for _, metric := range metrics { + if len(g.Includes) > 0 { + // includes regexp + IncludeMetric: + for _, include := range g.Includes { + r, err := regexp.Compile(include) + if err == nil { + if r.MatchString(metric.Name()) { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), collectDate) + break IncludeMetric + } } - tags[string(key)] = string(value) } - acc.Add("prometheus_"+string(sample.Metric[model.MetricNameLabel]), - float64(sample.Value), tags) + } else if len(g.Excludes) > 0 { + // excludes regexp + includeMetric := true + ExcludeMetric: + for _, exclude := range g.Excludes { + r, err := regexp.Compile(exclude) + if err == nil { + if r.MatchString(metric.Name()) { + includeMetric = false + break ExcludeMetric + } + } + } + if includeMetric { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), collectDate) + } + } else { + // no includes/excludes regexp + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), collectDate) } } diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 2009cbb116a12..7dcfe49f80a8e 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -40,16 +40,46 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { err := p.Gather(&acc) require.NoError(t, err) - expected := []struct { - name string - value float64 - tags map[string]string - }{ - {"prometheus_go_gc_duration_seconds_count", 7, map[string]string{}}, - {"prometheus_go_goroutines", 15, map[string]string{}}, + assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) +} + +func TestPrometheusGeneratesMetricsInclude(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Urls: []string{ts.URL}, + Includes: []string{"go_gorout.*"}, } - for _, e := range expected { - assert.True(t, acc.HasFloatField(e.name, "value")) + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + assert.False(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) +} + +func TestPrometheusGeneratesMetricsExclude(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Urls: []string{ts.URL}, + Excludes: []string{".*_goroutines"}, } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.False(t, acc.HasFloatField("go_goroutines", "gauge")) } diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 16414a8e43f68..15ed7b7e451f8 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -54,7 +54,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { require.NoError(t, p.Gather(&acc)) for _, e := range expected { - acc.AssertContainsFields(t, "prometheus_"+e.name, + acc.AssertContainsFields(t, e.name, map[string]interface{}{"value": e.value}) } @@ -84,7 +84,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { require.NoError(t, p.Gather(&acc)) for _, e := range expected2 { - acc.AssertContainsFields(t, "prometheus_"+e.name, + acc.AssertContainsFields(t, e.name, map[string]interface{}{"value": e.value}) } } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 982b6bb80c511..daa1c0e4f4c3b 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -38,7 +38,7 @@ type Parser interface { // Config is a struct that covers the data types needed for all parser types, // and can be used to instantiate _any_ of the parsers. type Config struct { - // Dataformat can be one of: json, influx, graphite + // Dataformat can be one of: json, influx, graphite, prometheus DataFormat string // Separator only applied to Graphite data.