Skip to content

Commit

Permalink
[coordinator] Rollout augmentM3Tags flag to true by default (#3082)
Browse files Browse the repository at this point in the history
* [coordinator] Rollout augmentM3Tags flag to true by default

* Fixup operator docs link

* Fixup doc site links
  • Loading branch information
wesleyk authored Jan 12, 2021
1 parent e22fb12 commit f26ccea
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 80 deletions.
2 changes: 1 addition & 1 deletion site/content/m3query/architecture/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ chapter: true

## Overview

M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.github.io/m3/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.io/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
2 changes: 1 addition & 1 deletion site/content/m3query/config/annotated_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ writeWorkerPoolPolicy:
size: <int>

tagOptions:
# See here for more information: http://m3db.github.io/m3/how_to/query/#id-generation
# See here for more information under ID generation: https://m3db.io/docs/how_to/query/
idScheme: <id_scheme>

# lookbackDuration defines, at each step, how long we lookback until we see a non-NaN value.
Expand Down
2 changes: 1 addition & 1 deletion site/content/operator/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Namespace defines an M3DB namespace or points to a preset M3DB namespace.

## NamespaceOptions

NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.github.io/m3/operational_guide/namespace_configuration/ for more details.
NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.io/docs/operational_guide/namespace_configuration/ for more details.

| Field | Description | Scheme | Required |
| ----- | ----------- | ------ | -------- |
Expand Down
1 change: 0 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
}
}

Expand Down
61 changes: 15 additions & 46 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type metricsAppenderOptions struct {
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool

clockOpts clock.Options
debugLogging bool
Expand Down Expand Up @@ -149,19 +148,16 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}
tags := a.originalTags

// Augment tags if necessary.
if a.augmentM3Tags {
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}

// Sort tags
Expand Down Expand Up @@ -190,11 +186,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
id.Close()

// If we augmented metrics tags before running the forward match, then
// filter them out.
if a.augmentM3Tags {
tags.filterPrefix(metric.M3MetricsPrefix)
}
// filter out augmented metrics tags
tags.filterPrefix(metric.M3MetricsPrefix)

var dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
if opts.Override {
Expand All @@ -215,7 +208,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
append(a.curr.Pipelines, pipelines.Pipelines...)
}

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
} else {
Expand Down Expand Up @@ -358,7 +351,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
a.debugLogMatch("downsampler using built mapping staged metadatas",
debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}})

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
}
Expand Down Expand Up @@ -472,31 +465,7 @@ func (a *metricsAppender) resetTags() {
a.originalTags = nil
}

func (a *metricsAppender) addSamplesAppenders(
originalTags *tags,
stagedMetadata metadata.StagedMetadata,
unownedID []byte,
) error {
// Check if any of the pipelines have tags or a graphite prefix to set.
var tagsExist bool
for _, pipeline := range stagedMetadata.Pipelines {
if len(pipeline.Tags) > 0 || len(pipeline.GraphitePrefix) > 0 {
tagsExist = true
break
}
}

// If we do not need to do any tag augmentation then just return.
if !a.augmentM3Tags && !tagsExist {
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
unownedID: unownedID,
stagedMetadatas: []metadata.StagedMetadata{stagedMetadata},
})
return nil
}

func (a *metricsAppender) addSamplesAppenders(originalTags *tags, stagedMetadata metadata.StagedMetadata) error {
var (
pipelines []metadata.PipelineMetadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) {
}

// NB: expected ID is generated into human-readable form
// from tags in ForwardMatch mock above.
expected := fmt.Sprintf("foo%d-bar%d", i, i)
// from tags in ForwardMatch mock above. Also include the m3 type, which is included when matching.
// nolint:scopelint
expected := fmt.Sprintf("__m3_type__-gauge,foo%d-bar%d", i, i)
if expected != u.ID.String() {
// NB: if this fails, appender is holding state after Finalize.
return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String())
Expand Down
37 changes: 9 additions & 28 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"runtime"
"strings"
"time"

"github.com/m3db/m3/src/aggregator/aggregator"
Expand Down Expand Up @@ -225,10 +224,9 @@ type agg struct {
aggregator aggregator.Aggregator
clientRemote client.Client

clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
}

// Configuration configurates a downsampler.
Expand Down Expand Up @@ -262,14 +260,6 @@ type Configuration struct {

// EntryTTL determines how long an entry remains alive before it may be expired due to inactivity.
EntryTTL time.Duration `yaml:"entryTTL"`

// AugmentM3Tags will augment the metric type to aggregated metrics
// to be used within the filter for rules. If enabled, for example,
// your filter can specify '__m3_type__:gauge' to filter by gauges.
// This is particularly useful for Graphite metrics today.
// Furthermore, the option is automatically enabled if static rules are
// used and any filter contain an __m3_type__ tag.
AugmentM3Tags bool `yaml:"augmentM3Tags"`
}

// MatcherConfiguration is the configuration for the rule matcher.
Expand Down Expand Up @@ -658,7 +648,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
scope = instrumentOpts.MetricsScope()
logger = instrumentOpts.Logger()
openTimeout = defaultOpenTimeout
augmentM3Tags = cfg.AugmentM3Tags
namespaceTag = defaultNamespaceTag
)
if o.StorageFlushConcurrency > 0 {
Expand Down Expand Up @@ -717,9 +706,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
rs := rules.NewEmptyRuleSet(defaultConfigInMemoryNamespace,
updateMetadata)
for _, mappingRule := range cfg.Rules.MappingRules {
if strings.Contains(mappingRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := mappingRule.Rule()
if err != nil {
return agg{}, err
Expand All @@ -732,9 +718,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

for _, rollupRule := range cfg.Rules.RollupRules {
if strings.Contains(rollupRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := rollupRule.Rule()
if err != nil {
return agg{}, err
Expand Down Expand Up @@ -788,10 +771,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
clientRemote: client,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down Expand Up @@ -953,10 +935,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down

0 comments on commit f26ccea

Please sign in to comment.