diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go index 779caacd07..f88a8ef11d 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go @@ -214,6 +214,24 @@ func (i *ingester) write( WriteOverride: true, } + matched := 0 + defer func() { + if matched == 0 { + // No policies matched. + debugLog := i.logger.Check(zapcore.DebugLevel, "no rules matched carbon metric, skipping") + if debugLog != nil { + debugLog.Write(zap.ByteString("name", resources.name)) + } + return + } + + debugLog := i.logger.Check(zapcore.DebugLevel, "successfully wrote carbon metric") + if debugLog != nil { + debugLog.Write(zap.ByteString("name", resources.name), + zap.Int("matchedRules", matched)) + } + }() + for _, rule := range i.rules { if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(resources.name) { // Each rule should only have either mapping rules or storage policies so @@ -228,47 +246,55 @@ func (i *ingester) write( zap.Any("mappingRules", rule.mappingRules), zap.Any("storagePolicies", rule.storagePolicies)) } + // Break because we only want to apply one rule per metric based on which // ever one matches first. - break - } - } + err := i.writeWithOptions(ctx, resources, timestamp, value, + downsampleAndStoragePolicies) + if err != nil { + return false + } - if len(downsampleAndStoragePolicies.DownsampleMappingRules) == 0 && - len(downsampleAndStoragePolicies.WriteStoragePolicies) == 0 { - // Nothing to do if none of the policies matched. - debugLog := i.logger.Check(zapcore.DebugLevel, "no rules matched carbon metric, skipping") - if debugLog != nil { - debugLog.Write(zap.ByteString("name", resources.name)) + matched++ + + // If continue is not specified then we matched the current set of rules. + if !rule.rule.Continue { + break + } } - return false } + return matched > 0 +} + +func (i *ingester) writeWithOptions( + ctx context.Context, + resources *lineResources, + timestamp time.Time, + value float64, + opts ingest.WriteOptions, +) error { resources.datapoints[0] = ts.Datapoint{Timestamp: timestamp, Value: value} tags, err := GenerateTagsFromNameIntoSlice(resources.name, i.tagOpts, resources.tags) if err != nil { i.logger.Error("err generating tags from carbon", zap.String("name", string(resources.name)), zap.Error(err)) i.metrics.malformed.Inc(1) - return false + return err } err = i.downsamplerAndWriter.Write( - ctx, tags, resources.datapoints, xtime.Second, nil, downsampleAndStoragePolicies, + ctx, tags, resources.datapoints, xtime.Second, nil, opts, ) if err != nil { i.logger.Error("err writing carbon metric", zap.String("name", string(resources.name)), zap.Error(err)) i.metrics.err.Inc(1) - return false + return err } - debugLog := i.logger.Check(zapcore.DebugLevel, "successfully wrote carbon metric") - if debugLog != nil { - debugLog.Write(zap.ByteString("name", resources.name)) - } - return true + return nil } func (i *ingester) Close() { diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 68cbeddb6a..ca176eb976 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -372,6 +372,7 @@ func (c *CarbonIngesterConfiguration) RulesOrDefault(namespaces m3.ClusterNamesp // ingestion rule. type CarbonIngesterRuleConfiguration struct { Pattern string `yaml:"pattern"` + Continue bool `yaml:"continue"` Aggregation CarbonIngesterAggregationConfiguration `yaml:"aggregation"` Policies []CarbonIngesterStoragePolicyConfiguration `yaml:"policies"` }