Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For Prometheus Input add ability to query Consul Service catalog #5464

Merged
merged 1 commit into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ in Prometheus format.
# field selector to target pods
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true
# agent = "http://localhost:8500"
# query_interval = "5m"

# [[inputs.prometheus.consul.query]]
# name = "a service name"
# tag = "a service tag"
# url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}'
# [inputs.prometheus.consul.query.tags]
# host = "{{.Node}}"

## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
Expand Down Expand Up @@ -117,6 +130,26 @@ env:

If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should updated. If not specified, the default is 60 seconds.

#### Consul Service Discovery

Enabling this option and configuring consul `agent` url will allow the plugin to query
consul catalog for available services. Using `query_interval` the plugin will periodically
query the consul catalog for services with `name` and `tag` and refresh the list of scraped urls.
It can use the information from the catalog to build the scraped url and additional tags from a template.

Multiple consul queries can be configured, each for different service.
The following example fields can be used in url or tag templates:
* Node
* Address
* NodeMeta
* ServicePort
* ServiceAddress
* ServiceTags
* ServiceMeta

For full list of available fields and their type see struct CatalogService in
https://github.com/hashicorp/consul/blob/master/api/catalog.go

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
208 changes: 208 additions & 0 deletions plugins/inputs/prometheus/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package prometheus

import (
"bytes"
"context"
"fmt"
"net/url"
"strings"
"text/template"
"time"

"github.com/hashicorp/consul/api"
"github.com/influxdata/telegraf/config"
)

type ConsulConfig struct {
// Address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port").
Enabled bool `toml:"enabled"`
Agent string `toml:"agent"`
QueryInterval config.Duration `toml:"query_interval"`
Queries []*ConsulQuery `toml:"query"`
}

// One Consul service discovery query
type ConsulQuery struct {
// A name of the searched services (not ID)
ServiceName string `toml:"name"`

// A tag of the searched services
ServiceTag string `toml:"tag"`

// A DC of the searched services
ServiceDc string `toml:"dc"`

// A template URL of the Prometheus gathering interface. The hostname part
// of the URL will be replaced by discovered address and port.
ServiceURL string `toml:"url"`

// Extra tags to add to metrics found in Consul
ServiceExtraTags map[string]string `toml:"tags"`

serviceURLTemplate *template.Template
serviceExtraTagsTemplate map[string]*template.Template

// Store last error status and change log level depending on repeated occurence
lastQueryFailed bool
}

func (p *Prometheus) startConsul(ctx context.Context) error {
consulAPIConfig := api.DefaultConfig()
if p.ConsulConfig.Agent != "" {
consulAPIConfig.Address = p.ConsulConfig.Agent
}

consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return fmt.Errorf("cannot connect to the Consul agent: %v", err)
}

// Parse the template for metrics URL, drop queries with template parse errors
i := 0
for _, q := range p.ConsulConfig.Queries {
serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL)
if err != nil {
p.Log.Errorf("Could not parse the Consul query URL template (%s), skipping it. Error: %s", q.ServiceURL, err)
continue
}
q.serviceURLTemplate = serviceURLTemplate

// Allow to use join function in tags
templateFunctions := template.FuncMap{"join": strings.Join}
// Parse the tag value templates
q.serviceExtraTagsTemplate = make(map[string]*template.Template)
for tagName, tagTemplateString := range q.ServiceExtraTags {
tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
if err != nil {
p.Log.Errorf("Could not parse the Consul query Extra Tag template (%s), skipping it. Error: %s", tagTemplateString, err)
continue
}
q.serviceExtraTagsTemplate[tagName] = tagTemplate
}
p.ConsulConfig.Queries[i] = q
i++
}
// Prevent memory leak by erasing truncated values
for j := i; j < len(p.ConsulConfig.Queries); j++ {
p.ConsulConfig.Queries[j] = nil
}
p.ConsulConfig.Queries = p.ConsulConfig.Queries[:i]

catalog := consul.Catalog()

p.wg.Add(1)
go func() {
// Store last error status and change log level depending on repeated occurence
var refreshFailed = false
defer p.wg.Done()
err := p.refreshConsulServices(catalog)
if err != nil {
refreshFailed = true
p.Log.Errorf("Unable to refreh Consul services: %v", err)
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(p.ConsulConfig.QueryInterval)):
err := p.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refreh Consul services: %v", err)
if refreshFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
p.Log.Info("Successfully refreshed Consul services after previous errors")
}
}
}
}()

return nil
}

