diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d3720132eb..3cd9e1646c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -33,6 +33,8 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" "golang.org/x/net/context" + + "github.com/m3db/m3/src/x/clock" ) func TestWatchChan(t *testing.T) { @@ -242,13 +244,9 @@ func TestWatchNoLeader(t *testing.T) { require.NoError(t, err) // give some time for watch to be updated - for i := 0; i < 10; i++ { - if atomic.LoadInt32(&updateCalled) == int32(2) { - break - } - time.Sleep(watchInitAndRetryDelay) - runtime.Gosched() - } + require.True(t, clock.WaitUntil(func() bool { + return atomic.LoadInt32(&updateCalled) >= 2 + }, 30*time.Second)) updates := atomic.LoadInt32(&updateCalled) if updates < 2 { @@ -295,9 +293,10 @@ func TestWatchCompactedRevision(t *testing.T) { }) go wh.Watch("foo") - time.Sleep(3 * wh.opts.WatchChanInitTimeout()) - assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled)) + require.True(t, clock.WaitUntil(func() bool { + return atomic.LoadInt32(updateCalled) == 3 + }, 30*time.Second)) lastRead := atomic.LoadInt32(updateCalled) ec.Put(context.Background(), "foo", "bar-11") diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go index 6e1e9f86f2..42fa0637a8 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package ingestcarbon implements a carbon ingester. 
package ingestcarbon import ( @@ -73,7 +74,7 @@ var ( type Options struct { InstrumentOptions instrument.Options WorkerPool xsync.PooledWorkerPool - IngesterConfig *config.CarbonIngesterConfiguration + IngesterConfig config.CarbonIngesterConfiguration } // CarbonIngesterRules contains the carbon ingestion rules. @@ -86,11 +87,9 @@ func (o *Options) Validate() error { if o.InstrumentOptions == nil { return errIOptsMustBeSet } - if o.WorkerPool == nil { return errWorkerPoolMustBeSet } - return nil } @@ -277,10 +276,11 @@ func (i *ingester) Handle(conn net.Conn) { // Interfaces require a context be passed, but M3DB client already has timeouts // built in and allocating a new context each time is expensive so we just pass // the same context always and rely on M3DB client timeouts. - ctx = context.Background() - wg = sync.WaitGroup{} - s = carbon.NewScanner(conn, i.opts.InstrumentOptions) - logger = i.opts.InstrumentOptions.Logger() + ctx = context.Background() + wg = sync.WaitGroup{} + s = carbon.NewScanner(conn, i.opts.InstrumentOptions) + logger = i.opts.InstrumentOptions.Logger() + rewrite = &i.opts.IngesterConfig.Rewrite ) logger.Debug("handling new carbon ingestion connection") @@ -289,8 +289,9 @@ func (i *ingester) Handle(conn net.Conn) { name, timestamp, value := s.Metric() resources := i.getLineResources() + // Copy name since scanner bytes are recycled. - resources.name = append(resources.name[:0], name...) 
+ resources.name = copyAndRewrite(resources.name, name, rewrite) wg.Add(1) i.opts.WorkerPool.Go(func() { diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go index 0c7dc595ba..ff8d1542d9 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go @@ -622,8 +622,7 @@ func TestGenerateTagsFromName(t *testing.T) { func newTestOpts(rules CarbonIngesterRules) Options { cfg := config.CarbonIngesterConfiguration{Rules: rules.Rules} opts := testOptions - opts.IngesterConfig = &cfg - + opts.IngesterConfig = cfg return opts } diff --git a/src/cmd/services/m3coordinator/ingest/carbon/rewrite.go b/src/cmd/services/m3coordinator/ingest/carbon/rewrite.go new file mode 100644 index 0000000000..b5480b7650 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/carbon/rewrite.go @@ -0,0 +1,88 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingestcarbon + +import ( + "github.com/m3db/m3/src/cmd/services/m3query/config" +) + +// nolint: gocyclo +func copyAndRewrite( + dst, src []byte, + cfg *config.CarbonIngesterRewriteConfiguration, +) []byte { + if cfg == nil || !cfg.Cleanup { + // No rewrite required. + return append(dst[:0], src...) + } + + // Copy into dst as we rewrite. + dst = dst[:0] + leadingDots := true + numDots := 0 + for _, c := range src { + if c == '.' { + numDots++ + } else { + numDots = 0 + leadingDots = false + } + + if leadingDots { + // Currently processing leading dots. + continue + } + + if numDots > 1 { + // Do not keep multiple dots. + continue + } + + if !(c >= 'a' && c <= 'z') && + !(c >= 'A' && c <= 'Z') && + !(c >= '0' && c <= '9') && + c != '.' && + c != '-' && + c != '_' && + c != ':' && + c != '#' { + // Invalid character, replace with underscore. + if n := len(dst); n > 0 && dst[n-1] == '_' { + // Preceding character already underscore. + continue + } + dst = append(dst, '_') + continue + } + + // Valid character and not a leading dot or a repeated dot. + dst = append(dst, c) + } + for i := len(dst) - 1; i >= 0; i-- { + if dst[i] != '.' { + // Found non dot. + break + } + // Remove trailing dot. + dst = dst[:i] + } + return dst +} diff --git a/src/cmd/services/m3coordinator/ingest/carbon/rewrite_test.go b/src/cmd/services/m3coordinator/ingest/carbon/rewrite_test.go new file mode 100644 index 0000000000..baf12ec36c --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/carbon/rewrite_test.go @@ -0,0 +1,124 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package ingestcarbon + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/cmd/services/m3query/config" +) + +func TestCopyAndRewrite(t *testing.T) { + tests := []struct { + name string + input string + expected string + cfg *config.CarbonIngesterRewriteConfiguration + }{ + { + name: "bad but no rewrite", + input: "foo$$.bar%%.baz@@", + expected: "foo$$.bar%%.baz@@", + cfg: nil, + }, + { + name: "bad but no rewrite cleanup", + input: "foo$$.bar%%.baz@@", + expected: "foo$$.bar%%.baz@@", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: false, + }, + }, + { + name: "good with rewrite cleanup", + input: "foo.bar.baz", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "bad with rewrite cleanup", + input: "foo$$.bar%%.baz@@", + expected: "foo_.bar_.baz_", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "collapse two dots with rewrite cleanup", + input: "foo..bar.baz", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "collapse three and two dots with rewrite cleanup", + input: "foo...bar..baz", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "remove leading dot with rewrite cleanup", + input: ".foo.bar.baz", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "remove multiple leading dots with rewrite cleanup", + input: "..foo.bar.baz", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "remove trailing dot with rewrite cleanup", + input: "foo.bar.baz.", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + { + name: "remove multiple trailing dots with rewrite cleanup", + input: 
"foo.bar.baz..", + expected: "foo.bar.baz", + cfg: &config.CarbonIngesterRewriteConfiguration{ + Cleanup: true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := copyAndRewrite(nil, []byte(test.input), test.cfg) + require.Equal(t, test.expected, string(actual)) + }) + } +} diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 30c89ab522..60a946b5e3 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -452,9 +452,21 @@ type CarbonConfiguration struct { // CarbonIngesterConfiguration is the configuration struct for carbon ingestion. type CarbonIngesterConfiguration struct { - ListenAddress string `yaml:"listenAddress"` - MaxConcurrency int `yaml:"maxConcurrency"` - Rules []CarbonIngesterRuleConfiguration `yaml:"rules"` + ListenAddress string `yaml:"listenAddress"` + MaxConcurrency int `yaml:"maxConcurrency"` + Rewrite CarbonIngesterRewriteConfiguration `yaml:"rewrite"` + Rules []CarbonIngesterRuleConfiguration `yaml:"rules"` +} + +// CarbonIngesterRewriteConfiguration is the configuration for rewriting +// metrics at ingestion. +type CarbonIngesterRewriteConfiguration struct { + // Cleanup will perform: + // - Trailing/leading dot elimination. + // - Double dot elimination. + // - Irregular char replacement with underscores (_), currently irregular + // is defined as not being in [0-9a-zA-Z-_:#]. 
+ Cleanup bool `yaml:"cleanup"` } // LookbackDurationOrDefault validates the LookbackDuration diff --git a/src/query/server/query.go b/src/query/server/query.go index 5f491af8f6..21cd32561b 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -590,12 +590,10 @@ func Run(runOpts RunOptions) { } if cfg.Carbon != nil && cfg.Carbon.Ingester != nil { - server, ok := startCarbonIngestion(cfg.Carbon, listenerOpts, + server := startCarbonIngestion(*cfg.Carbon.Ingester, listenerOpts, instrumentOptions, logger, m3dbClusters, clusterNamespacesWatcher, downsamplerAndWriter) - if ok { - defer server.Close() - } + defer server.Close() } // Wait for process interrupt. @@ -1095,15 +1093,14 @@ func startGRPCServer( } func startCarbonIngestion( - cfg *config.CarbonConfiguration, + ingesterCfg config.CarbonIngesterConfiguration, listenerOpts xnet.ListenerOptions, iOpts instrument.Options, logger *zap.Logger, m3dbClusters m3.Clusters, clusterNamespacesWatcher m3.ClusterNamespacesWatcher, downsamplerAndWriter ingest.DownsamplerAndWriter, -) (xserver.Server, bool) { - ingesterCfg := cfg.Ingester +) xserver.Server { logger.Info("carbon ingestion enabled, configuring ingester") // Setup worker pool. @@ -1167,7 +1164,7 @@ func startCarbonIngestion( logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress)) - return carbonServer, true + return carbonServer } func newDownsamplerAndWriter(