Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prometheus job cache expiration by using the longest scrape interval #7053

Merged
merged 2 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- `zookeeperreceiver`: Fix issue where receiver could panic during shutdown (#7020)
- `prometheusreceiver`: Fix metadata fetching when metrics differ by trimmable suffixes (#6932)
- Sanitize URLs being logged (#7021)
- `prometheusreceiver`: Fix start time tracking for long scrape intervals (#7053)
- `signalfxexporter`: Don't use syscall to avoid compilation errors on some platforms (#7062)

## 💡 Enhancements 💡
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ func NewOcaStore(
ctx context.Context,
sink consumer.Metrics,
set component.ReceiverCreateSettings,
gcInterval time.Duration,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
externalLabels labels.Labels,
pdataDirect bool) *OcaStore {
var jobsMap *JobsMapPdata
if !useStartTimeMetric {
jobsMap = NewJobsMapPdata(2 * time.Minute)
jobsMap = NewJobsMapPdata(gcInterval)
}
return &OcaStore{
running: runningStateInit,
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
"context"
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
Expand All @@ -26,7 +27,7 @@ import (
)

func TestOcaStore(t *testing.T) {
o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), false, "", config.NewComponentID("prometheus"), nil, false)
o := NewOcaStore(context.Background(), nil, testTelemetry.ToReceiverCreateSettings(), 2*time.Minute, false, "", config.NewComponentID("prometheus"), nil, false)
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender(context.Background())
Expand Down
24 changes: 24 additions & 0 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co

import (
"context"
"time"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/component"
Expand All @@ -26,6 +28,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)

const (
defaultGCInterval = 2 * time.Minute
gcIntervalDelta = 1 * time.Minute
)

// pReceiver is the type that provides Prometheus scraper/receiver functionality.
type pReceiver struct {
cfg *Config
Expand Down Expand Up @@ -76,6 +83,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
context.Background(),
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.UseStartTimeMetric,
r.cfg.StartTimeMetricRegex,
r.cfg.ID(),
Expand All @@ -96,6 +104,22 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
return nil
}

// gcInterval returns the longest scrape interval used by a scrape config,
// plus a delta to prevent race conditions.
// This ensures jobs are not garbage collected between scrapes.
func gcInterval(cfg *config.Config) time.Duration {
gcInterval := defaultGCInterval
if time.Duration(cfg.GlobalConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(cfg.GlobalConfig.ScrapeInterval) + gcIntervalDelta
}
for _, scrapeConfig := range cfg.ScrapeConfigs {
if time.Duration(scrapeConfig.ScrapeInterval)+gcIntervalDelta > gcInterval {
gcInterval = time.Duration(scrapeConfig.ScrapeInterval) + gcIntervalDelta
}
}
return gcInterval
}

// Shutdown stops and cancels the underlying Prometheus scrapers.
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc()
Expand Down
44 changes: 44 additions & 0 deletions receiver/prometheusreceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package prometheusreceiver

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -1046,3 +1049,44 @@ func verifyUntypedMetrics(t *testing.T, td *testData, resourceMetrics []*pdata.R
}
doCompare(t, "scrape-untypedMetric-1", wantAttributes, m1, e1)
}

func TestGCInterval(t *testing.T) {
for _, tc := range []struct {
desc string
input *config.Config
want time.Duration
}{
{
desc: "default",
input: &config.Config{},
want: defaultGCInterval,
},
{
desc: "global override",
input: &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(10 * time.Minute),
},
},
want: 11 * time.Minute,
},
{
desc: "scrape config override",
input: &config.Config{
ScrapeConfigs: []*config.ScrapeConfig{
{
ScrapeInterval: model.Duration(10 * time.Minute),
},
},
},
want: 11 * time.Minute,
},
} {
t.Run(tc.desc, func(t *testing.T) {
got := gcInterval(tc.input)
if got != tc.want {
t.Errorf("gcInterval(%+v) = %v, want %v", tc.input, got, tc.want)
}
})
}
}