From 8b32d8939e104a09659d9d75de72706fbd013c0b Mon Sep 17 00:00:00 2001 From: Andres Galindo Date: Wed, 16 Nov 2022 08:56:45 -0500 Subject: [PATCH 1/2] Deduplicating add tags --- flusher.go | 14 ++++++- flusher_test.go | 98 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/flusher.go b/flusher.go index b47292695..334e515ae 100644 --- a/flusher.go +++ b/flusher.go @@ -183,7 +183,19 @@ func (s *Server) flushSink( maxTagLengthCount += 1 continue metricLoop } - filteredTags = append(filteredTags, tag) + + replaced := false + for i, ft := range filteredTags { + if ft[0:len(k)] == k { + filteredTags[i] = tag + replaced = true + break + } + } + + if !replaced { + filteredTags = append(filteredTags, tag) + } } if sink.maxTags != 0 && len(filteredTags) > sink.maxTags { diff --git a/flusher_test.go b/flusher_test.go index fb884043b..be053bc2d 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -972,6 +972,104 @@ func TestFlushWithAddTags(t *testing.T) { }) } +func TestFlushWithAddTagsDedupes(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + channel := make(chan []samplers.InterMetric) + mockStatsd := scopedstatsd.NewMockClient(ctrl) + server, err := NewFromConfig(ServerConfig{ + Logger: logrus.New(), + Config: Config{ + Debug: true, + Features: Features{ + EnableMetricSinkRouting: true, + }, + Hostname: "localhost", + Interval: DefaultFlushInterval, + MetricSinkRouting: []SinkRoutingConfig{{ + Name: "default", + Match: []matcher.Matcher{{ + Name: matcher.CreateNameMatcher(&matcher.NameMatcherConfig{ + Kind: "any", + }), + Tags: []matcher.TagMatcher{}, + }}, + Sinks: SinkRoutingSinks{ + Matched: []string{"channel"}, + }, + }}, + MetricSinks: []SinkConfig{{ + Kind: "channel", + Name: "channel", + AddTags: map[string]string{"foo": "bar", "env": "qa"}, + }}, + StatsAddress: "localhost:8125", + }, + MetricSinkTypes: MetricSinkTypes{ + "channel": { + Create: func( + server *Server, name string, logger *logrus.Entry, config Config, + sinkConfig MetricSinkConfig, + ) (sinks.MetricSink, error) { + sink, err := NewChannelMetricSink(channel) + if err != nil { + return nil, err + } + return sink, nil + }, + ParseConfig: func( + name string, config interface{}, + ) (MetricSinkConfig, error) { + return nil, nil + }, + }, + }, + }) + + assert.NoError(t, err) + server.Statsd = mockStatsd + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + server.Start() + wg.Done() + }() + defer func() { + server.Shutdown() + wg.Wait() + }() + + mockStatsd.EXPECT().Count(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() + mockStatsd.EXPECT().Gauge(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() + mockStatsd.EXPECT().Timing(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "foo:not_bar", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"first:tag", "foo:not_bar", "env:foo", "not_matched:oo"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 4) { + assert.Equal(t, "first:tag", result[0].Tags[0]) + assert.Equal(t, "foo:bar", result[0].Tags[1]) + assert.Equal(t, "env:qa", result[0].Tags[2]) + assert.Equal(t, "not_matched:oo", result[0].Tags[3]) + } + } +} + type anyOfMatcher struct { s []string } From 62606cb57108dc772ebe70584817cd02efef5630 Mon Sep 17 00:00:00 2001 From: Andres Galindo Date: Wed, 16 Nov 2022 10:26:07 -0500 Subject: [PATCH 2/2] Fixed tests --- flusher_test.go | 256 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 230 insertions(+), 26 deletions(-) diff --git a/flusher_test.go b/flusher_test.go index be053bc2d..1dba8abf5 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -1002,7 +1002,8 @@ func TestFlushWithAddTagsDedupes(t *testing.T) { MetricSinks: []SinkConfig{{ Kind: "channel", Name: "channel", - AddTags: map[string]string{"foo": "bar", "env": "qa"}, + AddTags: map[string]string{"foo": "bar", "anotha": "one"}, + MaxTags: 5, }}, StatsAddress: "localhost:8125", }, @@ -1026,7 +1027,6 @@ func TestFlushWithAddTagsDedupes(t *testing.T) { }, }, }) - assert.NoError(t, err) server.Statsd = mockStatsd @@ -1041,33 +1041,237 @@ func TestFlushWithAddTagsDedupes(t *testing.T) { wg.Wait() }() - mockStatsd.EXPECT().Count(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() - mockStatsd.EXPECT().Gauge(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() - mockStatsd.EXPECT().Timing(gomock.Any(), gomock.All(), gomock.All(), gomock.All()).AnyTimes() + mockStatsd.EXPECT(). + Count( + gomock.Not(anyOf("flushed_metrics", "dropped_metrics")), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Gauge( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Timing( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() - server.Workers[0].PacketChan <- samplers.UDPMetric{ - MetricKey: samplers.MetricKey{ - Name: "test.metric", - Type: "counter", - JoinedTags: "foo:not_bar", - }, - Digest: 0, - Scope: samplers.LocalOnly, - Tags: []string{"first:tag", "foo:not_bar", "env:foo", "not_matched:oo"}, - Value: 1.0, - SampleRate: 1.0, - } + t.Run("WithAddTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"foo:baz"}, + Value: 1.0, + SampleRate: 1.0, + } - result := <-channel - if assert.Len(t, result, 1) { - assert.Equal(t, "test.metric", result[0].Name) - if assert.Len(t, result[0].Tags, 4) { - assert.Equal(t, "first:tag", result[0].Tags[0]) - assert.Equal(t, "foo:bar", result[0].Tags[1]) - assert.Equal(t, "env:qa", result[0].Tags[2]) - assert.Equal(t, "not_matched:oo", result[0].Tags[3]) + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 2) { + assert.Equal(t, "foo:bar", result[0].Tags[0]) + } } - } + }) + + t.Run("WithAddTagsMultiple", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 3) { + assert.Equal(t, "anotha:one", result[0].Tags[0]) + assert.Equal(t, "foo:bar", result[0].Tags[1]) + } + } + }) + + t.Run("WithAddTagsDoesNotMax", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags", "tags:fordays", "last:one"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 5) { + assert.Equal(t, "anotha:one", result[0].Tags[0]) + assert.Equal(t, "foo:bar", result[0].Tags[1]) + assert.Equal(t, "more:tags", result[0].Tags[2]) + assert.Equal(t, "tags:fordays", result[0].Tags[3]) + assert.Equal(t, "last:one", result[0].Tags[4]) + } + } + }) + + t.Run("WithAddTagsHitsMaxTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("dropped_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "metric_name:test.metric", + "reason:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"anotha:thing", "foo:baz", "more:tags", "tags:fordays", "last:one", "max:tags"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + assert.Len(t, result, 0) + }) } type anyOfMatcher struct {