Skip to content

Commit

Permalink
Add filtering of forwarded metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Reinartz <freinartz@google.com>
  • Loading branch information
Fabian Reinartz committed Aug 20, 2018
1 parent 5bf957c commit 505b472
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 7 deletions.
40 changes: 40 additions & 0 deletions cmd/stackdriver-prometheus-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/signal"
"path"
"path/filepath"
"regexp"
"runtime"
"syscall"
"time"
Expand Down Expand Up @@ -155,6 +156,7 @@ func main() {
walDirectory string
prometheusURL *url.URL
listenAddress string
filters []string

logLevel promlog.AllowedLevel
}{}
Expand Down Expand Up @@ -190,6 +192,9 @@ func main() {
a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9091").StringVar(&cfg.listenAddress)

a.Flag("filter", "Label matcher which must pass for a series to be forwarded to Stackdriver.").
StringsVar(&cfg.filters)

promlogflag.AddFlags(a, &cfg.logLevel)

_, err := a.Parse(os.Args[1:])
Expand Down Expand Up @@ -226,6 +231,12 @@ func main() {
staticLabels["_debug"] = "debug"
}

filters, err := parseFilters(cfg.filters)
if err != nil {
fmt.Fprintln(os.Stderr, "Error parsing filters:", err)
os.Exit(2)
}

cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)
targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultAPIEndpoint)
if err != nil {
Expand Down Expand Up @@ -279,6 +290,7 @@ func main() {
log.With(logger, "component", "Prometheus reader"),
cfg.walDirectory,
tailer,
filters,
retrieval.TargetsWithDiscoveredLabels(targetCache, labels.FromMap(staticLabels)),
metadataCache,
queueManager,
Expand Down Expand Up @@ -456,3 +468,31 @@ func waitForPrometheus(ctx context.Context, logger log.Logger, promURL *url.URL)
}
}
}

// filterPattern matches one filter expression of the form
//
//	<label><op>"<value>"
//
// where <op> is one of =, !=, =~, !~. Compiled once at package scope
// instead of on every call.
var filterPattern = regexp.MustCompile(`^([a-zA-Z0-9_]+)(=|!=|=~|!~)"(.*)"$`)

