Skip to content

Commit

Permalink
[coordinator] Add continue to rule when matching Graphite rules (#2063)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jan 7, 2020
1 parent 492d042 commit 9445b04
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
62 changes: 44 additions & 18 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,24 @@ func (i *ingester) write(
WriteOverride: true,
}

matched := 0
defer func() {
if matched == 0 {
// No policies matched.
debugLog := i.logger.Check(zapcore.DebugLevel, "no rules matched carbon metric, skipping")
if debugLog != nil {
debugLog.Write(zap.ByteString("name", resources.name))
}
return
}

debugLog := i.logger.Check(zapcore.DebugLevel, "successfully wrote carbon metric")
if debugLog != nil {
debugLog.Write(zap.ByteString("name", resources.name),
zap.Int("matchedRules", matched))
}
}()

for _, rule := range i.rules {
if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(resources.name) {
// Each rule should only have either mapping rules or storage policies so
Expand All @@ -228,47 +246,55 @@ func (i *ingester) write(
zap.Any("mappingRules", rule.mappingRules),
zap.Any("storagePolicies", rule.storagePolicies))
}

// Break because we only want to apply one rule per metric based on which
// ever one matches first.
break
}
}
err := i.writeWithOptions(ctx, resources, timestamp, value,
downsampleAndStoragePolicies)
if err != nil {
return false
}

if len(downsampleAndStoragePolicies.DownsampleMappingRules) == 0 &&
len(downsampleAndStoragePolicies.WriteStoragePolicies) == 0 {
// Nothing to do if none of the policies matched.
debugLog := i.logger.Check(zapcore.DebugLevel, "no rules matched carbon metric, skipping")
if debugLog != nil {
debugLog.Write(zap.ByteString("name", resources.name))
matched++

// If continue is not specified then we matched the current set of rules.
if !rule.rule.Continue {
break
}
}
return false
}

return matched > 0
}

func (i *ingester) writeWithOptions(
ctx context.Context,
resources *lineResources,
timestamp time.Time,
value float64,
opts ingest.WriteOptions,
) error {
resources.datapoints[0] = ts.Datapoint{Timestamp: timestamp, Value: value}
tags, err := GenerateTagsFromNameIntoSlice(resources.name, i.tagOpts, resources.tags)
if err != nil {
i.logger.Error("err generating tags from carbon",
zap.String("name", string(resources.name)), zap.Error(err))
i.metrics.malformed.Inc(1)
return false
return err
}

err = i.downsamplerAndWriter.Write(
ctx, tags, resources.datapoints, xtime.Second, nil, downsampleAndStoragePolicies,
ctx, tags, resources.datapoints, xtime.Second, nil, opts,
)

if err != nil {
i.logger.Error("err writing carbon metric",
zap.String("name", string(resources.name)), zap.Error(err))
i.metrics.err.Inc(1)
return false
return err
}

debugLog := i.logger.Check(zapcore.DebugLevel, "successfully wrote carbon metric")
if debugLog != nil {
debugLog.Write(zap.ByteString("name", resources.name))
}
return true
return nil
}

func (i *ingester) Close() {
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func (c *CarbonIngesterConfiguration) RulesOrDefault(namespaces m3.ClusterNamesp
// ingestion rule.
type CarbonIngesterRuleConfiguration struct {
Pattern string `yaml:"pattern"`
Continue bool `yaml:"continue"`
Aggregation CarbonIngesterAggregationConfiguration `yaml:"aggregation"`
Policies []CarbonIngesterStoragePolicyConfiguration `yaml:"policies"`
}
Expand Down

0 comments on commit 9445b04

Please sign in to comment.