diff --git a/inputs/prometheus/consul.go b/inputs/prometheus/consul.go index 1be1142b..981bdf46 100644 --- a/inputs/prometheus/consul.go +++ b/inputs/prometheus/consul.go @@ -2,11 +2,16 @@ package prometheus import ( "bytes" + "context" "fmt" "log" "net/url" "strings" + "sync" "text/template" + "time" + + "flashcat.cloud/categraf/config" "github.com/hashicorp/consul/api" ) @@ -14,10 +19,11 @@ import ( type ConsulConfig struct { // Address of the Consul agent. The address must contain a hostname or an IP address // and optionally a port (format: "host:port"). - Enabled bool `toml:"enabled"` - Agent string `toml:"agent"` - Queries []*ConsulQuery `toml:"query"` - Catalog *api.Catalog `toml:"-"` + Enabled bool `toml:"enabled"` + Agent string `toml:"agent"` + QueryInterval config.Duration `toml:"query_interval"` + Queries []*ConsulQuery `toml:"query"` + Catalog *api.Catalog `toml:"-"` } // One Consul service discovery query @@ -40,6 +46,9 @@ type ConsulQuery struct { serviceURLTemplate *template.Template serviceExtraTagsTemplate map[string]*template.Template + + // Store last error status and change log level depending on repeated occurrence + lastQueryFailed bool } func (ins *Instance) InitConsulClient() error { @@ -85,7 +94,7 @@ func (ins *Instance) InitConsulClient() error { return nil } -func (ins *Instance) UrlsFromConsul() ([]ScrapeUrl, error) { +func (ins *Instance) UrlsFromConsul(ctx context.Context) ([]ScrapeUrl, error) { if !ins.ConsulConfig.Enabled { return []ScrapeUrl{}, nil } @@ -135,9 +144,115 @@ func (ins *Instance) UrlsFromConsul() ([]ScrapeUrl, error) { } } + if ins.firstRun { + var wg sync.WaitGroup + consulAPIConfig := api.DefaultConfig() + if ins.ConsulConfig.Agent != "" { + consulAPIConfig.Address = ins.ConsulConfig.Agent + } + + consul, err := api.NewClient(consulAPIConfig) + if err != nil { + return []ScrapeUrl{}, fmt.Errorf("cannot connect to the Consul agent: %w", err) + } + catalog := consul.Catalog() + + wg.Add(1) + go func() { + // Store last error status and change log level depending on repeated occurrence + var refreshFailed = false + defer wg.Done() + err := ins.refreshConsulServices(catalog) + if err != nil { + refreshFailed = true + log.Printf("Unable to refresh Consul services: %v\n", err) + } + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)): + err := ins.refreshConsulServices(catalog) + if err != nil { + message := fmt.Sprintf("Unable to refresh Consul services: %v", err) + if refreshFailed { + log.Println("E!", message) + } else { + log.Println("W!", message) + } + refreshFailed = true + } else if refreshFailed { + refreshFailed = false + log.Println("Successfully refreshed Consul services after previous errors") + } + } + } + }() + ins.firstRun = false + wg.Wait() + } + return returls, nil } +func (ins *Instance) refreshConsulServices(c *api.Catalog) error { + consulServiceURLs := make(map[string]ScrapeUrl) + + if ins.DebugMod { + log.Println("Refreshing Consul services") + } + + for _, q := range ins.ConsulConfig.Queries { + queryOptions := api.QueryOptions{} + if q.ServiceDc != "" { + queryOptions.Datacenter = q.ServiceDc + } + + // Request services from Consul + consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions) + if err != nil { + return err + } + if len(consulServices) == 0 { + if ins.DebugMod { + log.Printf("Queried Consul for Service (%s, %s) but did not find any instances\n", q.ServiceName, q.ServiceTag) + } + continue + } + if ins.DebugMod { + log.Printf("Queried Consul for Service (%s, %s) and found %d instances\n", q.ServiceName, q.ServiceTag, len(consulServices)) + } + + for _, consulService := range consulServices { + uaa, err := ins.getConsulServiceURL(q, consulService) + if err != nil { + message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err) + if q.lastQueryFailed { + log.Println("E!", message) + } else { + log.Println("W!", message) + } + q.lastQueryFailed = true + break + } + if q.lastQueryFailed { + log.Printf("Created scrape URLs from Consul for Service (%s, %s)\n", q.ServiceName, q.ServiceTag) + } + q.lastQueryFailed = false + log.Printf("Adding scrape URL from Consul for Service (%s, %s): %s\n", q.ServiceName, q.ServiceTag, uaa.URL.String()) + consulServiceURLs[uaa.URL.String()] = *uaa + } + } + + ins.lock.Lock() + for _, u := range consulServiceURLs { + ins.URLs = append(ins.URLs, u.URL.String()) + } + ins.lock.Unlock() + + return nil +} + func (ins *Instance) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*ScrapeUrl, error) { var buffer bytes.Buffer buffer.Reset() diff --git a/inputs/prometheus/prometheus.go b/inputs/prometheus/prometheus.go index aa8890bb..31d3446c 100644 --- a/inputs/prometheus/prometheus.go +++ b/inputs/prometheus/prometheus.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "io" "log" "net/http" @@ -42,6 +43,9 @@ type Instance struct { ignoreMetricsFilter filter.Filter ignoreLabelKeysFilter filter.Filter + cancel context.CancelFunc + lock sync.Mutex + firstRun bool tls.ClientConfig client *http.Client } @@ -76,6 +80,7 @@ func (ins *Instance) Init() error { if ins.Timeout <= 0 { ins.Timeout = config.Duration(time.Second * 3) } + ins.firstRun = true client, err := ins.createHTTPClient() if err != nil { @@ -152,6 +157,7 @@ func (p *Prometheus) GetInstances() []inputs.Instance { } func (ins *Instance) Gather(slist *types.SampleList) { + var ctx context.Context urlwg := new(sync.WaitGroup) defer urlwg.Wait() @@ -167,7 +173,8 @@ func (ins *Instance) Gather(slist *types.SampleList) { go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}}) } - urls, err := ins.UrlsFromConsul() + ctx, ins.cancel = context.WithCancel(context.Background()) + urls, err := ins.UrlsFromConsul(ctx) if err != nil { log.Println("E! failed to query urls from consul:", err) return @@ -230,8 +237,7 @@ func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, u slist.PushFront(types.NewSample("", "up", 1, labels)) - parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.DuplicationAllowed, - ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter) + parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.DuplicationAllowed, ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter) if err = parser.Parse(body, slist); err != nil { log.Println("E! failed to parse response body, url:", u.String(), "error:", err) } @@ -262,3 +268,7 @@ func (ins *Instance) setHeaders(req *http.Request) { req.Header.Set(ins.Headers[i], ins.Headers[i+1]) } } + +func (ins *Instance) Drop() { + ins.cancel() +}