diff --git a/Makefile b/Makefile index c431f13d..20cf6584 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,7 @@ sec: ## Run security checks .PHONY: fmt fmt: install-tools ## Formats all go files + $(GO) generate ./... $(GO) run $(govulncheck) ./... $(GO) run $(gofumpt) -l -w -extra . find . -type f -name '*.go' -exec grep -L -E 'Code generated by .*\. DO NOT EDIT.' {} + | xargs $(GO) run $(goimports) -format-only -w -local=github.com/rudderlabs diff --git a/go.mod b/go.mod index b0b390ec..42e75827 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( dario.cat/mergo v1.0.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect diff --git a/go.sum b/go.sum index 5dc1e5e6..05b4fe92 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= @@ -320,6 +322,7 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= diff --git a/stats/collectors.go b/stats/collectors.go new file mode 100644 index 00000000..9d31be36 --- /dev/null +++ b/stats/collectors.go @@ -0,0 +1,77 @@ +package stats + +import ( + "context" + "fmt" + "sync" + "time" +) + +const defaultPauseDur = 10 * time.Second + +type gaugeTagsFunc = func(key string, tags Tags, val uint64) + +type Collector interface { + Collect(gaugeTagsFunc) + Zero(gaugeTagsFunc) + ID() string +} + +type aggregatedCollector struct { + c map[string]Collector + PauseDur time.Duration + gaugeFunc gaugeTagsFunc + mu sync.Mutex +} + +func (p *aggregatedCollector) Add(c Collector) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.c == nil { + p.c = make(map[string]Collector) + } + + if _, ok := p.c[c.ID()]; ok { + return fmt.Errorf("collector with ID %s already register", c.ID()) + } + + p.c[c.ID()] = c + return nil +} + +func (p *aggregatedCollector) Run(ctx context.Context) { + defer p.allZero() + p.allCollect() + + if p.PauseDur <= 0 { + p.PauseDur = defaultPauseDur + } + + tick := time.NewTicker(p.PauseDur) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + p.allCollect() + } + } +} + +func (p *aggregatedCollector) allCollect() { + p.mu.Lock() + defer p.mu.Unlock() + for _, c := range p.c { + c.Collect(p.gaugeFunc) + } +} + +func (p *aggregatedCollector) allZero() { + p.mu.Lock() + defer p.mu.Unlock() + for _, c := range p.c { + c.Zero(p.gaugeFunc) + } +} diff --git a/stats/collectors/sqldb.go b/stats/collectors/sqldb.go new file mode 100644 index 00000000..d7a0b43b --- /dev/null +++ b/stats/collectors/sqldb.go @@ -0,0 +1,62 @@ +package collectors + +import ( + "database/sql" + "fmt" + + "github.com/rudderlabs/rudder-go-kit/stats" +) + +const ( + uniqName = "database_sql_%s" +) + +type SQLDBStats struct { + name string + db *sql.DB +} + +func NewDatabaseSQLStats(name string, db *sql.DB) *SQLDBStats { + return &SQLDBStats{ + name: name, + db: db, + } +} + +func (s *SQLDBStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) { + dbStats := s.db.Stats() + tags := stats.Tags{"name": s.name} + + gaugeFunc("sql_db_max_open_connections", tags, uint64(dbStats.MaxOpenConnections)) + gaugeFunc("sql_db_open_connections", tags, uint64(dbStats.OpenConnections)) + gaugeFunc("sql_db_in_use_connections", tags, uint64(dbStats.InUse)) + gaugeFunc("sql_db_idle_connections", tags, uint64(dbStats.Idle)) + + gaugeFunc("sql_db_wait_count_total", tags, uint64(dbStats.WaitCount)) + gaugeFunc("sql_db_wait_duration_seconds_total", tags, uint64(dbStats.WaitDuration.Seconds())) + + gaugeFunc("sql_db_max_idle_closed_total", tags, uint64(dbStats.MaxIdleClosed)) + gaugeFunc("sql_db_max_idle_time_closed_total", tags, uint64(dbStats.MaxIdleTimeClosed)) + gaugeFunc("sql_db_max_lifetime_closed_total", tags, uint64(dbStats.MaxLifetimeClosed)) +} + +func (s *SQLDBStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) { + tags := stats.Tags{"name": s.name} + + gaugeFunc("sql_db_max_open_connections", tags, 0) + + gaugeFunc("sql_db_open_connections", tags, 0) + gaugeFunc("sql_db_in_use_connections", tags, 0) + gaugeFunc("sql_db_idle_connections", tags, 0) + + gaugeFunc("sql_db_wait_count_total", tags, 0) + gaugeFunc("sql_db_wait_duration_seconds_total", tags, 0) + + gaugeFunc("sql_db_max_idle_closed_total", tags, 0) + gaugeFunc("sql_db_max_idle_time_closed_total", tags, 0) + gaugeFunc("sql_db_max_lifetime_closed_total", tags, 0) +} + +func (s *SQLDBStats) ID() string { + return fmt.Sprintf(uniqName, s.name) +} diff --git a/stats/collectors/sqldb_test.go b/stats/collectors/sqldb_test.go new file mode 100644 index 00000000..9f131be7 --- /dev/null +++ b/stats/collectors/sqldb_test.go @@ -0,0 +1,80 @@ +package collectors_test + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/collectors" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" +) + +func TestSQLDatabase(t *testing.T) { + db, _, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + db.SetMaxOpenConns(5) + + m, err := memstats.New() + require.NoError(t, err) + + testName := "test_sqlite" + s := collectors.NewDatabaseSQLStats(testName, db) + + err = m.RegisterCollector(s) + require.NoError(t, err) + + require.Equal(t, []memstats.Metric{ + { + Name: "sql_db_idle_connections", + Tags: stats.Tags{"name": testName}, + Value: 1, + }, + { + Name: "sql_db_in_use_connections", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + { + Name: "sql_db_max_idle_closed_total", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + { + Name: "sql_db_max_idle_time_closed_total", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + { + Name: "sql_db_max_lifetime_closed_total", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + { + Name: "sql_db_max_open_connections", + Tags: stats.Tags{"name": testName}, + Value: 5, + }, + { + Name: "sql_db_open_connections", + Tags: stats.Tags{"name": testName}, + Value: 1, + }, + { + Name: "sql_db_wait_count_total", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + { + Name: "sql_db_wait_duration_seconds_total", + Tags: stats.Tags{"name": testName}, + Value: 0, + }, + }, m.GetAll()) +} diff --git a/stats/collectors/static.go b/stats/collectors/static.go new file mode 100644 index 00000000..2d1e2bab --- /dev/null +++ b/stats/collectors/static.go @@ -0,0 +1,39 @@ +package collectors + +import ( + "fmt" + + "github.com/rudderlabs/rudder-go-kit/stats" +) + +const ( + statsUniqName = "static_%s_%s" +) + +type staticStats struct { + tags stats.Tags + key string + value uint64 +} + +// NewStaticMetric allows to capture a gauge metric that does not change during the lifetime of the application. +// Can be useful for capturing configuration values or application version. +func NewStaticMetric(key string, tags stats.Tags, value uint64) *staticStats { + return &staticStats{ + tags: tags, + key: key, + value: value, + } +} + +func (s *staticStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) { + gaugeFunc(s.key, s.tags, s.value) +} + +func (s *staticStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) { + gaugeFunc(s.key, s.tags, 0) +} + +func (s *staticStats) ID() string { + return fmt.Sprintf(statsUniqName, s.key, s.tags.String()) +} diff --git a/stats/collectors/static_test.go b/stats/collectors/static_test.go new file mode 100644 index 00000000..ac062cf4 --- /dev/null +++ b/stats/collectors/static_test.go @@ -0,0 +1,32 @@ +package collectors_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/collectors" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" +) + +func TestStatic(t *testing.T) { + testName := "test_sqlite" + s := collectors.NewStaticMetric(testName, stats.Tags{ + "foo": "bar", + }, 2) + + m, err := memstats.New() + require.NoError(t, err) + + err = m.RegisterCollector(s) + require.NoError(t, err) + + require.Equal(t, []memstats.Metric{ + { + Name: testName, + Tags: stats.Tags{"foo": "bar"}, + Value: 2, + }, + }, m.GetAll()) +} diff --git a/stats/memstats/stats.go b/stats/memstats/stats.go index 157371a9..90f9cf78 100644 --- a/stats/memstats/stats.go +++ b/stats/memstats/stats.go @@ -354,6 +354,13 @@ func (*Store) Start(_ context.Context, _ stats.GoRoutineFactory) error { return // Stop implements stats.Stats func (*Store) Stop() {} +func (s *Store) RegisterCollector(c stats.Collector) error { + c.Collect(func(key string, tags stats.Tags, val uint64) { + s.NewTaggedStat(key, stats.GaugeType, tags).Gauge(val) + }) + return nil +} + // getKey maps name and tags, to a store lookup key. func (*Store) getKey(name string, tags stats.Tags) string { return name + tags.String() diff --git a/stats/mock_stats/mock_stats.go b/stats/mock_stats/mock_stats.go index a959ce67..71eef411 100644 --- a/stats/mock_stats/mock_stats.go +++ b/stats/mock_stats/mock_stats.go @@ -97,6 +97,20 @@ func (mr *MockStatsMockRecorder) NewTracer(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTracer", reflect.TypeOf((*MockStats)(nil).NewTracer), arg0) } +// RegisterCollector mocks base method. +func (m *MockStats) RegisterCollector(arg0 stats.Collector) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RegisterCollector", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RegisterCollector indicates an expected call of RegisterCollector. +func (mr *MockStatsMockRecorder) RegisterCollector(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterCollector", reflect.TypeOf((*MockStats)(nil).RegisterCollector), arg0) +} + // Start mocks base method. func (m *MockStats) Start(arg0 context.Context, arg1 stats.GoRoutineFactory) error { m.ctrl.T.Helper() diff --git a/stats/nop.go b/stats/nop.go index 95f3fa97..eb44ba54 100644 --- a/stats/nop.go +++ b/stats/nop.go @@ -39,3 +39,5 @@ func (*nop) NewTracer(_ string) Tracer { func (*nop) Start(_ context.Context, _ GoRoutineFactory) error { return nil } func (*nop) Stop() {} + +func (*nop) RegisterCollector(c Collector) error { return nil } diff --git a/stats/otel.go b/stats/otel.go index d40934f6..6a762f3e 100644 --- a/stats/otel.go +++ b/stats/otel.go @@ -52,6 +52,7 @@ type otelStats struct { histogramsMu sync.Mutex otelManager otel.Manager + collectorAggregator *aggregatedCollector runtimeStatsCollector runtimeStatsCollector metricsStatsCollector metricStatsCollector stopBackgroundCollection func() @@ -181,6 +182,14 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error s.metricsStatsCollector.run(backgroundCollectionCtx) }) + gaugeTagsFunc := func(key string, tags Tags, val uint64) { + s.getMeasurement(key, GaugeType, tags).Gauge(val) + } + s.collectorAggregator.gaugeFunc = gaugeTagsFunc + goFactory.Go(func() { + s.collectorAggregator.Run(backgroundCollectionCtx) + }) + if s.config.periodicStatsConfig.enabled { s.runtimeStatsCollector = newRuntimeStatsCollector(gaugeFunc) s.runtimeStatsCollector.PauseDur = time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second @@ -203,6 +212,10 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error return nil } +func (s *otelStats) RegisterCollector(c Collector) error { + return s.collectorAggregator.Add(c) +} + func (s *otelStats) Stop() { if !s.config.enabled.Load() { return diff --git a/stats/otel_collector_test.go b/stats/otel_collector_test.go new file mode 100644 index 00000000..d0f8ca7c --- /dev/null +++ b/stats/otel_collector_test.go @@ -0,0 +1,220 @@ +package stats_test + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + promClient "github.com/prometheus/client_model/go" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/httputil" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/collectors" + "github.com/rudderlabs/rudder-go-kit/stats/metric" + statsTest "github.com/rudderlabs/rudder-go-kit/stats/testhelper" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker" +) + +const ( + metricsPort = "8889" +) + +var globalDefaultAttrs = []*promClient.LabelPair{ + {Name: lo.ToPtr("instanceName"), Value: lo.ToPtr("my-instance-id")}, + {Name: lo.ToPtr("service_version"), Value: lo.ToPtr("v1.2.3")}, + {Name: lo.ToPtr("telemetry_sdk_language"), Value: lo.ToPtr("go")}, + {Name: lo.ToPtr("telemetry_sdk_name"), Value: lo.ToPtr("opentelemetry")}, + {Name: lo.ToPtr("telemetry_sdk_version"), Value: lo.ToPtr(otel.Version())}, +} + +func TestOTelPeriodicStats(t *testing.T) { + type expectation struct { + name string + tags []*promClient.LabelPair + } + + cwd, err := os.Getwd() + require.NoError(t, err) + + runTest := func(t *testing.T, expected []expectation, cols ...stats.Collector) { + container, grpcEndpoint := statsTest.StartOTelCollector(t, metricsPort, + filepath.Join(cwd, "testdata", "otel-collector-config.yaml"), + ) + + c := config.New() + c.Set("INSTANCE_ID", "my-instance-id") + c.Set("OpenTelemetry.enabled", true) + c.Set("OpenTelemetry.metrics.endpoint", grpcEndpoint) + c.Set("OpenTelemetry.metrics.exportInterval", time.Millisecond) + m := metric.NewManager() + + l := logger.NewFactory(c) + s := stats.NewStats(c, l, m, + stats.WithServiceName("TestOTelPeriodicStats"), + stats.WithServiceVersion("v1.2.3"), + ) + + for _, col := range cols { + err := s.RegisterCollector(col) + require.NoError(t, err) + } + + // start stats + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, s.Start(ctx, stats.DefaultGoRoutineFactory)) + defer s.Stop() + + var ( + resp *http.Response + metrics map[string]*promClient.MetricFamily + metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", docker.GetHostPort(t, metricsPort, container)) + ) + + require.Eventuallyf(t, func() bool { + resp, err = http.Get(metricsEndpoint) + if err != nil { + return false + } + defer func() { httputil.CloseResponse(resp) }() + metrics, err = statsTest.ParsePrometheusMetrics(resp.Body) + if err != nil { + return false + } + for _, exp := range expected { + expectedMetricName := strings.ReplaceAll(exp.name, ".", "_") + if _, ok := metrics[expectedMetricName]; !ok { + return false + } + } + return true + }, 10*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics) + + for _, exp := range expected { + metricName := strings.ReplaceAll(exp.name, ".", "_") + require.EqualValues(t, &metricName, metrics[metricName].Name) + require.EqualValues(t, lo.ToPtr(promClient.MetricType_GAUGE), metrics[metricName].Type) + require.Len(t, metrics[metricName].Metric, 1) + + expectedLabels := append(globalDefaultAttrs, + // the label1=value1 is coming from the otel-collector-config.yaml (see const_labels) + &promClient.LabelPair{Name: lo.ToPtr("label1"), Value: lo.ToPtr("value1")}, + &promClient.LabelPair{Name: lo.ToPtr("job"), Value: lo.ToPtr("TestOTelPeriodicStats")}, + &promClient.LabelPair{Name: lo.ToPtr("service_name"), Value: lo.ToPtr("TestOTelPeriodicStats")}, + ) + if exp.tags != nil { + expectedLabels = append(expectedLabels, exp.tags...) + } + require.ElementsMatchf(t, expectedLabels, metrics[metricName].Metric[0].Label, + "Got %+v", metrics[metricName].Metric[0].Label, + ) + } + } + + t.Run("static stats", func(t *testing.T) { + runTest(t, + []expectation{ + {name: "a_custom_metric"}, + }, + collectors.NewStaticMetric("a_custom_metric", nil, 1), + ) + + runTest(t, + []expectation{ + {name: "a_custom_metric", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("foo"), Value: lo.ToPtr("bar")}, + }}, + }, + collectors.NewStaticMetric("a_custom_metric", stats.Tags{"foo": "bar"}, 1), + ) + }) + + t.Run("multiple collectors", func(t *testing.T) { + runTest(t, + []expectation{ + {name: "col_1"}, + {name: "col_2"}, + {name: "col_3"}, + }, + collectors.NewStaticMetric("col_1", nil, 1), + collectors.NewStaticMetric("col_2", nil, 1), + collectors.NewStaticMetric("col_3", nil, 1), + ) + }) + + t.Run("sql collector", func(t *testing.T) { + db, _, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + runTest(t, + []expectation{ + {name: "sql_db_max_open_connections", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_open_connections", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_in_use_connections", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_idle_connections", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_wait_count_total", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_wait_duration_seconds_total", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_max_idle_closed_total", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_max_idle_time_closed_total", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + {name: "sql_db_max_lifetime_closed_total", tags: []*promClient.LabelPair{ + {Name: lo.ToPtr("name"), Value: lo.ToPtr("test")}, + }}, + }, + collectors.NewDatabaseSQLStats("test", db), + ) + }) + t.Run("error on duplicate collector", func(t *testing.T) { + _, grpcEndpoint := statsTest.StartOTelCollector(t, metricsPort, + filepath.Join(cwd, "testdata", "otel-collector-config.yaml"), + ) + + c := config.New() + c.Set("INSTANCE_ID", "my-instance-id") + c.Set("OpenTelemetry.enabled", true) + c.Set("OpenTelemetry.metrics.endpoint", grpcEndpoint) + c.Set("OpenTelemetry.metrics.exportInterval", time.Millisecond) + m := metric.NewManager() + + l := logger.NewFactory(c) + s := stats.NewStats(c, l, m, + stats.WithServiceName("TestOTelPeriodicStats"), + stats.WithServiceVersion("v1.2.3"), + ) + + err := s.RegisterCollector(collectors.NewStaticMetric("col_1", nil, 1)) + require.NoError(t, err) + + err = s.RegisterCollector(collectors.NewStaticMetric("col_1", nil, 1)) + require.Error(t, err) + }) +} diff --git a/stats/stats.go b/stats/stats.go index ee0fd64c..44bf976d 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -58,6 +58,10 @@ type Stats interface { // Stop stops the service and the collection of periodic stats. Stop() + + // RegisterCollector registers a collector that will collect stats periodically. + // You can find available collectors in the stats/collectors package. + RegisterCollector(c Collector) error } type loggerFactory interface { @@ -121,6 +125,7 @@ func NewStats( enablePrometheusExporter: config.GetBool("OpenTelemetry.metrics.prometheus.enabled", false), prometheusMetricsPort: config.GetInt("OpenTelemetry.metrics.prometheus.port", 0), }, + collectorAggregator: &aggregatedCollector{}, } } @@ -143,6 +148,7 @@ func NewStats( client: &statsdClient{}, clients: make(map[string]*statsdClient), pendingClients: make(map[string]*statsdClient), + ac: &aggregatedCollector{}, }, } } diff --git a/stats/statsd.go b/stats/statsd.go index 8df2f9ba..fac7b7d9 100644 --- a/stats/statsd.go +++ b/stats/statsd.go @@ -122,11 +122,17 @@ func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) { s.state.rc.EnableCPU = s.config.periodicStatsConfig.enableCPUStats s.state.rc.EnableMem = s.config.periodicStatsConfig.enableMemStats s.state.rc.EnableGC = s.config.periodicStatsConfig.enableGCStats - s.state.mc = newMetricStatsCollector(s, s.config.periodicStatsConfig.metricManager) + + gaugeTagsFunc := func(key string, tags Tags, val uint64) { + s.NewTaggedStat(key, GaugeType, tags).Gauge(val) + } + s.state.ac.gaugeFunc = gaugeTagsFunc + s.state.ac.PauseDur = time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second + if s.config.periodicStatsConfig.enabled { var wg sync.WaitGroup - wg.Add(2) + wg.Add(3) goFactory.Go(func() { defer wg.Done() s.state.rc.run(s.backgroundCollectionCtx) @@ -135,10 +141,18 @@ func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) { defer wg.Done() s.state.mc.run(s.backgroundCollectionCtx) }) + goFactory.Go(func() { + defer wg.Done() + s.state.ac.Run(s.backgroundCollectionCtx) + }) wg.Wait() } } +func (s *statsdStats) RegisterCollector(c Collector) error { + return s.state.ac.Add(c) +} + // Stop stops periodic collection of stats. func (s *statsdStats) Stop() { s.state.clientsLock.RLock() @@ -292,6 +306,7 @@ type statsdState struct { conn statsd.Option client *statsdClient rc runtimeStatsCollector + ac *aggregatedCollector mc metricStatsCollector clientsLock sync.RWMutex // protects the following diff --git a/stats/statsd_test.go b/stats/statsd_test.go index 8801c24b..19a9e3f9 100644 --- a/stats/statsd_test.go +++ b/stats/statsd_test.go @@ -13,11 +13,13 @@ import ( "testing" "time" + "github.com/DATA-DOG/go-sqlmock" "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/collectors" "github.com/rudderlabs/rudder-go-kit/stats/metric" "github.com/rudderlabs/rudder-go-kit/testhelper" ) @@ -365,6 +367,126 @@ func TestStatsdPeriodicStats(t *testing.T) { }) } +func TestStatsdRegisterCollector(t *testing.T) { + runTest := func(t *testing.T, expected []string, cols ...stats.Collector) { + var received []string + var receivedMu sync.RWMutex + server := newStatsdServer(t, func(s string) { + if i := strings.Index(s, ":"); i > 0 { + s = s[:i] + } + receivedMu.Lock() + received = append(received, s) + receivedMu.Unlock() + }) + defer server.Close() + + c := config.New() + m := metric.NewManager() + t.Setenv("KUBE_NAMESPACE", "my-namespace") + c.Set("STATSD_SERVER_URL", server.addr) + c.Set("INSTANCE_ID", "test") + c.Set("RuntimeStats.enabled", true) + c.Set("RuntimeStats.statsCollectionInterval", 60) + c.Set("RuntimeStats.enableCPUStats", false) + c.Set("RuntimeStats.enabledMemStats", false) + c.Set("RuntimeStats.enableGCStats", false) + + l := logger.NewFactory(c) + s := stats.NewStats(c, l, m) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, col := range cols { + err := s.RegisterCollector(col) + require.NoError(t, err) + } + + // start stats + require.NoError(t, s.Start(ctx, stats.DefaultGoRoutineFactory)) + defer s.Stop() + + defer func() { + receivedMu.RLock() + defer receivedMu.RUnlock() + + t.Logf("received: %s \n!=\n expected: %s", received, expected) + }() + + require.Eventually(t, func() bool { + receivedMu.RLock() + defer receivedMu.RUnlock() + + if len(received) != len(expected) { + return false + } + return reflect.DeepEqual(received, expected) + }, 10*time.Second, time.Millisecond) + } + + t.Run("static stats", func(t *testing.T) { + runTest(t, + []string{"a_custom_metric,instanceName=test,namespace=my-namespace"}, + collectors.NewStaticMetric("a_custom_metric", nil, 1), + ) + + runTest(t, + []string{"a_custom_metric,instanceName=test,namespace=my-namespace,foo=bar"}, + collectors.NewStaticMetric("a_custom_metric", stats.Tags{"foo": "bar"}, 1), + ) + }) + + t.Run("multiple collectors", func(t *testing.T) { + runTest(t, + []string{ + "col_1,instanceName=test,namespace=my-namespace", + "col_2,instanceName=test,namespace=my-namespace", + "col_3,instanceName=test,namespace=my-namespace", + }, + collectors.NewStaticMetric("col_1", nil, 1), + collectors.NewStaticMetric("col_2", nil, 1), + collectors.NewStaticMetric("col_3", nil, 1), + ) + }) + + t.Run("sql collector", func(t *testing.T) { + db, _, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + runTest(t, + []string{ + "sql_db_max_open_connections,instanceName=test,namespace=my-namespace,name=test", + "sql_db_open_connections,instanceName=test,namespace=my-namespace,name=test", + "sql_db_in_use_connections,instanceName=test,namespace=my-namespace,name=test", + "sql_db_idle_connections,instanceName=test,namespace=my-namespace,name=test", + "sql_db_wait_count_total,instanceName=test,namespace=my-namespace,name=test", + "sql_db_wait_duration_seconds_total,instanceName=test,namespace=my-namespace,name=test", + "sql_db_max_idle_closed_total,instanceName=test,namespace=my-namespace,name=test", + "sql_db_max_idle_time_closed_total,instanceName=test,namespace=my-namespace,name=test", + "sql_db_max_lifetime_closed_total,instanceName=test,namespace=my-namespace,name=test", + }, + collectors.NewDatabaseSQLStats("test", db), + ) + }) + + t.Run("error on duplicate collector", func(t *testing.T) { + c := config.New() + m := metric.NewManager() + l := logger.NewFactory(c) + s := stats.NewStats(c, l, m) + + err := s.RegisterCollector(collectors.NewStaticMetric("col_1", nil, 1)) + require.NoError(t, err) + + err = s.RegisterCollector(collectors.NewStaticMetric("col_1", nil, 1)) + require.Error(t, err) + }) +} + func TestStatsdExcludedTags(t *testing.T) { var lastReceived atomic.Value server := newStatsdServer(t, func(s string) { lastReceived.Store(s) }) diff --git a/stats/testhelper/otel.go b/stats/testhelper/otel.go index a78347fe..9d1814fe 100644 --- a/stats/testhelper/otel.go +++ b/stats/testhelper/otel.go @@ -63,7 +63,7 @@ func StartOTelCollector(t testing.TB, metricsPort, configPath string, opts ...St } }) - healthEndpoint := fmt.Sprintf("http://localhost:%d", dt.GetHostPort(t, healthPort, collector.Container)) + healthEndpoint := fmt.Sprintf("http://%s:%d", collector.GetBoundIP(healthPort), dt.GetHostPort(t, healthPort, collector.Container)) require.Eventually(t, func() bool { resp, err := http.Get(healthEndpoint) if err != nil { @@ -75,7 +75,7 @@ func StartOTelCollector(t testing.TB, metricsPort, configPath string, opts ...St t.Log("Container is healthy") - return collector.Container, "localhost:" + strconv.Itoa(conf.port) + return collector.Container, collector.GetBoundIP("4317/tcp") + ":" + strconv.Itoa(conf.port) } type startOTelCollectorConf struct {