Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Add Graphite rewrite cleanup directive for cleansing incoming metrics #3047

Merged
merged 9 commits into from
Dec 27, 2020
17 changes: 8 additions & 9 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"golang.org/x/net/context"

"github.com/m3db/m3/src/x/clock"
)

func TestWatchChan(t *testing.T) {
Expand Down Expand Up @@ -242,13 +244,9 @@ func TestWatchNoLeader(t *testing.T) {
require.NoError(t, err)

// give some time for watch to be updated
for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(2) {
break
}
time.Sleep(watchInitAndRetryDelay)
runtime.Gosched()
}
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(&updateCalled) >= 2
}, 30*time.Second))

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
Expand Down Expand Up @@ -295,9 +293,10 @@ func TestWatchCompactedRevision(t *testing.T) {
})

go wh.Watch("foo")
time.Sleep(3 * wh.opts.WatchChanInitTimeout())

assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled))
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(updateCalled) == 3
}, 30*time.Second))

lastRead := atomic.LoadInt32(updateCalled)
ec.Put(context.Background(), "foo", "bar-11")
Expand Down
17 changes: 9 additions & 8 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package ingestcarbon implements a carbon ingester.
package ingestcarbon

import (
Expand Down Expand Up @@ -73,7 +74,7 @@ var (
type Options struct {
InstrumentOptions instrument.Options
WorkerPool xsync.PooledWorkerPool
IngesterConfig *config.CarbonIngesterConfiguration
IngesterConfig config.CarbonIngesterConfiguration
}

// CarbonIngesterRules contains the carbon ingestion rules.
Expand All @@ -86,11 +87,9 @@ func (o *Options) Validate() error {
if o.InstrumentOptions == nil {
return errIOptsMustBeSet
}

if o.WorkerPool == nil {
return errWorkerPoolMustBeSet
}

return nil
}

Expand Down Expand Up @@ -277,10 +276,11 @@ func (i *ingester) Handle(conn net.Conn) {
// Interfaces require a context be passed, but M3DB client already has timeouts
// built in and allocating a new context each time is expensive so we just pass
// the same context always and rely on M3DB client timeouts.
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
rewrite = &i.opts.IngesterConfig.Rewrite
)

logger.Debug("handling new carbon ingestion connection")
Expand All @@ -289,8 +289,9 @@ func (i *ingester) Handle(conn net.Conn) {
name, timestamp, value := s.Metric()

resources := i.getLineResources()

// Copy name since scanner bytes are recycled.
resources.name = append(resources.name[:0], name...)
resources.name = copyAndRewrite(resources.name, name, rewrite)

wg.Add(1)
i.opts.WorkerPool.Go(func() {
Expand Down
3 changes: 1 addition & 2 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,7 @@ func TestGenerateTagsFromName(t *testing.T) {
func newTestOpts(rules CarbonIngesterRules) Options {
cfg := config.CarbonIngesterConfiguration{Rules: rules.Rules}
opts := testOptions
opts.IngesterConfig = &cfg

opts.IngesterConfig = cfg
return opts
}

Expand Down
88 changes: 88 additions & 0 deletions src/cmd/services/m3coordinator/ingest/carbon/rewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
)

// copyAndRewrite copies the metric name in src into dst, reusing the backing
// storage of dst, and returns the resulting slice.
//
// When cfg is nil or cfg.Cleanup is false the bytes are copied verbatim.
// Otherwise the name is cleaned up while it is copied:
//   - Leading and trailing dots are dropped.
//   - A run of consecutive dots is collapsed to a single dot.
//   - Any byte other than a dot or one of [0-9a-zA-Z-_:#] is replaced with an
//     underscore, unless the previously written byte is already an
//     underscore, in which case the byte is dropped.
//
// nolint: gocyclo
func copyAndRewrite(
	dst, src []byte,
	cfg *config.CarbonIngesterRewriteConfiguration,
) []byte {
	out := dst[:0]
	if cfg == nil || !cfg.Cleanup {
		// Cleanup disabled, plain copy.
		return append(out, src...)
	}

	var (
		// seenNonDot flips to true once the first non-dot byte is read, every
		// dot before that point is a leading dot.
		seenNonDot bool
		// dotRun counts the dots in the run currently being read.
		dotRun int
	)
	for _, b := range src {
		if b == '.' {
			dotRun++
			// Only the first dot of a run is kept, and only when the run
			// does not start the name.
			if seenNonDot && dotRun == 1 {
				out = append(out, '.')
			}
			continue
		}

		dotRun = 0
		seenNonDot = true

		switch {
		case 'a' <= b && b <= 'z',
			'A' <= b && b <= 'Z',
			'0' <= b && b <= '9',
			b == '-', b == '_', b == ':', b == '#':
			// Regular character, keep as is.
			out = append(out, b)
		default:
			// Irregular character, write an underscore in its place unless
			// the output already ends with one.
			if n := len(out); n == 0 || out[n-1] != '_' {
				out = append(out, '_')
			}
		}
	}

	// Strip whatever dots remain at the end of the name.
	for len(out) > 0 && out[len(out)-1] == '.' {
		out = out[:len(out)-1]
	}
	return out
}
124 changes: 124 additions & 0 deletions src/cmd/services/m3coordinator/ingest/carbon/rewrite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/cmd/services/m3query/config"
)

// TestCopyAndRewrite checks that copyAndRewrite leaves names untouched when
// cleanup is not enabled and that, with cleanup enabled, it replaces
// irregular characters, collapses repeated dots and strips leading and
// trailing dots.
func TestCopyAndRewrite(t *testing.T) {
	// Shared fixtures, copyAndRewrite only reads from the configuration.
	var (
		cleanupOff = &config.CarbonIngesterRewriteConfiguration{Cleanup: false}
		cleanupOn  = &config.CarbonIngesterRewriteConfiguration{Cleanup: true}
	)

	cases := []struct {
		name  string
		cfg   *config.CarbonIngesterRewriteConfiguration
		input string
		want  string
	}{
		{
			name:  "bad but no rewrite",
			cfg:   nil,
			input: "foo$$.bar%%.baz@@",
			want:  "foo$$.bar%%.baz@@",
		},
		{
			name:  "bad but no rewrite cleanup",
			cfg:   cleanupOff,
			input: "foo$$.bar%%.baz@@",
			want:  "foo$$.bar%%.baz@@",
		},
		{
			name:  "good with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo.bar.baz",
			want:  "foo.bar.baz",
		},
		{
			name:  "bad with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo$$.bar%%.baz@@",
			want:  "foo_.bar_.baz_",
		},
		{
			name:  "collapse two dots with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo..bar.baz",
			want:  "foo.bar.baz",
		},
		{
			name:  "collapse three and two dots with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo...bar..baz",
			want:  "foo.bar.baz",
		},
		{
			name:  "remove leading dot with rewrite cleanup",
			cfg:   cleanupOn,
			input: ".foo.bar.baz",
			want:  "foo.bar.baz",
		},
		{
			name:  "remove multiple leading dots with rewrite cleanup",
			cfg:   cleanupOn,
			input: "..foo.bar.baz",
			want:  "foo.bar.baz",
		},
		{
			name:  "remove trailing dot with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo.bar.baz.",
			want:  "foo.bar.baz",
		},
		{
			name:  "remove multiple trailing dots with rewrite cleanup",
			cfg:   cleanupOn,
			input: "foo.bar.baz..",
			want:  "foo.bar.baz",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// A nil destination forces a fresh buffer for every case.
			got := copyAndRewrite(nil, []byte(tc.input), tc.cfg)
			require.Equal(t, tc.want, string(got))
		})
	}
}
18 changes: 15 additions & 3 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,21 @@ type CarbonConfiguration struct {

// CarbonIngesterConfiguration is the configuration struct for carbon ingestion.
type CarbonIngesterConfiguration struct {
ListenAddress string `yaml:"listenAddress"`
MaxConcurrency int `yaml:"maxConcurrency"`
Rules []CarbonIngesterRuleConfiguration `yaml:"rules"`
ListenAddress string `yaml:"listenAddress"`
MaxConcurrency int `yaml:"maxConcurrency"`
Rewrite CarbonIngesterRewriteConfiguration `yaml:"rewrite"`
Rules []CarbonIngesterRuleConfiguration `yaml:"rules"`
}

// CarbonIngesterRewriteConfiguration is the configuration for rewriting
// metric names at carbon ingestion, the rewrite is applied to each name as
// it is copied out of the scanner and before it is handed to a worker.
type CarbonIngesterRewriteConfiguration struct {
// Cleanup, when true, will perform:
// - Trailing/leading dot elimination.
// - Double dot elimination, a run of consecutive dots is collapsed into
//   a single dot.
// - Irregular char replacement with underscores (_), where consecutive
//   irregular chars result in a single underscore. Currently irregular
//   is defined as not being a dot and not being in [0-9a-zA-Z-_:#].
// When false (the zero value) names are copied unmodified.
Cleanup bool `yaml:"cleanup"`
}

// LookbackDurationOrDefault validates the LookbackDuration
Expand Down
13 changes: 5 additions & 8 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,10 @@ func Run(runOpts RunOptions) {
}

if cfg.Carbon != nil && cfg.Carbon.Ingester != nil {
server, ok := startCarbonIngestion(cfg.Carbon, listenerOpts,
server := startCarbonIngestion(*cfg.Carbon.Ingester, listenerOpts,
instrumentOptions, logger, m3dbClusters, clusterNamespacesWatcher,
downsamplerAndWriter)
if ok {
defer server.Close()
}
defer server.Close()
}

// Wait for process interrupt.
Expand Down Expand Up @@ -1095,15 +1093,14 @@ func startGRPCServer(
}

func startCarbonIngestion(
cfg *config.CarbonConfiguration,
ingesterCfg config.CarbonIngesterConfiguration,
listenerOpts xnet.ListenerOptions,
iOpts instrument.Options,
logger *zap.Logger,
m3dbClusters m3.Clusters,
clusterNamespacesWatcher m3.ClusterNamespacesWatcher,
downsamplerAndWriter ingest.DownsamplerAndWriter,
) (xserver.Server, bool) {
ingesterCfg := cfg.Ingester
) xserver.Server {
logger.Info("carbon ingestion enabled, configuring ingester")

// Setup worker pool.
Expand Down Expand Up @@ -1167,7 +1164,7 @@ func startCarbonIngestion(

logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress))

return carbonServer, true
return carbonServer
}

func newDownsamplerAndWriter(
Expand Down