From f26cceaf6fc72fbb9410bf03d2ce080c4f4d30bc Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Tue, 12 Jan 2021 16:53:41 -0500 Subject: [PATCH] [coordinator] Rollout augmentM3Tags flag to true by default (#3082) * [coordinator] Rollout augmentM3Tags flag to true by default * Fixup operator docs link * Fixup doc site links --- site/content/m3query/architecture/_index.md | 2 +- .../m3query/config/annotated_config.yaml | 2 +- site/content/operator/api.md | 2 +- .../m3coordinator/downsample/downsampler.go | 1 - .../downsample/metrics_appender.go | 61 +++++-------------- .../downsample/metrics_appender_test.go | 5 +- .../m3coordinator/downsample/options.go | 37 +++-------- 7 files changed, 30 insertions(+), 80 deletions(-) diff --git a/site/content/m3query/architecture/_index.md b/site/content/m3query/architecture/_index.md index b9747a458e..11dda3cda7 100644 --- a/site/content/m3query/architecture/_index.md +++ b/site/content/m3query/architecture/_index.md @@ -9,4 +9,4 @@ chapter: true ## Overview -M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.github.io/m3/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage). +M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.io/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage). diff --git a/site/content/m3query/config/annotated_config.yaml b/site/content/m3query/config/annotated_config.yaml index 3b5da983d2..e29e868a5f 100644 --- a/site/content/m3query/config/annotated_config.yaml +++ b/site/content/m3query/config/annotated_config.yaml @@ -63,7 +63,7 @@ writeWorkerPoolPolicy: size: tagOptions: - # See here for more information: http://m3db.github.io/m3/how_to/query/#id-generation + # See here for more information under ID generation: https://m3db.io/docs/how_to/query/ idScheme: # lookbackDuration defines, at each step, how long we lookback until we see a non-NaN value. diff --git a/site/content/operator/api.md b/site/content/operator/api.md index 3c18b884b7..fc6c4f7d2b 100644 --- a/site/content/operator/api.md +++ b/site/content/operator/api.md @@ -220,7 +220,7 @@ Namespace defines an M3DB namespace or points to a preset M3DB namespace. ## NamespaceOptions -NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.github.io/m3/operational_guide/namespace_configuration/ for more details. +NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.io/docs/operational_guide/namespace_configuration/ for more details. | Field | Description | Scheme | Required | | ----- | ----------- | ------ | -------- | diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index 9a635758cc..fef7abe4d9 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -135,7 +135,6 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe metricTagsIteratorPool: agg.pools.metricTagsIteratorPool, debugLogging: debugLogging, logger: logger, - augmentM3Tags: agg.augmentM3Tags, } } diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 72d295f200..288404a68c 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -102,7 +102,6 @@ type metricsAppenderOptions struct { matcher matcher.Matcher tagEncoderPool serialize.TagEncoderPool metricTagsIteratorPool serialize.MetricTagsIteratorPool - augmentM3Tags bool clockOpts clock.Options debugLogging bool @@ -149,19 +148,16 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp } tags := a.originalTags - // Augment tags if necessary. - if a.augmentM3Tags { - // NB (@shreyas): Add the metric type tag. The tag has the prefix - // __m3_. All tags with that prefix are only used for the purpose of - // filter match and then stripped off before we actually send to the aggregator. - switch opts.MetricType { - case ts.M3MetricTypeCounter: - tags.append(metric.M3TypeTag, metric.M3CounterValue) - case ts.M3MetricTypeGauge: - tags.append(metric.M3TypeTag, metric.M3GaugeValue) - case ts.M3MetricTypeTimer: - tags.append(metric.M3TypeTag, metric.M3TimerValue) - } + // NB (@shreyas): Add the metric type tag. The tag has the prefix + // __m3_. All tags with that prefix are only used for the purpose of + // filter match and then stripped off before we actually send to the aggregator. + switch opts.MetricType { + case ts.M3MetricTypeCounter: + tags.append(metric.M3TypeTag, metric.M3CounterValue) + case ts.M3MetricTypeGauge: + tags.append(metric.M3TypeTag, metric.M3GaugeValue) + case ts.M3MetricTypeTimer: + tags.append(metric.M3TypeTag, metric.M3TimerValue) } // Sort tags @@ -190,11 +186,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos) id.Close() - // If we augmented metrics tags before running the forward match, then - // filter them out. - if a.augmentM3Tags { - tags.filterPrefix(metric.M3MetricsPrefix) - } + // filter out augmented metrics tags + tags.filterPrefix(metric.M3MetricsPrefix) var dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult if opts.Override { @@ -215,7 +208,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp append(a.curr.Pipelines, pipelines.Pipelines...) } - if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil { + if err := a.addSamplesAppenders(tags, a.curr); err != nil { return SamplesAppenderResult{}, err } } else { @@ -358,7 +351,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp a.debugLogMatch("downsampler using built mapping staged metadatas", debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}}) - if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil { + if err := a.addSamplesAppenders(tags, a.curr); err != nil { return SamplesAppenderResult{}, err } } @@ -472,31 +465,7 @@ func (a *metricsAppender) resetTags() { a.originalTags = nil } -func (a *metricsAppender) addSamplesAppenders( - originalTags *tags, - stagedMetadata metadata.StagedMetadata, - unownedID []byte, -) error { - // Check if any of the pipelines have tags or a graphite prefix to set. - var tagsExist bool - for _, pipeline := range stagedMetadata.Pipelines { - if len(pipeline.Tags) > 0 || len(pipeline.GraphitePrefix) > 0 { - tagsExist = true - break - } - } - - // If we do not need to do any tag augmentation then just return. - if !a.augmentM3Tags && !tagsExist { - a.multiSamplesAppender.addSamplesAppender(samplesAppender{ - agg: a.agg, - clientRemote: a.clientRemote, - unownedID: unownedID, - stagedMetadatas: []metadata.StagedMetadata{stagedMetadata}, - }) - return nil - } - +func (a *metricsAppender) addSamplesAppenders(originalTags *tags, stagedMetadata metadata.StagedMetadata) error { var ( pipelines []metadata.PipelineMetadata ) diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go b/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go index 8f2907d29b..003906e57a 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go @@ -119,8 +119,9 @@ func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) { } // NB: expected ID is generated into human-readable form - // from tags in ForwardMatch mock above. - expected := fmt.Sprintf("foo%d-bar%d", i, i) + // from tags in ForwardMatch mock above. Also include the m3 type, which is included when matching. + // nolint:scopelint + expected := fmt.Sprintf("__m3_type__-gauge,foo%d-bar%d", i, i) if expected != u.ID.String() { // NB: if this fails, appender is holding state after Finalize. return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String()) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 07ea13d37f..03bf33026a 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "runtime" - "strings" "time" "github.com/m3db/m3/src/aggregator/aggregator" @@ -225,10 +224,9 @@ type agg struct { aggregator aggregator.Aggregator clientRemote client.Client - clockOpts clock.Options - matcher matcher.Matcher - pools aggPools - augmentM3Tags bool + clockOpts clock.Options + matcher matcher.Matcher + pools aggPools } // Configuration configurates a downsampler. @@ -262,14 +260,6 @@ type Configuration struct { // EntryTTL determines how long an entry remains alive before it may be expired due to inactivity. EntryTTL time.Duration `yaml:"entryTTL"` - - // AugmentM3Tags will augment the metric type to aggregated metrics - // to be used within the filter for rules. If enabled, for example, - // your filter can specify '__m3_type__:gauge' to filter by gauges. - // This is particularly useful for Graphite metrics today. - // Furthermore, the option is automatically enabled if static rules are - // used and any filter contain an __m3_type__ tag. - AugmentM3Tags bool `yaml:"augmentM3Tags"` } // MatcherConfiguration is the configuration for the rule matcher. @@ -658,7 +648,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { scope = instrumentOpts.MetricsScope() logger = instrumentOpts.Logger() openTimeout = defaultOpenTimeout - augmentM3Tags = cfg.AugmentM3Tags namespaceTag = defaultNamespaceTag ) if o.StorageFlushConcurrency > 0 { @@ -717,9 +706,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { rs := rules.NewEmptyRuleSet(defaultConfigInMemoryNamespace, updateMetadata) for _, mappingRule := range cfg.Rules.MappingRules { - if strings.Contains(mappingRule.Filter, metric.M3MetricsPrefixString) { - augmentM3Tags = true - } rule, err := mappingRule.Rule() if err != nil { return agg{}, err @@ -732,9 +718,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } for _, rollupRule := range cfg.Rules.RollupRules { - if strings.Contains(rollupRule.Filter, metric.M3MetricsPrefixString) { - augmentM3Tags = true - } rule, err := rollupRule.Rule() if err != nil { return agg{}, err @@ -788,10 +771,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - clientRemote: client, - matcher: matcher, - pools: pools, - augmentM3Tags: augmentM3Tags, + clientRemote: client, + matcher: matcher, + pools: pools, }, nil } @@ -953,10 +935,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - aggregator: aggregatorInstance, - matcher: matcher, - pools: pools, - augmentM3Tags: augmentM3Tags, + aggregator: aggregatorInstance, + matcher: matcher, + pools: pools, }, nil }