diff --git a/stats/memstats/stats.go b/stats/memstats/stats.go index 74313879..69d30c6a 100644 --- a/stats/memstats/stats.go +++ b/stats/memstats/stats.go @@ -2,9 +2,11 @@ package memstats import ( "context" + "sort" "sync" "time" + "github.com/samber/lo" "github.com/spf13/cast" "github.com/rudderlabs/rudder-go-kit/stats" @@ -33,6 +35,19 @@ type Measurement struct { durations []time.Duration } +// Metric captures the name, tags and value(s) depending on type. +// +// For Count and Gauge, Value is used. +// For Histogram, Values is used. +// For Timer, Durations is used. +type Metric struct { + Name string + Tags stats.Tags + Value float64 // Count, Gauge + Values []float64 // Histogram + Durations []time.Duration // Timer +} + func (m *Measurement) LastValue() float64 { m.mu.Lock() defer m.mu.Unlock() @@ -211,6 +226,56 @@ func (ms *Store) Get(name string, tags stats.Tags) *Measurement { return ms.byKey[ms.getKey(name, tags)] } +// GetAll returns the metric for all name/tags register in the store. +func (ms *Store) GetAll() []Metric { + return ms.getAllByName("") +} + +// GetByName returns the metric for each tag variation with the given name. +func (ms *Store) GetByName(name string) []Metric { + if name == "" { + panic("name cannot be empty") + } + return ms.getAllByName(name) +} + +func (ms *Store) getAllByName(name string) []Metric { + ms.mu.Lock() + defer ms.mu.Unlock() + + keys := lo.Filter(lo.Keys(ms.byKey), func(k string, index int) bool { + return name == "" || ms.byKey[k].name == name + }) + sort.SliceStable(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + return lo.Map(keys, func(key string, index int) Metric { + m := ms.byKey[key] + switch m.mType { + case stats.CountType, stats.GaugeType: + return Metric{ + Name: m.name, + Tags: m.tags, + Value: m.LastValue(), + } + case stats.HistogramType: + return Metric{ + Name: m.name, + Tags: m.tags, + Values: m.Values(), + } + case stats.TimerType: + return Metric{ + Name: m.name, + Tags: m.tags, + Durations: m.Durations(), + } + default: + panic("unknown measurement type:" + m.mType) + } + }) +} + // Start implements stats.Stats func (*Store) Start(_ context.Context, _ stats.GoRoutineFactory) error { return nil } diff --git a/stats/memstats/stats_test.go b/stats/memstats/stats_test.go index 5cfa1c07..a4fd5992 100644 --- a/stats/memstats/stats_test.go +++ b/stats/memstats/stats_test.go @@ -14,17 +14,15 @@ import ( func TestStats(t *testing.T) { now := time.Now() - store := memstats.New( - memstats.WithNow(func() time.Time { - return now - }), - ) - commonTags := stats.Tags{"tag1": "value1"} t.Run("test Count", func(t *testing.T) { name := "testCount" - + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) m := store.NewTaggedStat(name, stats.CountType, commonTags) m.Increment() @@ -36,10 +34,27 @@ func TestStats(t *testing.T) { require.Equal(t, 3.0, store.Get(name, commonTags).LastValue()) require.Equal(t, []float64{1.0, 3.0}, store.Get(name, commonTags).Values()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Value: 3.0, + }}, store.GetAll()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Value: 3.0, + }}, store.GetByName(name)) }) t.Run("test Gauge", func(t *testing.T) { name := "testGauge" + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) m := store.NewTaggedStat(name, stats.GaugeType, commonTags) m.Gauge(1.0) @@ -51,10 +66,28 @@ func TestStats(t *testing.T) { require.Equal(t, 2.0, store.Get(name, commonTags).LastValue()) require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Value: 2.0, + }}, store.GetAll()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Value: 2.0, + }}, store.GetByName(name)) }) t.Run("test Histogram", func(t *testing.T) { name := "testHistogram" + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + m := store.NewTaggedStat(name, stats.HistogramType, commonTags) m.Observe(1.0) @@ -66,10 +99,27 @@ func TestStats(t *testing.T) { require.Equal(t, 2.0, store.Get(name, commonTags).LastValue()) require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Values: []float64{1.0, 2.0}, + }}, store.GetAll()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Values: []float64{1.0, 2.0}, + }}, store.GetByName(name)) }) t.Run("test Timer", func(t *testing.T) { name := "testTimer" + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) m := store.NewTaggedStat(name, stats.TimerType, commonTags) @@ -100,9 +150,27 @@ func TestStats(t *testing.T) { []time.Duration{time.Second, time.Minute, time.Second, time.Minute}, store.Get(name, commonTags).Durations(), ) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Durations: []time.Duration{time.Second, time.Minute, time.Second, time.Minute}, + }}, store.GetAll()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Tags: commonTags, + Durations: []time.Duration{time.Second, time.Minute, time.Second, time.Minute}, + }}, store.GetByName(name)) }) t.Run("invalid operations", func(t *testing.T) { + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + require.PanicsWithValue(t, "operation Count not supported for measurement type:gauge", func() { store.NewTaggedStat("invalid_count", stats.GaugeType, commonTags).Count(1) }) @@ -124,19 +192,81 @@ func TestStats(t *testing.T) { require.PanicsWithValue(t, "operation Observe not supported for measurement type:timer", func() { store.NewTaggedStat("invalid_observe", stats.TimerType, commonTags).Observe(1) }) + + require.PanicsWithValue(t, "name cannot be empty", func() { + store.GetByName("") + }) }) t.Run("no op", func(t *testing.T) { + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + require.NoError(t, store.Start(context.Background(), stats.DefaultGoRoutineFactory)) store.Stop() + + require.Equal(t, []memstats.Metric{}, store.GetAll()) }) t.Run("no tags", func(t *testing.T) { name := "no_tags" + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + m := store.NewStat(name, stats.CountType) m.Increment() require.Equal(t, 1.0, store.Get(name, nil).LastValue()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Value: 1.0, + }}, store.GetAll()) + + require.Equal(t, []memstats.Metric{{ + Name: name, + Value: 1.0, + }}, store.GetByName(name)) + }) + + t.Run("get by name", func(t *testing.T) { + name1 := "name_1" + name2 := "name_2" + + store := memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + + m1 := store.NewStat(name1, stats.CountType) + m1.Increment() + m2 := store.NewStat(name2, stats.TimerType) + m2.SendTiming(time.Second) + + require.Equal(t, []memstats.Metric{{ + Name: name1, + Value: 1.0, + }}, store.GetByName(name1)) + + require.Equal(t, []memstats.Metric{{ + Name: name2, + Durations: []time.Duration{time.Second}, + }}, store.GetByName(name2)) + + require.Equal(t, []memstats.Metric{{ + Name: name1, + Value: 1.0, + }, { + Name: name2, + Durations: []time.Duration{time.Second}, + }}, store.GetAll()) }) }