Skip to content

Commit

Permalink
Fix prometheus job cache exiration by using the longest scrape interval
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jan 6, 2022
1 parent 3e36621 commit 4a6ccaa
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- `zookeeperreceiver`: Fix issue where receiver could panic during shutdown (#7020)
- `prometheusreceiver`: Fix metadata fetching when metrics differ by trimmable suffixes (#6932)
- Sanitize URLs being logged (#7021)
- `prometheusreceiver`: Fix start time tracking for long scrape intervals (#7053)

## 💡 Enhancements 💡

Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ func NewOcaStore(
ctx context.Context,
sink consumer.Metrics,
set component.ReceiverCreateSettings,
gcInterval time.Duration,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
externalLabels labels.Labels,
pdataDirect bool) *OcaStore {
var jobsMap *JobsMapPdata
if !useStartTimeMetric {
jobsMap = NewJobsMapPdata(2 * time.Minute)
jobsMap = NewJobsMapPdata(gcInterval)
}
return &OcaStore{
running: runningStateInit,
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
"context"
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
Expand All @@ -26,7 +27,7 @@ import (
)

func TestOcaStore(t *testing.T) {
o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), false, "", config.NewComponentID("prometheus"), nil, false)
o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), 2*time.Minute, false, "", config.NewComponentID("prometheus"), nil, false)
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender(context.Background())
Expand Down
24 changes: 24 additions & 0 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co

import (
"context"
"time"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
Expand All @@ -26,6 +28,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

const (
defaultGCInterval = 2 * time.Minute
gcIntervalDelta = 1 * time.Minute
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
Expand Down Expand Up @@ -76,6 +83,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
context.Background(),
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.UseStartTimeMetric,
r.cfg.StartTimeMetricRegex,
r.cfg.ID(),
Expand All @@ -96,6 +104,22 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
return nil
}

// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *config.Config) time.Duration {
gcInterval := defaultGCInterval
if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta
}
for _, scrapeConfig := range cfg.ScrapeConfigs {
if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta
}
}
return gcInterval
}

// Shutdown stops and cancels the underlying Prometheus scrapers.
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc()
Expand Down
44 changes: 44 additions & 0 deletions receiver/prometheusreceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package prometheusreceiver

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -1046,3 +1049,44 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []*pdata.R
}
doCompare(t, "scrape-untypedMetric-1", wantAttributes, m1, e1)
}

func TestGCInterval(t *testing.T) {
for _, tc := range []struct {
desc string
input *config.Config
want time.Duration
}{
{
desc: "default",
input: &config.Config{},
want: defaultGCInterval,
},
{
desc: "global override",
input: &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(10 * time.Minute),
},
},
want: 11 * time.Minute,
},
{
desc: "scrape config override",
input: &config.Config{
ScrapeConfigs: []*config.ScrapeConfig{
{
ScrapeInterval: model.Duration(10 * time.Minute),
},
},
},
want: 11 * time.Minute,
},
} {
t.Run(tc.desc, func(t *testing.T) {
got := gcInterval(tc.input)
if got != tc.want {
t.Errorf("gcInterval(%+v) = %v, want %v", tc.input, got, tc.want)
}
})
}
}

0 comments on commit 4a6ccaa

Please sign in to comment.