diff --git a/CHANGELOG.md b/CHANGELOG.md index a476edb82eb2..83e6fd972bea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ - `zookeeperreceiver`: Fix issue where receiver could panic during shutdown (#7020) - `prometheusreceiver`: Fix metadata fetching when metrics differ by trimmable suffixes (#6932) - Sanitize URLs being logged (#7021) +- `prometheusreceiver`: Fix start time tracking for long scrape intervals (#7053) - `signalfxexporter`: Don't use syscall to avoid compilation errors on some platforms (#7062) ## 💡 Enhancements 💡 diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index c1b13229d25c..3b1e7cff4e6d 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -60,6 +60,7 @@ func NewOcaStore( ctx context.Context, sink consumer.Metrics, set component.ReceiverCreateSettings, + gcInterval time.Duration, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID, @@ -67,7 +68,7 @@ func NewOcaStore( pdataDirect bool) *OcaStore { var jobsMap *JobsMapPdata if !useStartTimeMetric { - jobsMap = NewJobsMapPdata(2 * time.Minute) + jobsMap = NewJobsMapPdata(gcInterval) } return &OcaStore{ running: runningStateInit, diff --git a/receiver/prometheusreceiver/internal/ocastore_test.go b/receiver/prometheusreceiver/internal/ocastore_test.go index a9a6c46e7d27..fb1a75d8ef6d 100644 --- a/receiver/prometheusreceiver/internal/ocastore_test.go +++ b/receiver/prometheusreceiver/internal/ocastore_test.go @@ -17,6 +17,7 @@ package internal import ( "context" "testing" + "time" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" @@ -26,7 +27,7 @@ import ( ) func TestOcaStore(t *testing.T) { - o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), false, "", config.NewComponentID("prometheus"), nil, false) + o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), 2*time.Minute, false, "", config.NewComponentID("prometheus"), nil, false) o.SetScrapeManager(&scrape.Manager{}) app := o.Appender(context.Background()) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 13261d091458..81e6c9471d32 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -16,7 +16,9 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" + "time" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/scrape" "go.opentelemetry.io/collector/component" @@ -26,6 +28,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal" ) +const ( + defaultGCInterval = 2 * time.Minute + gcIntervalDelta = 1 * time.Minute +) + // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { cfg *Config @@ -76,6 +83,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { context.Background(), r.consumer, r.settings, + gcInterval(r.cfg.PrometheusConfig), r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.ID(), @@ -96,6 +104,22 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { return nil } +// gcInterval returns the longest scrape interval used by a scrape config, +// plus a delta to prevent race conditions. +// This ensures jobs are not garbage collected between scrapes. +func gcInterval(cfg *config.Config) time.Duration { + gcInterval := defaultGCInterval + if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval { + gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta + } + for _, scrapeConfig := range cfg.ScrapeConfigs { + if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval { + gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta + } + } + return gcInterval +} + // Shutdown stops and cancels the underlying Prometheus scrapers. func (r *pReceiver) Shutdown(context.Context) error { r.cancelFunc() diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 956dc9c8827a..5308c2add2d3 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -16,7 +16,10 @@ package prometheusreceiver import ( "testing" + "time" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/model/pdata" "google.golang.org/protobuf/types/known/timestamppb" @@ -1046,3 +1049,44 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []*pdata.R } doCompare(t, "scrape-untypedMetric-1", wantAttributes, m1, e1) } + +func TestGCInterval(t *testing.T) { + for _, tc := range []struct { + desc string + input *config.Config + want time.Duration + }{ + { + desc: "default", + input: &config.Config{}, + want: defaultGCInterval, + }, + { + desc: "global override", + input: &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(10 * time.Minute), + }, + }, + want: 11 * time.Minute, + }, + { + desc: "scrape config override", + input: &config.Config{ + ScrapeConfigs: []*config.ScrapeConfig{ + { + ScrapeInterval: model.Duration(10 * time.Minute), + }, + }, + }, + want: 11 * time.Minute, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + got := gcInterval(tc.input) + if got != tc.want { + t.Errorf("gcInterval(%+v) = %v, want %v", tc.input, got, tc.want) + } + }) + } +}