Skip to content

Commit

Permalink
feat: introduce db stats collector (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Sep 26, 2024
1 parent d8083ae commit 8ac166b
Show file tree
Hide file tree
Showing 17 changed files with 698 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ sec: ## Run security checks

.PHONY: fmt
fmt: install-tools ## Formats all go files
$(GO) generate ./...
$(GO) run $(govulncheck) ./...
$(GO) run $(gofumpt) -l -w -extra .
find . -type f -name '*.go' -exec grep -L -E 'Code generated by .*\. DO NOT EDIT.' {} + | xargs $(GO) run $(goimports) -format-only -w -local=github.com/rudderlabs
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY=
github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
Expand Down Expand Up @@ -320,6 +322,7 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
Expand Down
77 changes: 77 additions & 0 deletions stats/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package stats

import (
"context"
"fmt"
"sync"
"time"
)

const defaultPauseDur = 10 * time.Second

type gaugeTagsFunc = func(key string, tags Tags, val uint64)

type Collector interface {
Collect(gaugeTagsFunc)
Zero(gaugeTagsFunc)
ID() string
}

type aggregatedCollector struct {
c map[string]Collector
PauseDur time.Duration
gaugeFunc gaugeTagsFunc
mu sync.Mutex
}

func (p *aggregatedCollector) Add(c Collector) error {
p.mu.Lock()
defer p.mu.Unlock()

if p.c == nil {
p.c = make(map[string]Collector)
}

if _, ok := p.c[c.ID()]; ok {
return fmt.Errorf("collector with ID %s already register", c.ID())
}

p.c[c.ID()] = c
return nil
}

func (p *aggregatedCollector) Run(ctx context.Context) {
defer p.allZero()
p.allCollect()

if p.PauseDur <= 0 {
p.PauseDur = defaultPauseDur
}

tick := time.NewTicker(p.PauseDur)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
p.allCollect()
}
}
}

func (p *aggregatedCollector) allCollect() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Collect(p.gaugeFunc)
}
}

func (p *aggregatedCollector) allZero() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Zero(p.gaugeFunc)
}
}
62 changes: 62 additions & 0 deletions stats/collectors/sqldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package collectors

import (
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
uniqName = "database_sql_%s"
)

type SQLDBStats struct {
name string
db *sql.DB
}

func NewDatabaseSQLStats(name string, db *sql.DB) *SQLDBStats {
return &SQLDBStats{
name: name,
db: db,
}
}

func (s *SQLDBStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
dbStats := s.db.Stats()
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, uint64(dbStats.MaxOpenConnections))
gaugeFunc("sql_db_open_connections", tags, uint64(dbStats.OpenConnections))
gaugeFunc("sql_db_in_use_connections", tags, uint64(dbStats.InUse))
gaugeFunc("sql_db_idle_connections", tags, uint64(dbStats.Idle))

gaugeFunc("sql_db_wait_count_total", tags, uint64(dbStats.WaitCount))
gaugeFunc("sql_db_wait_duration_seconds_total", tags, uint64(dbStats.WaitDuration.Seconds()))

gaugeFunc("sql_db_max_idle_closed_total", tags, uint64(dbStats.MaxIdleClosed))
gaugeFunc("sql_db_max_idle_time_closed_total", tags, uint64(dbStats.MaxIdleTimeClosed))
gaugeFunc("sql_db_max_lifetime_closed_total", tags, uint64(dbStats.MaxLifetimeClosed))
}

func (s *SQLDBStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, 0)

gaugeFunc("sql_db_open_connections", tags, 0)
gaugeFunc("sql_db_in_use_connections", tags, 0)
gaugeFunc("sql_db_idle_connections", tags, 0)

gaugeFunc("sql_db_wait_count_total", tags, 0)
gaugeFunc("sql_db_wait_duration_seconds_total", tags, 0)

gaugeFunc("sql_db_max_idle_closed_total", tags, 0)
gaugeFunc("sql_db_max_idle_time_closed_total", tags, 0)
gaugeFunc("sql_db_max_lifetime_closed_total", tags, 0)
}

func (s *SQLDBStats) ID() string {
return fmt.Sprintf(uniqName, s.name)
}
80 changes: 80 additions & 0 deletions stats/collectors/sqldb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package collectors_test

import (
"testing"

"github.com/DATA-DOG/go-sqlmock"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestSQLDatabase(t *testing.T) {
db, _, err := sqlmock.New()
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
defer db.Close()

db.SetMaxOpenConns(5)

m, err := memstats.New()
require.NoError(t, err)

testName := "test_sqlite"
s := collectors.NewDatabaseSQLStats(testName, db)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: "sql_db_idle_connections",
Tags: stats.Tags{"name": testName},
Value: 1,
},
{
Name: "sql_db_in_use_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_time_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_lifetime_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_open_connections",
Tags: stats.Tags{"name": testName},
Value: 5,
},
{
Name: "sql_db_open_connections",
Tags: stats.Tags{"name": testName},
Value: 1,
},
{
Name: "sql_db_wait_count_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_wait_duration_seconds_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
}, m.GetAll())
}
39 changes: 39 additions & 0 deletions stats/collectors/static.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package collectors

import (
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
statsUniqName = "static_%s_%s"
)

type staticStats struct {
tags stats.Tags
key string
value uint64
}

// NewStaticMetric allows to capture a gauge metric that does not change during the lifetime of the application.
// Can be useful for capturing configuration values or application version.
func NewStaticMetric(key string, tags stats.Tags, value uint64) *staticStats {
return &staticStats{
tags: tags,
key: key,
value: value,
}
}

func (s *staticStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, s.value)
}

func (s *staticStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, 0)
}

func (s *staticStats) ID() string {
return fmt.Sprintf(statsUniqName, s.key, s.tags.String())
}
32 changes: 32 additions & 0 deletions stats/collectors/static_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package collectors_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestStatic(t *testing.T) {
testName := "test_sqlite"
s := collectors.NewStaticMetric(testName, stats.Tags{
"foo": "bar",
}, 2)

m, err := memstats.New()
require.NoError(t, err)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: testName,
Tags: stats.Tags{"foo": "bar"},
Value: 2,
},
}, m.GetAll())
}
7 changes: 7 additions & 0 deletions stats/memstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ func (*Store) Start(_ context.Context, _ stats.GoRoutineFactory) error { return
// Stop implements stats.Stats
func (*Store) Stop() {}

func (s *Store) RegisterCollector(c stats.Collector) error {
c.Collect(func(key string, tags stats.Tags, val uint64) {
s.NewTaggedStat(key, stats.GaugeType, tags).Gauge(val)
})
return nil
}

// getKey maps name and tags, to a store lookup key.
func (*Store) getKey(name string, tags stats.Tags) string {
return name + tags.String()
Expand Down
14 changes: 14 additions & 0 deletions stats/mock_stats/mock_stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions stats/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ func (*nop) NewTracer(_ string) Tracer {

func (*nop) Start(_ context.Context, _ GoRoutineFactory) error { return nil }
func (*nop) Stop() {}

func (*nop) RegisterCollector(c Collector) error { return nil }
Loading

0 comments on commit 8ac166b

Please sign in to comment.