
Use the new scrape config endpoint for the prometheus receiver #14464

Merged
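
For orientation, here is a minimal sketch (not part of the PR) of the interaction this change introduces: instead of the old `/jobs` link list, the receiver now issues a single GET against the target allocator's `/scrape_configs` endpoint and receives a YAML map of job name to Prometheus scrape config. The endpoint address below is a hypothetical example; only the `/scrape_configs` path comes from the diff.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical target allocator address.
	endpoint := "http://target-allocator:8080"

	resp, err := http.Get(fmt.Sprintf("%s/scrape_configs", endpoint))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// The body is a YAML document mapping job names to full scrape configs.
	fmt.Printf("%s\n", body)
}
```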
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/config.go
@@ -165,8 +165,8 @@ func (cfg *Config) Validate() error {
}

func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error {
if len(promConfig.ScrapeConfigs) == 0 {
return errors.New("no Prometheus scrape_configs")
if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil {
return errors.New("no Prometheus scrape_configs or target_allocator set")
}

// Reject features that Prometheus supports but that the receiver doesn't support:
140 changes: 97 additions & 43 deletions receiver/prometheusreceiver/metrics_receiver.go
@@ -15,23 +15,28 @@
package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
promHTTP "github.com/prometheus/prometheus/discovery/http"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)
@@ -41,11 +46,19 @@ const (
gcIntervalDelta = 1 * time.Minute
)

// closeOnce ensures that the scrape manager is started after the discovery manager
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
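
As a side note, here is a runnable sketch of the closeOnce pattern (struct and field names copied from the diff; the main function is only for illustration). Close releases every goroutine waiting on C exactly once, and repeated calls, for example from both Start and Shutdown, are safe no-ops.

```go
package main

import (
	"fmt"
	"sync"
)

// closeOnce mirrors the type in the diff: a channel that is closed at most
// once, so readers blocked on C are released when Close is first called.
type closeOnce struct {
	C     chan struct{}
	once  sync.Once
	Close func()
}

func main() {
	reloadReady := &closeOnce{C: make(chan struct{})}
	reloadReady.Close = func() {
		reloadReady.once.Do(func() { close(reloadReady.C) })
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-reloadReady.C // e.g. the scrape manager waits here
		fmt.Println("scrape manager may start now")
	}()

	reloadReady.Close() // called at the end of Start()
	reloadReady.Close() // calling it again (e.g. in Shutdown) is a no-op
	wg.Wait()
}
```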

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc
cfg *Config
consumer consumer.Metrics
cancelFunc context.CancelFunc
reloadReady *closeOnce

settings component.ReceiverCreateSettings
scrapeManager *scrape.Manager
@@ -59,10 +72,19 @@ type linkJSON struct {

// newPrometheusReceiver creates a new prometheus.Receiver reference.
func newPrometheusReceiver(set component.ReceiverCreateSettings, cfg *Config, next consumer.Metrics) *pReceiver {
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
pr := &pReceiver{
cfg: cfg,
consumer: next,
settings: set,
cfg: cfg,
consumer: next,
settings: set,
reloadReady: reloadReady,
}
return pr
}
@@ -92,36 +114,48 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {

allocConf := r.cfg.TargetAllocator
if allocConf != nil {
go func() {
// sync jobs immediately rather than waiting for the first tick
savedHash, _ := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval)
for {
<-r.targetAllocatorIntervalTicker.C
hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if err != nil {
r.settings.Logger.Error(err.Error())
continue
}
savedHash = hash
}
}()
r.initTargetAllocator(allocConf, baseCfg)
}

r.reloadReady.Close()

return nil
}

func (r *pReceiver) initTargetAllocator(allocConf *targetAllocator, baseCfg *config.Config) {
r.settings.Logger.Info("Starting target allocator discovery")
// sync jobs immediately rather than waiting for the first tick
savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg)
if err != nil {
r.settings.Logger.Error(err.Error())
return
}
r.targetAllocatorIntervalTicker = time.NewTicker(allocConf.Interval)
go func() {
<-r.reloadReady.C
for {
<-r.targetAllocatorIntervalTicker.C
hash, err := r.syncTargetAllocator(savedHash, allocConf, baseCfg)
if err != nil {
r.settings.Logger.Error(err.Error())
continue
}
savedHash = hash
}
}()
}

// syncTargetAllocator requests jobs from the target allocator and updates the underlying receiver if the response does not match the provided compareHash.
// baseCfg can be used to provide additional ScrapeConfigs, which will be added to the retrieved jobs.
func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAllocator, baseCfg *config.Config) (uint64, error) {
r.settings.Logger.Debug("Syncing target allocator jobs")
jobObject, err := getJobResponse(allocConf.Endpoint)
scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint)
if err != nil {
r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
return 0, err
}

hash, err := hashstructure.Hash(jobObject, hashstructure.FormatV2, nil)
hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil)
if err != nil {
r.settings.Logger.Error("Failed to hash job list", zap.Error(err))
return 0, err
@@ -131,29 +165,29 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}

cfg := *baseCfg
// Clear out the current configurations
baseCfg.ScrapeConfigs = []*config.ScrapeConfig{}

for _, linkJSON := range *jobObject {
for jobName, scrapeConfig := range scrapeConfigsResponse {
var httpSD promHTTP.SDConfig
if allocConf.HTTPSDConfig == nil {
httpSD = promHTTP.SDConfig{}
httpSD = promHTTP.SDConfig{
RefreshInterval: model.Duration(30 * time.Second),
}
} else {
httpSD = *allocConf.HTTPSDConfig
}

httpSD.URL = fmt.Sprintf("%s%s?collector_id=%s", allocConf.Endpoint, linkJSON.Link, allocConf.CollectorID)

scrapeCfg := &config.ScrapeConfig{
JobName: linkJSON.Link,
ServiceDiscoveryConfigs: discovery.Configs{
&httpSD,
},
escapedJob := url.QueryEscape(jobName)
httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID)
httpSD.HTTPClientConfig.FollowRedirects = false
scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
&httpSD,
}

cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scrapeCfg)
baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig)
}

err = r.applyCfg(&cfg)
err = r.applyCfg(baseCfg)
if err != nil {
r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
return 0, err
@@ -162,26 +196,40 @@ func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *targetAll
return hash, nil
}
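
To make the per-job URL construction above concrete, a small illustration with hypothetical values (the job name and collector ID are invented); url.QueryEscape keeps job names containing spaces or other special characters safe in the path. The diff also disables redirect following on the HTTP SD client for these requests.

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Hypothetical values; the URL shape mirrors the one built in
	// syncTargetAllocator above.
	endpoint := "http://target-allocator:8080"
	jobName := "kube state metrics" // job names may contain spaces
	collectorID := "collector-1"

	u := fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s",
		endpoint, url.QueryEscape(jobName), collectorID)
	fmt.Println(u)
	// Output: http://target-allocator:8080/jobs/kube+state+metrics/targets?collector_id=collector-1
}
```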

func getJobResponse(baseURL string) (*map[string]linkJSON, error) {
jobURLString := fmt.Sprintf("%s/jobs", baseURL)
_, err := url.Parse(jobURLString) // check if valid
// instantiateShard inserts the SHARD environment variable in the returned configuration
func (r *pReceiver) instantiateShard(body []byte) []byte {
shard, ok := os.LookupEnv("SHARD")
if !ok {
shard = "0"
}
return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}
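
A quick illustration of the substitution performed by instantiateShard (the function body is copied from the diff, dropping the receiver for brevity; the sample body and SHARD value are invented):

```go
package main

import (
	"bytes"
	"fmt"
	"os"
)

// Copied from the diff: replace every $(SHARD) placeholder in the raw
// /scrape_configs body with the SHARD environment variable, defaulting to "0".
func instantiateShard(body []byte) []byte {
	shard, ok := os.LookupEnv("SHARD")
	if !ok {
		shard = "0"
	}
	return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard))
}

func main() {
	os.Setenv("SHARD", "2")                              // e.g. set per collector replica
	body := []byte("job_name: otel-collector-$(SHARD)")  // invented sample
	fmt.Printf("%s\n", instantiateShard(body))
	// Output: job_name: otel-collector-2
}
```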

func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) {
scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL)
_, err := url.Parse(scrapeConfigsURL) // check if valid
if err != nil {
return nil, err
}

resp, err := http.Get(jobURLString) //nolint
resp, err := http.Get(scrapeConfigsURL) //nolint
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

jobObject := &map[string]linkJSON{}
err = json.NewDecoder(resp.Body).Decode(jobObject)
jobToScrapeConfig := map[string]*config.ScrapeConfig{}
envReplacedBody := r.instantiateShard(body)
err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig)
if err != nil {
return nil, err
}
return jobObject, nil
return jobToScrapeConfig, nil
}
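
And a minimal sketch of decoding such a response. The receiver unmarshals into map[string]*config.ScrapeConfig from the Prometheus codebase; the trimmed-down struct and sample body below are stand-ins for illustration only.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// scrapeConfigStub is a simplified stand-in for the Prometheus
// config.ScrapeConfig type used by the receiver.
type scrapeConfigStub struct {
	JobName        string `yaml:"job_name"`
	ScrapeInterval string `yaml:"scrape_interval"`
	MetricsPath    string `yaml:"metrics_path"`
}

func main() {
	// Invented sample body in the shape returned by /scrape_configs:
	// a map of job name to scrape config.
	body := []byte(`
otel-collector:
  job_name: otel-collector
  scrape_interval: 30s
  metrics_path: /metrics
`)

	jobToScrapeConfig := map[string]*scrapeConfigStub{}
	if err := yaml.Unmarshal(body, &jobToScrapeConfig); err != nil {
		panic(err)
	}
	for jobName, cfg := range jobToScrapeConfig {
		fmt.Println(jobName, cfg.ScrapeInterval, cfg.MetricsPath)
	}
}
```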

func (r *pReceiver) applyCfg(cfg *config.Config) error {
@@ -192,6 +240,7 @@ func (r *pReceiver) applyCfg(cfg *config.Config) error {
discoveryCfg := make(map[string]discovery.Configs)
for _, scrapeConfig := range cfg.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName))
}
if err := r.discoveryManager.ApplyConfig(discoveryCfg); err != nil {
return err
@@ -203,6 +252,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.discoveryManager = discovery.NewManager(ctx, logger)

go func() {
r.settings.Logger.Info("Starting discovery manager")
if err := r.discoveryManager.Run(); err != nil {
r.settings.Logger.Error("Discovery manager failed", zap.Error(err))
host.ReportFatalError(err)
@@ -228,7 +278,10 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, host component
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
)
r.scrapeManager = scrape.NewManager(&scrape.Options{PassMetadataInContext: true}, logger, store)

go func() {
<-r.reloadReady.C
r.settings.Logger.Info("Starting scrape manager")
if err := r.scrapeManager.Run(r.discoveryManager.SyncCh()); err != nil {
r.settings.Logger.Error("Scrape manager failed", zap.Error(err))
host.ReportFatalError(err)
@@ -257,6 +310,7 @@ func gcInterval(cfg *config.Config) time.Duration {
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc()
r.scrapeManager.Stop()
r.reloadReady.Close()
if r.targetAllocatorIntervalTicker != nil {
r.targetAllocatorIntervalTicker.Stop()
}