// parseFilters converts raw --filter flag values into label matchers.
// An empty value is allowed (e.g. job="" matches series without a job
// label). It returns an error for any string that does not parse or
// whose value is not valid for the operator (e.g. a bad regexp).
func parseFilters(strs []string) ([]*labels.Matcher, error) {
	matchers := make([]*labels.Matcher, 0, len(strs))

	for _, s := range strs {
		parts := filterPattern.FindStringSubmatch(s)
		if len(parts) != 4 {
			return nil, fmt.Errorf("invalid filter %q", s)
		}
		// The pattern guarantees parts[2] is one of the four operators.
		var matcherType labels.MatchType
		switch parts[2] {
		case "=":
			matcherType = labels.MatchEqual
		case "!=":
			matcherType = labels.MatchNotEqual
		case "=~":
			matcherType = labels.MatchRegexp
		case "!~":
			matcherType = labels.MatchNotRegexp
		}
		// NewMatcher validates the value, e.g. it compiles regexps for =~/!~.
		matcher, err := labels.NewMatcher(matcherType, parts[1], parts[3])
		if err != nil {
			return nil, fmt.Errorf("invalid filter %q: %s", s, err)
		}
		matchers = append(matchers, matcher)
	}
	return matchers, nil
}
12 changes: 11 additions & 1 deletion retrieval/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewPrometheusReader(
logger log.Logger,
walDirectory string,
tailer *tail.Tailer,
filters []*labels.Matcher,
targetGetter TargetGetter,
metadataGetter MetadataGetter,
appender Appender,
Expand All @@ -92,6 +93,7 @@ func NewPrometheusReader(
appender: appender,
logger: logger,
tailer: tailer,
filters: filters,
walDirectory: walDirectory,
targetGetter: targetGetter,
metadataGetter: metadataGetter,
Expand All @@ -103,6 +105,7 @@ type PrometheusReader struct {
logger log.Logger
walDirectory string
tailer *tail.Tailer
filters []*labels.Matcher
targetGetter TargetGetter
metadataGetter MetadataGetter
appender Appender
Expand Down Expand Up @@ -138,7 +141,14 @@ func init() {
func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
level.Info(r.logger).Log("msg", "Starting Prometheus reader...")

seriesCache := newSeriesCache(r.logger, r.walDirectory, r.targetGetter, r.metadataGetter, ResourceMappings)
seriesCache := newSeriesCache(
r.logger,
r.walDirectory,
r.filters,
r.targetGetter,
r.metadataGetter,
ResourceMappings,
)
go seriesCache.run(ctx)

builder := &sampleBuilder{series: seriesCache}
Expand Down
4 changes: 2 additions & 2 deletions retrieval/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestReader_Progress(t *testing.T) {
"job1/inst1/metric1": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge, Metric: "metric1"},
}

r := NewPrometheusReader(nil, dir, tailer, targetMap, metadataMap, &nopAppender{})
r := NewPrometheusReader(nil, dir, tailer, nil, targetMap, metadataMap, &nopAppender{})
r.progressSaveInterval = 200 * time.Millisecond

// Populate sample data
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestReader_Progress(t *testing.T) {
}

recorder := &nopAppender{}
r = NewPrometheusReader(nil, dir, tailer, targetMap, metadataMap, recorder)
r = NewPrometheusReader(nil, dir, tailer, nil, targetMap, metadataMap, recorder)
go r.Run(ctx, progressOffset)

// Wait for reader to process until the end.
Expand Down
16 changes: 15 additions & 1 deletion retrieval/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type seriesGetter interface {
type seriesCache struct {
logger log.Logger
dir string
filters []*promlabels.Matcher
targets TargetGetter
metadata MetadataGetter
resourceMaps []ResourceMap
Expand Down Expand Up @@ -122,13 +123,21 @@ func (e *seriesCacheEntry) shouldRefresh() bool {
return !e.populated() && time.Since(e.lastRefresh) > refreshInterval
}

func newSeriesCache(logger log.Logger, dir string, targets TargetGetter, metadata MetadataGetter, resourceMaps []ResourceMap) *seriesCache {
func newSeriesCache(
logger log.Logger,
dir string,
filters []*promlabels.Matcher,
targets TargetGetter,
metadata MetadataGetter,
resourceMaps []ResourceMap,
) *seriesCache {
if logger == nil {
logger = log.NewNopLogger()
}
return &seriesCache{
logger: logger,
dir: dir,
filters: filters,
targets: targets,
metadata: metadata,
resourceMaps: resourceMaps,
Expand Down Expand Up @@ -285,6 +294,11 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
// set the label set for the given reference.
// maxSegment indicates the highest segment at which the series was possibly defined.
func (c *seriesCache) set(ctx context.Context, ref uint64, lset labels.Labels, maxSegment int) error {
for _, m := range c.filters {
if v := lset.Get(m.Name); !m.Matches(v) {
return nil
}
}
c.mtx.Lock()
c.entries[ref] = &seriesCacheEntry{
maxSegment: maxSegment,
Expand Down
51 changes: 49 additions & 2 deletions retrieval/series_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestScrapeCache_GarbageCollect(t *testing.T) {
// Initialize the series cache with "empty" target and metadata maps.
// The series we use below have no job, instance, and __name__ labels set, which
// will result in those empty lookup keys.
c := newSeriesCache(nil, dir,
c := newSeriesCache(nil, dir, nil,
targetMap{"/": &targets.Target{}},
metadataMap{"//": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge}},
[]ResourceMap{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestSeriesCache_Refresh(t *testing.T) {
}
targetMap := targetMap{}
metadataMap := metadataMap{}
c := newSeriesCache(nil, "", targetMap, metadataMap, resourceMaps)
c := newSeriesCache(nil, "", nil, targetMap, metadataMap, resourceMaps)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -225,3 +225,50 @@ func TestSeriesCache_Refresh(t *testing.T) {
t.Errorf("expected metadata but got none, error: %s", err)
}
}

func TestSeriesCache_Filter(t *testing.T) {
	resourceMaps := []ResourceMap{
		{
			Type:     "resource2",
			LabelMap: map[string]labelTranslation{"__resource_a": constValue("resource_a")},
		},
	}
	// Populate the getters with data.
	targetMap := targetMap{
		"job1/inst1": &targets.Target{
			Labels:           promlabels.FromStrings("job", "job1", "instance", "inst1"),
			DiscoveredLabels: promlabels.FromStrings("__resource_a", "resource2_a"),
		},
	}
	metadataMap := metadataMap{
		"job1/inst1/metric1": &scrape.MetricMetadata{Type: textparse.MetricTypeGauge, Metric: "metric1"},
	}
	// Require both a=="a1" and b=="b1" for a series to be forwarded.
	matchers := []*promlabels.Matcher{
		{Type: promlabels.MatchEqual, Name: "a", Value: "a1"},
		{Type: promlabels.MatchEqual, Name: "b", Value: "b1"},
	}
	c := newSeriesCache(nil, "", matchers, targetMap, metadataMap, resourceMaps)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Base case of a metric that passes all filters. This primarily
	// ensures that our setup is correct and metrics aren't dropped for
	// reasons other than the filter.
	passing := labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b1")
	if err := c.set(ctx, 1, passing, 1); err != nil {
		t.Fatal(err)
	}
	if _, ok, err := c.get(ctx, 1); !ok || err != nil {
		t.Fatalf("metric not found: %s", err)
	}
	// A series failing one matcher must be dropped by set.
	dropped := labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b2")
	if err := c.set(ctx, 2, dropped, 1); err != nil {
		t.Fatal(err)
	}
	if _, ok, err := c.get(ctx, 2); err != nil {
		t.Fatalf("error retrieving metric: %s", err)
	} else if ok {
		t.Fatalf("metric was not filtered")
	}
}
2 changes: 1 addition & 1 deletion retrieval/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func TestSampleBuilder(t *testing.T) {
var err error
var result []*monitoring_pb.TimeSeries

series := newSeriesCache(nil, "", c.targets, c.metadata, resourceMaps)
series := newSeriesCache(nil, "", nil, c.targets, c.metadata, resourceMaps)
for ref, s := range c.series {
series.set(ctx, ref, s, 0)
}
Expand Down

0 comments on commit 505b472

Please sign in to comment.