feat: prometheus consul service auto discovery #872

Merged 1 commit on Apr 10, 2024
125 changes: 120 additions & 5 deletions inputs/prometheus/consul.go
@@ -2,22 +2,28 @@

import (
"bytes"
"context"
"fmt"
"log"
"net/url"
"strings"
"sync"
"text/template"
"time"

"flashcat.cloud/categraf/config"

"github.com/hashicorp/consul/api"
)

type ConsulConfig struct {
// Address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port").
- Enabled bool `toml:"enabled"`
- Agent string `toml:"agent"`
- Queries []*ConsulQuery `toml:"query"`
- Catalog *api.Catalog `toml:"-"`
+ Enabled bool `toml:"enabled"`
+ Agent string `toml:"agent"`
+ QueryInterval config.Duration `toml:"query_interval"`
+ Queries []*ConsulQuery `toml:"query"`
+ Catalog *api.Catalog `toml:"-"`
}
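
Note: as a quick orientation for reviewers, here is a minimal sketch of what the new discovery settings map to, using the toml tag names from the struct above. The agent address, the 30s interval, and the service name/tag are illustrative assumptions, not defaults from this PR.

```go
package prometheus

import (
	"time"

	"flashcat.cloud/categraf/config"
)

// exampleConsulConfig shows the shape of the new settings; every value here
// is an illustrative assumption, only the field/tag names come from the diff.
func exampleConsulConfig() ConsulConfig {
	return ConsulConfig{
		Enabled:       true,                              // enabled
		Agent:         "localhost:8500",                  // agent
		QueryInterval: config.Duration(30 * time.Second), // query_interval
		Queries: []*ConsulQuery{
			{ServiceName: "node-exporter", ServiceTag: "metrics"}, // [[query]]
		},
	}
}
```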

// One Consul service discovery query
@@ -40,6 +46,9 @@

serviceURLTemplate *template.Template
serviceExtraTagsTemplate map[string]*template.Template

// Store last error status and change log level depending on repeated occurrence
lastQueryFailed bool
}

func (ins *Instance) InitConsulClient() error {
@@ -85,7 +94,7 @@
return nil
}

- func (ins *Instance) UrlsFromConsul() ([]ScrapeUrl, error) {
+ func (ins *Instance) UrlsFromConsul(ctx context.Context) ([]ScrapeUrl, error) {
if !ins.ConsulConfig.Enabled {
return []ScrapeUrl{}, nil
}
@@ -135,9 +144,115 @@
}
}

if ins.firstRun {
var wg sync.WaitGroup
consulAPIConfig := api.DefaultConfig()
if ins.ConsulConfig.Agent != "" {
consulAPIConfig.Address = ins.ConsulConfig.Agent
}

consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return []ScrapeUrl{}, fmt.Errorf("cannot connect to the Consul agent: %w", err)
}
catalog := consul.Catalog()

wg.Add(1)
go func() {
// Store last error status and change log level depending on repeated occurrence
var refreshFailed = false
defer wg.Done()
err := ins.refreshConsulServices(catalog)
if err != nil {
refreshFailed = true
log.Printf("Unable to refresh Consul services: %v\n", err)
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)):
err := ins.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refresh Consul services: %v", err)
if refreshFailed {
log.Println("E!", message)
} else {
log.Println("W!", message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
log.Println("Successfully refreshed Consul services after previous errors")
}
}
}
}()
ins.firstRun = false
wg.Wait()
}

return returls, nil
}
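
Reviewer note: the first-run block above boils down to a standard cancel-on-context refresh loop. A stripped-down, self-contained sketch of that pattern follows; refresh stands in for refreshConsulServices and the one-second interval is only for the demo.

```go
package main

import (
	"context"
	"log"
	"time"
)

// refreshLoop runs refresh once immediately, then again on every tick of the
// interval, and returns as soon as the context is cancelled.
func refreshLoop(ctx context.Context, interval time.Duration, refresh func() error) {
	if err := refresh(); err != nil {
		log.Println("W! initial refresh failed:", err)
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			if err := refresh(); err != nil {
				log.Println("W! refresh failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go refreshLoop(ctx, time.Second, func() error {
		log.Println("refreshing services")
		return nil
	})
	time.Sleep(2500 * time.Millisecond) // let a couple of ticks happen
	cancel()                            // same effect as Instance.Drop
	time.Sleep(100 * time.Millisecond)
}
```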

func (ins *Instance) refreshConsulServices(c *api.Catalog) error {
consulServiceURLs := make(map[string]ScrapeUrl)

if ins.DebugMod {

Check failure on line 201 in inputs/prometheus/consul.go (GitHub Actions / Code Analysis): ins.DebugMod undefined (type *Instance has no field or method DebugMod) (typecheck)
log.Println("Refreshing Consul services")
}

for _, q := range ins.ConsulConfig.Queries {
queryOptions := api.QueryOptions{}
if q.ServiceDc != "" {
queryOptions.Datacenter = q.ServiceDc
}

// Request services from Consul
consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions)
if err != nil {
return err
}
if len(consulServices) == 0 {
if ins.DebugMod {

Check failure on line 217 in inputs/prometheus/consul.go (GitHub Actions / Code Analysis): ins.DebugMod undefined (type *Instance has no field or method DebugMod) (typecheck)
log.Printf("Queried Consul for Service (%s, %s) but did not find any instances\n", q.ServiceName, q.ServiceTag)
}
continue
}
if ins.DebugMod {

Check failure on line 222 in inputs/prometheus/consul.go (GitHub Actions / Code Analysis): ins.DebugMod undefined (type *Instance has no field or method DebugMod) (typecheck)
log.Printf("Queried Consul for Service (%s, %s) and found %d instances\n", q.ServiceName, q.ServiceTag, len(consulServices))
}

for _, consulService := range consulServices {
uaa, err := ins.getConsulServiceURL(q, consulService)
if err != nil {
message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
if q.lastQueryFailed {
log.Println("E!", message)
} else {
log.Println("W!", message)
}
q.lastQueryFailed = true
break
}
if q.lastQueryFailed {
log.Printf("Created scrape URLs from Consul for Service (%s, %s)\n", q.ServiceName, q.ServiceTag)
}
q.lastQueryFailed = false
log.Printf("Adding scrape URL from Consul for Service (%s, %s): %s\n", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
}
}

ins.lock.Lock()
for _, u := range consulServiceURLs {
ins.URLs = append(ins.URLs, u.URL.String())
}
ins.lock.Unlock()

return nil
}

func (ins *Instance) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*ScrapeUrl, error) {
var buffer bytes.Buffer
buffer.Reset()
… (remaining lines of getConsulServiceURL not shown)
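
The body of getConsulServiceURL is collapsed above. Here is a minimal sketch of the text/template approach it builds on, assuming a URL template over api.CatalogService fields such as .ServiceAddress and .ServicePort; the field names used in the example template are assumptions, not necessarily this PR's defaults.

```go
package main

import (
	"bytes"
	"fmt"
	"net/url"
	"text/template"

	"github.com/hashicorp/consul/api"
)

// renderServiceURL executes a user-supplied template against one catalog
// entry and parses the result as a scrape URL.
func renderServiceURL(tmpl *template.Template, s *api.CatalogService) (*url.URL, error) {
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, s); err != nil {
		return nil, fmt.Errorf("execute url template: %w", err)
	}
	return url.Parse(buf.String())
}

func main() {
	tmpl := template.Must(template.New("url").Parse(
		"http://{{.ServiceAddress}}:{{.ServicePort}}/metrics"))
	u, err := renderServiceURL(tmpl, &api.CatalogService{
		ServiceAddress: "10.0.0.5",
		ServicePort:    9100,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // http://10.0.0.5:9100/metrics
}
```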
16 changes: 13 additions & 3 deletions inputs/prometheus/prometheus.go
@@ -1,6 +1,7 @@
package prometheus

import (
"context"
"io"
"log"
"net/http"
@@ -42,6 +43,9 @@ type Instance struct {

ignoreMetricsFilter filter.Filter
ignoreLabelKeysFilter filter.Filter
cancel context.CancelFunc
lock sync.Mutex
firstRun bool
tls.ClientConfig
client *http.Client
}
@@ -76,6 +80,7 @@ func (ins *Instance) Init() error {
if ins.Timeout <= 0 {
ins.Timeout = config.Duration(time.Second * 3)
}
ins.firstRun = true

client, err := ins.createHTTPClient()
if err != nil {
@@ -152,6 +157,7 @@ func (p *Prometheus) GetInstances() []inputs.Instance {
}

func (ins *Instance) Gather(slist *types.SampleList) {
var ctx context.Context
urlwg := new(sync.WaitGroup)
defer urlwg.Wait()

@@ -167,7 +173,8 @@ func (ins *Instance) Gather(slist *types.SampleList) {
go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}})
}

- urls, err := ins.UrlsFromConsul()
+ ctx, ins.cancel = context.WithCancel(context.Background())
+ urls, err := ins.UrlsFromConsul(ctx)
if err != nil {
log.Println("E! failed to query urls from consul:", err)
return
@@ -230,8 +237,7 @@ func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, u

slist.PushFront(types.NewSample("", "up", 1, labels))

- parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.DuplicationAllowed,
- ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter)
+ parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.DuplicationAllowed, ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter)
if err = parser.Parse(body, slist); err != nil {
log.Println("E! failed to parse response body, url:", u.String(), "error:", err)
}
@@ -262,3 +268,7 @@ func (ins *Instance) setHeaders(req *http.Request) {
req.Header.Set(ins.Headers[i], ins.Headers[i+1])
}
}

func (ins *Instance) Drop() {
ins.cancel()
}
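
Closing note on the cancellation wiring the new Drop method relies on: Gather stores the cancel function on the instance, Drop invokes it, and the background goroutine exits on ctx.Done(). The sketch below illustrates that flow; the nil check is an extra safeguard of this sketch (not part of the PR) for the case where Drop runs before any Gather has started the loop.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type discoverer struct {
	cancel context.CancelFunc
}

// start plays the role of the first Gather call: it creates the context and
// launches a goroutine that lives until the context is cancelled.
func (d *discoverer) start() {
	var ctx context.Context
	ctx, d.cancel = context.WithCancel(context.Background())
	go func() {
		<-ctx.Done()
		fmt.Println("discovery loop stopped")
	}()
}

// stop mirrors Instance.Drop; the nil check is an addition of this sketch.
func (d *discoverer) stop() {
	if d.cancel != nil {
		d.cancel()
	}
}

func main() {
	d := &discoverer{}
	d.start()
	d.stop()
	time.Sleep(100 * time.Millisecond) // give the goroutine time to report
}
```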