diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go
index 767864be..48d200bb 100644
--- a/cmd/stackdriver-prometheus-sidecar/main.go
+++ b/cmd/stackdriver-prometheus-sidecar/main.go
@@ -24,6 +24,7 @@ import (
 	"os/signal"
 	"path"
 	"path/filepath"
+	"regexp"
 	"runtime"
 	"syscall"
 	"time"
@@ -155,6 +156,7 @@ func main() {
 		walDirectory      string
 		prometheusURL     *url.URL
 		listenAddress     string
+		filters           []string
 		logLevel          promlog.AllowedLevel
 	}{}
 
@@ -190,6 +192,9 @@ func main() {
 	a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
 		Default("0.0.0.0:9091").StringVar(&cfg.listenAddress)
 
+	a.Flag("filter", "Label matcher which must pass for a series to be forwarded to Stackdriver.").
+		StringsVar(&cfg.filters)
+
 	promlogflag.AddFlags(a, &cfg.logLevel)
 
 	_, err := a.Parse(os.Args[1:])
@@ -226,6 +231,12 @@ func main() {
 		staticLabels["_debug"] = "debug"
 	}
 
+	filters, err := parseFilters(cfg.filters)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "Error parsing filters:", err)
+		os.Exit(2)
+	}
+
 	cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)
 	targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultAPIEndpoint)
 	if err != nil {
@@ -279,6 +290,7 @@ func main() {
 		log.With(logger, "component", "Prometheus reader"),
 		cfg.walDirectory,
 		tailer,
+		filters,
 		retrieval.TargetsWithDiscoveredLabels(targetCache, labels.FromMap(staticLabels)),
 		metadataCache,
 		queueManager,
@@ -456,3 +468,31 @@ func waitForPrometheus(ctx context.Context, logger log.Logger, promURL *url.URL)
 		}
 	}
 }
+
+func parseFilters(strs []string) (matchers []*labels.Matcher, err error) {
+	pattern := regexp.MustCompile(`^([a-zA-Z0-9_]+)(=|!=|=~|!~)"(.+)"$`)
+
+	for _, s := range strs {
+		parts := pattern.FindStringSubmatch(s)
+		if len(parts) != 4 {
+			return nil, fmt.Errorf("invalid filter %q", s)
+		}
+		var matcherType labels.MatchType
+		switch parts[2] {
+		case "=":
+			matcherType = labels.MatchEqual
+		case "!=":
+			matcherType = labels.MatchNotEqual
+		case "=~":
+			matcherType = labels.MatchRegexp
+		case "!~":
+			matcherType = labels.MatchNotRegexp
+		}
+		matcher, err := labels.NewMatcher(matcherType, parts[1], parts[3])
+		if err != nil {
+			return nil, fmt.Errorf("invalid filter %q: %s", s, err)
+		}
+		matchers = append(matchers, matcher)
+	}
+	return matchers, nil
+}
diff --git a/retrieval/manager.go b/retrieval/manager.go
index 982c3578..5be4972f 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -81,6 +81,7 @@ func NewPrometheusReader(
 	logger log.Logger,
 	walDirectory string,
 	tailer *tail.Tailer,
+	filters []*labels.Matcher,
 	targetGetter TargetGetter,
 	metadataGetter MetadataGetter,
 	appender Appender,
@@ -92,6 +93,7 @@
 		appender:       appender,
 		logger:         logger,
 		tailer:         tailer,
+		filters:        filters,
 		walDirectory:   walDirectory,
 		targetGetter:   targetGetter,
 		metadataGetter: metadataGetter,
@@ -103,6 +105,7 @@ type PrometheusReader struct {
 	logger         log.Logger
 	walDirectory   string
 	tailer         *tail.Tailer
+	filters        []*labels.Matcher
 	targetGetter   TargetGetter
 	metadataGetter MetadataGetter
 	appender       Appender
@@ -138,7 +141,14 @@ func init() {
 func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
 	level.Info(r.logger).Log("msg", "Starting Prometheus reader...")
 
-	seriesCache := newSeriesCache(r.logger, r.walDirectory, r.targetGetter, r.metadataGetter, ResourceMappings)
+	seriesCache := newSeriesCache(
+		r.logger,
+		r.walDirectory,
+		r.filters,
+		r.targetGetter,
+		r.metadataGetter,
+		ResourceMappings,
+	)
 	go seriesCache.run(ctx)
 
 	builder := &sampleBuilder{series: seriesCache}
diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go
index 39760d48..48b5aa25 100644
--- a/retrieval/manager_test.go
+++ b/retrieval/manager_test.go
@@ -81,7 +81,7 @@ func TestReader_Progress(t *testing.T) {
 		"job1/inst1/metric1": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge, Metric: "metric1"},
 	}
 
-	r := NewPrometheusReader(nil, dir, tailer, targetMap, metadataMap, &nopAppender{})
+	r := NewPrometheusReader(nil, dir, tailer, nil, targetMap, metadataMap, &nopAppender{})
 	r.progressSaveInterval = 200 * time.Millisecond
 
 	// Populate sample data
@@ -136,7 +136,7 @@ func TestReader_Progress(t *testing.T) {
 	}
 	recorder := &nopAppender{}
 
-	r = NewPrometheusReader(nil, dir, tailer, targetMap, metadataMap, recorder)
+	r = NewPrometheusReader(nil, dir, tailer, nil, targetMap, metadataMap, recorder)
 	go r.Run(ctx, progressOffset)
 
 	// Wait for reader to process until the end.
diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go
index 568a5e51..9d182015 100644
--- a/retrieval/series_cache.go
+++ b/retrieval/series_cache.go
@@ -75,6 +75,7 @@ type seriesGetter interface {
 type seriesCache struct {
 	logger       log.Logger
 	dir          string
+	filters      []*promlabels.Matcher
 	targets      TargetGetter
 	metadata     MetadataGetter
 	resourceMaps []ResourceMap
@@ -122,13 +123,21 @@ func (e *seriesCacheEntry) shouldRefresh() bool {
 	return !e.populated() && time.Since(e.lastRefresh) > refreshInterval
 }
 
-func newSeriesCache(logger log.Logger, dir string, targets TargetGetter, metadata MetadataGetter, resourceMaps []ResourceMap) *seriesCache {
+func newSeriesCache(
+	logger log.Logger,
+	dir string,
+	filters []*promlabels.Matcher,
+	targets TargetGetter,
+	metadata MetadataGetter,
+	resourceMaps []ResourceMap,
+) *seriesCache {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	return &seriesCache{
 		logger:       logger,
 		dir:          dir,
+		filters:      filters,
 		targets:      targets,
 		metadata:     metadata,
 		resourceMaps: resourceMaps,
@@ -285,6 +294,11 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
 // set the label set for the given reference.
 // maxSegment indicates the the highest segment at which the series was possibly defined.
 func (c *seriesCache) set(ctx context.Context, ref uint64, lset labels.Labels, maxSegment int) error {
+	for _, m := range c.filters {
+		if v := lset.Get(m.Name); !m.Matches(v) {
+			return nil
+		}
+	}
 	c.mtx.Lock()
 	c.entries[ref] = &seriesCacheEntry{
 		maxSegment: maxSegment,
diff --git a/retrieval/series_cache_test.go b/retrieval/series_cache_test.go
index b22b5c88..0ba441e2 100644
--- a/retrieval/series_cache_test.go
+++ b/retrieval/series_cache_test.go
@@ -43,7 +43,7 @@ func TestScrapeCache_GarbageCollect(t *testing.T) {
 	// Initialize the series cache with "empty" target and metadata maps.
 	// The series we use below have no job, instance, and __name__ labels set, which
 	// will result in those empty lookup keys.
-	c := newSeriesCache(nil, dir,
+	c := newSeriesCache(nil, dir, nil,
 		targetMap{"/": &targets.Target{}},
 		metadataMap{"//": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge}},
 		[]ResourceMap{
@@ -181,7 +181,7 @@ func TestSeriesCache_Refresh(t *testing.T) {
 	}
 	targetMap := targetMap{}
 	metadataMap := metadataMap{}
-	c := newSeriesCache(nil, "", targetMap, metadataMap, resourceMaps)
+	c := newSeriesCache(nil, "", nil, targetMap, metadataMap, resourceMaps)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -225,3 +225,50 @@ func TestSeriesCache_Refresh(t *testing.T) {
 		t.Errorf("expected metadata but got none, error: %s", err)
 	}
 }
+
+func TestSeriesCache_Filter(t *testing.T) {
+	resourceMaps := []ResourceMap{
+		{
+			Type:     "resource2",
+			LabelMap: map[string]labelTranslation{"__resource_a": constValue("resource_a")},
+		},
+	}
+	// Populate the getters with data.
+	targetMap := targetMap{
+		"job1/inst1": &targets.Target{
+			Labels:           promlabels.FromStrings("job", "job1", "instance", "inst1"),
+			DiscoveredLabels: promlabels.FromStrings("__resource_a", "resource2_a"),
+		},
+	}
+	metadataMap := metadataMap{
+		"job1/inst1/metric1": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge, Metric: "metric1"},
+	}
+	c := newSeriesCache(nil, "", []*promlabels.Matcher{
+		&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "a", Value: "a1"},
+		&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "b", Value: "b1"},
+	}, targetMap, metadataMap, resourceMaps)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Test base case of metric that passes all filters. This primarily
+	// ensures that our setup is correct and metrics aren't dropped for reasons
+	// other than the filter.
+	err := c.set(ctx, 1, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b1"), 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, ok, err := c.get(ctx, 1); !ok || err != nil {
+		t.Fatalf("metric not found: %s", err)
+	}
+	// Test filtered metric.
+	err = c.set(ctx, 2, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b2"), 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, ok, err := c.get(ctx, 2); err != nil {
+		t.Fatalf("error retrieving metric: %s", err)
+	} else if ok {
+		t.Fatalf("metric was not filtered")
+	}
+}
diff --git a/retrieval/transform_test.go b/retrieval/transform_test.go
index 9a04f177..c5581017 100644
--- a/retrieval/transform_test.go
+++ b/retrieval/transform_test.go
@@ -594,7 +594,7 @@ func TestSampleBuilder(t *testing.T) {
 	var err error
 	var result []*monitoring_pb.TimeSeries
 
-	series := newSeriesCache(nil, "", c.targets, c.metadata, resourceMaps)
+	series := newSeriesCache(nil, "", nil, c.targets, c.metadata, resourceMaps)
 	for ref, s := range c.series {
 		series.set(ctx, ref, s, 0)
 	}
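Reviewer note: the standalone sketch below is not part of the change. It is a minimal illustration, under stated assumptions, of the semantics this diff introduces: each --filter string (e.g. --filter='job="job1"') is parsed into a label matcher, and a series is forwarded to Stackdriver only if every matcher passes, mirroring the loop added to seriesCache.set. The label name "env", its values, and the metric name are made up for illustration; the labels package import path is assumed to be the Prometheus package the sidecar already uses.

// Minimal sketch of the --filter semantics (assumptions noted above).
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	// Roughly what parseFilters would produce for:
	//   --filter='job="job1"' --filter='env=~"prod|staging"'
	jobMatcher, err := labels.NewMatcher(labels.MatchEqual, "job", "job1")
	if err != nil {
		panic(err)
	}
	envMatcher, err := labels.NewMatcher(labels.MatchRegexp, "env", "prod|staging")
	if err != nil {
		panic(err)
	}
	filters := []*labels.Matcher{jobMatcher, envMatcher}

	// A hypothetical scraped series.
	series := labels.FromStrings("__name__", "metric1", "job", "job1", "env", "dev")

	// Mirrors the loop added to seriesCache.set: every matcher must match the
	// corresponding label value, otherwise the series is silently dropped.
	forwarded := true
	for _, m := range filters {
		if !m.Matches(series.Get(m.Name)) {
			forwarded = false
			break
		}
	}
	fmt.Println("forwarded:", forwarded) // false: env="dev" fails env=~"prod|staging"
}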