diff --git a/flusher.go b/flusher.go index b47292695..334e515ae 100644 --- a/flusher.go +++ b/flusher.go @@ -183,7 +183,19 @@ func (s *Server) flushSink( maxTagLengthCount += 1 continue metricLoop } - filteredTags = append(filteredTags, tag) + + replaced := false + for i, ft := range filteredTags { + if ft[0:len(k)] == k { + filteredTags[i] = tag + replaced = true + break + } + } + + if !replaced { + filteredTags = append(filteredTags, tag) + } } if sink.maxTags != 0 && len(filteredTags) > sink.maxTags { diff --git a/flusher_test.go b/flusher_test.go index fb884043b..1dba8abf5 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -972,6 +972,308 @@ func TestFlushWithAddTags(t *testing.T) { }) } +func TestFlushWithAddTagsDedupes(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + channel := make(chan []samplers.InterMetric) + mockStatsd := scopedstatsd.NewMockClient(ctrl) + server, err := NewFromConfig(ServerConfig{ + Logger: logrus.New(), + Config: Config{ + Debug: true, + Features: Features{ + EnableMetricSinkRouting: true, + }, + Hostname: "localhost", + Interval: DefaultFlushInterval, + MetricSinkRouting: []SinkRoutingConfig{{ + Name: "default", + Match: []matcher.Matcher{{ + Name: matcher.CreateNameMatcher(&matcher.NameMatcherConfig{ + Kind: "any", + }), + Tags: []matcher.TagMatcher{}, + }}, + Sinks: SinkRoutingSinks{ + Matched: []string{"channel"}, + }, + }}, + MetricSinks: []SinkConfig{{ + Kind: "channel", + Name: "channel", + AddTags: map[string]string{"foo": "bar", "anotha": "one"}, + MaxTags: 5, + }}, + StatsAddress: "localhost:8125", + }, + MetricSinkTypes: MetricSinkTypes{ + "channel": { + Create: func( + server *Server, name string, logger *logrus.Entry, config Config, + sinkConfig MetricSinkConfig, + ) (sinks.MetricSink, error) { + sink, err := NewChannelMetricSink(channel) + if err != nil { + return nil, err + } + return sink, nil + }, + ParseConfig: func( + name string, config interface{}, + ) (MetricSinkConfig, error) { + return nil, nil + }, + }, + }, + }) + assert.NoError(t, err) + server.Statsd = mockStatsd + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + server.Start() + wg.Done() + }() + defer func() { + server.Shutdown() + wg.Wait() + }() + + mockStatsd.EXPECT(). + Count( + gomock.Not(anyOf("flushed_metrics", "dropped_metrics")), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Gauge( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Timing( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + + t.Run("WithAddTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"foo:baz"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 2) { + assert.Equal(t, "foo:bar", result[0].Tags[0]) + } + } + }) + + t.Run("WithAddTagsMultiple", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 3) { + assert.Equal(t, "anotha:one", result[0].Tags[0]) + assert.Equal(t, "foo:bar", result[0].Tags[1]) + } + } + }) + + t.Run("WithAddTagsDoesNotMax", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags", "tags:fordays", "last:one"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 5) { + assert.Equal(t, "anotha:one", result[0].Tags[0]) + assert.Equal(t, "foo:bar", result[0].Tags[1]) + assert.Equal(t, "more:tags", result[0].Tags[2]) + assert.Equal(t, "tags:fordays", result[0].Tags[3]) + assert.Equal(t, "last:one", result[0].Tags[4]) + } + } + }) + + t.Run("WithAddTagsHitsMaxTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("dropped_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "metric_name:test.metric", + "reason:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags", "tags:fordays", "last:one", "max:tags"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + assert.Len(t, result, 0) + }) +} + type anyOfMatcher struct { s []string }