func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
mmolnar marked this conversation as resolved.
Show resolved Hide resolved
consulServiceURLs := make(map[string]URLAndAddress)

p.Log.Debugf("Refreshing Consul services")

for _, q := range p.ConsulConfig.Queries {
queryOptions := api.QueryOptions{}
if q.ServiceDc != "" {
queryOptions.Datacenter = q.ServiceDc
}

// Request services from Consul
consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions)
if err != nil {
return err
}
if len(consulServices) == 0 {
p.Log.Debugf("Queried Consul for Service (%s, %s) but did not find any instances", q.ServiceName, q.ServiceTag)
continue
}
p.Log.Debugf("Queried Consul for Service (%s, %s) and found %d instances", q.ServiceName, q.ServiceTag, len(consulServices))

for _, consulService := range consulServices {
uaa, err := p.getConsulServiceURL(q, consulService)
if err != nil {
message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
if q.lastQueryFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
q.lastQueryFailed = true
break
}
if q.lastQueryFailed {
p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag)
}
q.lastQueryFailed = false
p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
}
}

p.lock.Lock()
p.consulServices = consulServiceURLs
p.lock.Unlock()

return nil
}

func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) {
var buffer bytes.Buffer
buffer.Reset()
err := q.serviceURLTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
serviceURL, err := url.Parse(buffer.String())
if err != nil {
return nil, err
}

extraTags := make(map[string]string)
for tagName, tagTemplate := range q.serviceExtraTagsTemplate {
buffer.Reset()
err = tagTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
extraTags[tagName] = buffer.String()
}

p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String())

return &URLAndAddress{
URL: serviceURL,
OriginalURL: serviceURL,
Tags: extraTags,
}, nil
}
5 changes: 1 addition & 4 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/url"
"os/user"
"path/filepath"
"sync"
"time"

"github.com/ghodss/yaml"
Expand Down Expand Up @@ -55,7 +54,7 @@ func loadClient(kubeconfigPath string) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(&config)
}

func (p *Prometheus) start(ctx context.Context) error {
func (p *Prometheus) startK8s(ctx context.Context) error {
config, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to get InClusterConfig - %v", err)
Expand All @@ -77,8 +76,6 @@ func (p *Prometheus) start(ctx context.Context) error {
}
}

p.wg = sync.WaitGroup{}

p.wg.Add(1)
go func() {
defer p.wg.Done()
Expand Down
45 changes: 38 additions & 7 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Prometheus struct {
// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`

// Consul SD configuration
ConsulConfig ConsulConfig `toml:"consul"`

// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
Expand Down Expand Up @@ -77,6 +80,9 @@ type Prometheus struct {
podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool

// List of consul services to scrape
consulServices map[string]URLAndAddress
}

var sampleConfig = `
Expand Down Expand Up @@ -127,6 +133,19 @@ var sampleConfig = `
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true
# agent = "http://localhost:8500"
# query_interval = "5m"

# [[inputs.prometheus.consul.query]]
# name = "a service name"
# tag = "a service tag"
# url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}'
# [inputs.prometheus.consul.query.tags]
# host = "{{.Node}}"

## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
Expand Down Expand Up @@ -238,6 +257,10 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {

p.lock.Lock()
defer p.lock.Unlock()
// add all services collected from consul
for k, v := range p.consulServices {
allURLs[k] = v
}
// loop through all pods scraped via the prometheus annotation on the pods
for k, v := range p.kubernetesPods {
allURLs[k] = v
Expand Down Expand Up @@ -463,20 +486,27 @@ func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) {
return true, ""
}

// Start will start the Kubernetes scraping if enabled in the configuration
// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration
func (p *Prometheus) Start(_ telegraf.Accumulator) error {
var ctx context.Context
p.wg = sync.WaitGroup{}
ctx, p.cancel = context.WithCancel(context.Background())

if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 {
if err := p.startConsul(ctx); err != nil {
return err
}
}
if p.MonitorPods {
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
if err := p.startK8s(ctx); err != nil {
return err
}
}
return nil
}

func (p *Prometheus) Stop() {
if p.MonitorPods {
p.cancel()
}
p.cancel()
p.wg.Wait()
}

Expand All @@ -485,6 +515,7 @@ func init() {
return &Prometheus{
ResponseTimeout: config.Duration(time.Second * 3),
kubernetesPods: map[string]URLAndAddress{},
consulServices: map[string]URLAndAddress{},
URLTag: "url",
}
})
Expand Down