diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c3e56c91..c0b0a4ca3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ ## Changelog +* [CHANGE] Removed global metrics for KV package. Making a KV object will now require a prometheus registerer that will + be used to register all relevant KV class metrics. #22 * [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog \ No newline at end of file diff --git a/kv/client.go b/kv/client.go index e26efc5cd..3a8f410b8 100644 --- a/kv/client.go +++ b/kv/client.go @@ -129,7 +129,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co switch backend { case "consul": - client, err = consul.NewClient(cfg.Consul, codec, logger) + client, err = consul.NewClient(cfg.Consul, codec, logger, reg) case "etcd": client, err = etcd.New(cfg.Etcd, codec, logger) @@ -138,7 +138,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co // If we use the in-memory store, make sure everyone gets the same instance // within the same process. inmemoryStoreInit.Do(func() { - inmemoryStore, _ = consul.NewInMemoryClient(codec, logger) + inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) client = inmemoryStore @@ -205,5 +205,5 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registe {client: secondary, name: cfg.Multi.Secondary}, } - return NewMultiClient(cfg.Multi, clients, logger), nil + return NewMultiClient(cfg.Multi, clients, logger, reg), nil } diff --git a/kv/client_test.go b/kv/client_test.go index a244cd7ce..22d394b70 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -39,7 +39,7 @@ multi: func Test_createClient_multiBackend_withSingleRing(t *testing.T) { storeCfg, testCodec := newConfigsForTest() require.NotPanics(t, func() { - _, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewRegistry(), testLogger{}) + _, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewPedanticRegistry(), testLogger{}) require.NoError(t, err) }) } @@ -47,7 +47,7 @@ func Test_createClient_multiBackend_withSingleRing(t *testing.T) { func Test_createClient_multiBackend_withMultiRing(t *testing.T) { storeCfg1, testCodec := newConfigsForTest() storeCfg2 := StoreConfig{} - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() require.NotPanics(t, func() { _, err := createClient("multi", "/test", storeCfg1, testCodec, Primary, reg, testLogger{}) @@ -61,7 +61,7 @@ func Test_createClient_multiBackend_withMultiRing(t *testing.T) { func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) { storeCfg, testCodec := newConfigsForTest() - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() client, err := createClient("mock", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) require.NoError(t, err) require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) return })) - actual := typeToRoleMap(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") require.Len(t, actual, 1) require.Equal(t, "primary", actual["mock"]) } @@ -79,7 +79,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { storeCfg, testCodec := newConfigsForTest() storeCfg.Multi.MirrorEnabled = true storeCfg.Multi.MirrorTimeout = 10 * time.Second - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() client, err := createClient("multi", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) require.NoError(t, err) require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { return })) - actual := typeToRoleMap(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") // expected multi-primary, inmemory-primary and mock-secondary require.Len(t, actual, 3) require.Equal(t, "primary", actual["multi"]) @@ -97,11 +97,14 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { } -func typeToRoleMap(t *testing.T, reg prometheus.Gatherer) map[string]string { +func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string { mfs, err := reg.Gather() require.NoError(t, err) result := map[string]string{} for _, mf := range mfs { + if mf.GetName() != histogramWithRoleLabels { + continue + } for _, m := range mf.GetMetric() { backendType := "" role := "" diff --git a/kv/consul/client.go b/kv/consul/client.go index 8117e37af..9ce6d91c5 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" + "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" @@ -61,9 +62,10 @@ type kv interface { // Client is a KV.Client for Consul. type Client struct { kv - codec codec.Codec - cfg Config - logger log.Logger + codec codec.Codec + cfg Config + logger log.Logger + consulMetrics *consulMetrics } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -78,7 +80,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { } // NewClient returns a new Client. -func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error) { +func NewClient(cfg Config, codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) { client, err := consul.NewClient(&consul.Config{ Address: cfg.Host, Token: cfg.ACLToken, @@ -92,11 +94,14 @@ func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error if err != nil { return nil, err } + consulMetrics := newConsulMetrics(registerer) + c := &Client{ - kv: consulMetrics{client.KV()}, - codec: codec, - cfg: cfg, - logger: logger, + kv: consulInstrumentation{client.KV(), consulMetrics}, + codec: codec, + cfg: cfg, + logger: logger, + consulMetrics: consulMetrics, } return c, nil } @@ -108,7 +113,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := c.kv.Put(&consul.KVPair{ Key: key, Value: bytes, @@ -120,7 +125,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "CAS loop", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return c.cas(ctx, key, f) }) } diff --git a/kv/consul/client_test.go b/kv/consul/client_test.go index 52e429dfc..74c188996 100644 --- a/kv/consul/client_test.go +++ b/kv/consul/client_test.go @@ -8,6 +8,7 @@ import ( "time" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,7 +34,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 5.0, WatchKeyBurstSize: 1, - }, testLogger{}) + }, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -67,7 +68,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { func TestWatchKeyNoRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 0, - }, testLogger{}) + }, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -89,7 +90,7 @@ func TestWatchKeyNoRateLimit(t *testing.T) { } func TestReset(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -148,7 +149,7 @@ func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout t } func TestWatchKeyWithNoStartValue(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/kv/consul/metrics.go b/kv/consul/metrics.go index 0b2940567..52a1d4e84 100644 --- a/kv/consul/metrics.go +++ b/kv/consul/metrics.go @@ -5,27 +5,33 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" ) -var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "consul_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, -}, []string{"operation", "status_code"})) - -func init() { - consulRequestDuration.Register() +type consulInstrumentation struct { + kv kv + consulMetrics *consulMetrics } type consulMetrics struct { - kv + consulRequestDuration *instrument.HistogramCollector +} + +func newConsulMetrics(registerer prometheus.Registerer) *consulMetrics { + consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Name: "consul_request_duration_seconds", + Help: "Time spent on consul requests.", + Buckets: prometheus.DefBuckets, + }, []string{"operation", "status_code"})) + consulMetrics := consulMetrics{consulRequestDurationCollector} + return &consulMetrics } -func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { +func (c consulInstrumentation) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { var ok bool var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "CAS", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error ok, result, err = c.kv.CAS(p, options) @@ -34,10 +40,10 @@ func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool return ok, result, err } -func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { +func (c consulInstrumentation) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { var kvp *consul.KVPair var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Get", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvp, meta, err = c.kv.Get(key, options) @@ -46,10 +52,10 @@ func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KV return kvp, meta, err } -func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { +func (c consulInstrumentation) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { var kvps consul.KVPairs var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "List", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvps, meta, err = c.kv.List(path, options) @@ -58,9 +64,9 @@ func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.K return kvps, meta, err } -func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { +func (c consulInstrumentation) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { var meta *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Delete", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error meta, err = c.kv.Delete(key, options) @@ -69,9 +75,9 @@ func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul return meta, err } -func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { +func (c consulInstrumentation) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error result, err = c.kv.Put(p, options) diff --git a/kv/consul/mock.go b/kv/consul/mock.go index d22381bfe..a5ed43df9 100644 --- a/kv/consul/mock.go +++ b/kv/consul/mock.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/dskit/closer" "github.com/grafana/dskit/kv/codec" @@ -28,12 +29,12 @@ type mockKV struct { } // NewInMemoryClient makes a new mock consul client. -func NewInMemoryClient(codec codec.Codec, logger log.Logger) (*Client, io.Closer) { - return NewInMemoryClientWithConfig(codec, Config{}, logger) +func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { + return NewInMemoryClientWithConfig(codec, Config{}, logger, registerer) } // NewInMemoryClientWithConfig makes a new mock consul client with supplied Config. -func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger) (*Client, io.Closer) { +func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { m := mockKV{ kvps: map[string]*consul.KVPair{}, // Always start from 1, we NEVER want to report back index 0 in the responses. @@ -58,10 +59,11 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge go m.loop() return &Client{ - kv: &m, - codec: codec, - cfg: cfg, - logger: logger, + kv: &m, + codec: codec, + cfg: cfg, + logger: logger, + consulMetrics: newConsulMetrics(registerer), }, closer } diff --git a/kv/kv_test.go b/kv/kv_test.go index fe57d3882..f2a23ac61 100644 --- a/kv/kv_test.go +++ b/kv/kv_test.go @@ -25,7 +25,7 @@ func withFixtures(t *testing.T, f func(*testing.T, Client)) { factory func() (Client, io.Closer, error) }{ {"consul", func() (Client, io.Closer, error) { - client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}) + client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, nil) return client, closer, nil }}, {"etcd", func() (Client, io.Closer, error) { diff --git a/kv/kvtls/test/tls_integration_test.go b/kv/kvtls/test/tls_integration_test.go index e68de95c4..23ca7069e 100644 --- a/kv/kvtls/test/tls_integration_test.go +++ b/kv/kvtls/test/tls_integration_test.go @@ -78,7 +78,7 @@ func newIntegrationClientServer( ) { // server registers some metrics to default registry savedRegistry := prometheus.DefaultRegisterer - prometheus.DefaultRegisterer = prometheus.NewRegistry() + prometheus.DefaultRegisterer = prometheus.NewPedanticRegistry() defer func() { prometheus.DefaultRegisterer = savedRegistry }() diff --git a/kv/memberlist/kv_init_service.go b/kv/memberlist/kv_init_service.go index f1e053b13..c137bc786 100644 --- a/kv/memberlist/kv_init_service.go +++ b/kv/memberlist/kv_init_service.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "github.com/grafana/dskit/services" @@ -28,6 +29,7 @@ type KVInitService struct { cfg *KVConfig logger log.Logger dnsProvider DNSProvider + registerer prometheus.Registerer // init function, to avoid multiple initializations. init sync.Once @@ -38,11 +40,12 @@ type KVInitService struct { watcher *services.FailureWatcher } -func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider) *KVInitService { +func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KVInitService { kvinit := &KVInitService{ cfg: cfg, watcher: services.NewFailureWatcher(), logger: logger, + registerer: registerer, dnsProvider: dnsProvider, } kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping).WithName("memberlist KV service") @@ -52,7 +55,7 @@ func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider) // GetMemberlistKV will initialize Memberlist.KV on first call, and add it to service failure watcher. func (kvs *KVInitService) GetMemberlistKV() (*KV, error) { kvs.init.Do(func() { - kv := NewKV(*kvs.cfg, kvs.logger, kvs.dnsProvider) + kv := NewKV(*kvs.cfg, kvs.logger, kvs.dnsProvider, kvs.registerer) kvs.watcher.WatchService(kv) kvs.err = kv.StartAsync(context.Background()) diff --git a/kv/memberlist/kv_init_service_test.go b/kv/memberlist/kv_init_service_test.go index 2e58b6604..35c668433 100644 --- a/kv/memberlist/kv_init_service_test.go +++ b/kv/memberlist/kv_init_service_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/dskit/flagext" @@ -56,6 +57,6 @@ func TestPage(t *testing.T) { func TestStop(t *testing.T) { var cfg KVConfig flagext.DefaultValues(&cfg) - kvinit := NewKVInitService(&cfg, nil, &dnsProviderMock{}) + kvinit := NewKVInitService(&cfg, nil, &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, kvinit.stopping(nil)) } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 4faad94f3..d5c1c7e74 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -212,8 +212,9 @@ func generateRandomSuffix(logger log.Logger) string { type KV struct { services.Service - cfg KVConfig - logger log.Logger + cfg KVConfig + logger log.Logger + registerer prometheus.Registerer // dns discovery provider provider DNSProvider @@ -326,14 +327,16 @@ var ( // gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also // trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned // and service enters Failed state. -func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider) *KV { +func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV { cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - provider: dnsProvider, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]valueDesc), codecs: make(map[string]codec.Codec), watchers: make(map[string][]chan string), diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 242074eb5..2b0386b65 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -9,13 +9,14 @@ import ( "math" "math/rand" "net" - reflect "reflect" + "reflect" "sort" "sync" "testing" "time" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -246,7 +247,7 @@ func TestBasicGetAndCas(t *testing.T) { } cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -302,7 +303,7 @@ func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) { cfg.TCPTransport = TCPTransportConfig{} cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -446,7 +447,7 @@ func TestMultipleCAS(t *testing.T) { flagext.DefaultValues(&cfg) cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) mkv.maxCasRetries = 20 require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -548,7 +549,7 @@ func TestMultipleClients(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) kv, err := NewClient(mkv, c) @@ -701,7 +702,7 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) { time.Sleep(1 * time.Second) } - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) // Not started yet. + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) // Not started yet. watcher.WatchService(mkv) kv, err := NewClient(mkv, c) @@ -775,7 +776,7 @@ func TestMemberlistFailsToJoin(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -946,7 +947,7 @@ func TestMultipleCodecs(t *testing.T) { distributedCounterCodec{}, } - mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -989,7 +990,7 @@ func TestMultipleCodecs(t *testing.T) { require.NoError(t, err) // We will read values from second KV, which will join the first one - mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1041,11 +1042,11 @@ func TestRejoin(t *testing.T) { cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])} cfg2.RejoinInterval = 1 * time.Second - mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}) + mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck - mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}) + mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1062,7 +1063,7 @@ func TestRejoin(t *testing.T) { poll(t, 5*time.Second, 1, membersFunc) // Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining. - mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}) + mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -1094,7 +1095,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck @@ -1157,7 +1158,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { cfg.LeftIngestersTimeout = 5 * time.Minute cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index 010288fba..26fd905fb 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -7,6 +7,7 @@ import ( armonprometheus "github.com/armon/go-metrics/prometheus" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/dskit/services" ) @@ -14,56 +15,56 @@ import ( func (m *KV) createAndRegisterMetrics() { const subsystem = "memberlist_client" - m.numberOfReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_total", Help: "Number of received broadcast user messages", }) - m.totalSizeOfReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_bytes_total", Help: "Total size of received broadcast user messages", }) - m.numberOfInvalidReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfInvalidReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_invalid_total", Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) - m.numberOfPushes = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_total", Help: "How many times did this node push its full state to another node", }) - m.totalSizeOfPushes = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_bytes_total", Help: "Total size of pushed state", }) - m.numberOfPulls = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfPulls = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_total", Help: "How many times did this node pull full state from another node", }) - m.totalSizeOfPulls = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPulls = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_bytes_total", Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue", @@ -76,35 +77,35 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.totalSizeOfBroadcastMessagesInQueue = prometheus.NewGauge(prometheus.GaugeOpts{ + m.totalSizeOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGauge(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue_bytes", Help: "Total size of messages waiting in the broadcast queue", }) - m.numberOfBroadcastMessagesDropped = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfBroadcastMessagesDropped = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_to_broadcast_dropped_total", Help: "Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big", }) - m.casAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + m.casAttempts = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_attempt_total", Help: "Attempted CAS operations", }) - m.casSuccesses = prometheus.NewCounter(prometheus.CounterOpts{ + m.casSuccesses = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_success_total", Help: "Successful CAS operations", }) - m.casFailures = prometheus.NewCounter(prometheus.CounterOpts{ + m.casFailures = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_failure_total", @@ -116,21 +117,21 @@ func (m *KV) createAndRegisterMetrics() { "Number of values in KV Store", nil, nil) - m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + m.storeTombstones = promauto.With(m.registerer).NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones", Help: "Number of tombstones currently present in KV store values", }, []string{"key"}) - m.storeRemovedTombstones = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.storeRemovedTombstones = promauto.With(m.registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones_removed_total", Help: "Total number of tombstones which have been removed from KV store values", }, []string{"key"}) - m.memberlistMembersCount = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistMembersCount = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_members_count", @@ -143,7 +144,7 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.memberlistHealthScore = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistHealthScore = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_node_health_score", @@ -156,7 +157,7 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.watchPrefixDroppedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.watchPrefixDroppedNotifications = promauto.With(m.registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "watch_prefix_dropped_notifications", @@ -167,33 +168,6 @@ func (m *KV) createAndRegisterMetrics() { return } - all := []prometheus.Collector{ - m.numberOfReceivedMessages, - m.totalSizeOfReceivedMessages, - m.numberOfInvalidReceivedMessages, - m.numberOfBroadcastMessagesInQueue, - m.numberOfPushes, - m.numberOfPulls, - m.totalSizeOfPushes, - m.totalSizeOfPulls, - m.totalSizeOfBroadcastMessagesInQueue, - m.numberOfBroadcastMessagesDropped, - m.casAttempts, - m.casFailures, - m.casSuccesses, - m.watchPrefixDroppedNotifications, - m.storeTombstones, - m.storeRemovedTombstones, - m.memberlistMembersCount, - m.memberlistHealthScore, - } - - for _, c := range all { - m.cfg.MetricsRegisterer.MustRegister(c) - } - - m.cfg.MetricsRegisterer.MustRegister(m) - // memberlist uses armonmetrics package for internal usage // here we configure armonmetrics to use prometheus sink, err := armonprometheus.NewPrometheusSink() // there is no option to pass registrerer, this uses default diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 1d50032cd..4c6badee0 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" "github.com/grafana/dskit/flagext" @@ -128,7 +129,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor } } - t.registerMetrics() + t.registerMetrics(config.MetricsRegisterer) // Clean up listeners if there's an error. defer func() { @@ -545,98 +546,76 @@ func (t *TCPTransport) Shutdown() error { return nil } -func (t *TCPTransport) registerMetrics() { +func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { const subsystem = "memberlist_tcp_transport" - t.incomingStreams = prometheus.NewCounter(prometheus.CounterOpts{ + t.incomingStreams = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "incoming_streams_total", Help: "Number of incoming memberlist streams", }) - t.outgoingStreams = prometheus.NewCounter(prometheus.CounterOpts{ + t.outgoingStreams = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "outgoing_streams_total", Help: "Number of outgoing streams", }) - t.outgoingStreamErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.outgoingStreamErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "outgoing_stream_errors_total", Help: "Number of errors when opening memberlist stream to another node", }) - t.receivedPackets = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_total", Help: "Number of received memberlist packets", }) - t.receivedPacketsBytes = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPacketsBytes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_bytes_total", Help: "Total bytes received as packets", }) - t.receivedPacketsErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPacketsErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_errors_total", Help: "Number of errors when receiving memberlist packets", }) - t.sentPackets = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_total", Help: "Number of memberlist packets sent", }) - t.sentPacketsBytes = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPacketsBytes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_bytes_total", Help: "Total bytes sent as packets", }) - t.sentPacketsErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPacketsErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_errors_total", Help: "Number of errors when sending memberlist packets", }) - t.unknownConnections = prometheus.NewCounter(prometheus.CounterOpts{ + t.unknownConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "unknown_connections_total", Help: "Number of unknown TCP connections (not a packet or stream)", }) - - if t.cfg.MetricsRegisterer == nil { - return - } - - all := []prometheus.Metric{ - t.incomingStreams, - t.outgoingStreams, - t.outgoingStreamErrors, - t.receivedPackets, - t.receivedPacketsBytes, - t.receivedPacketsErrors, - t.sentPackets, - t.sentPacketsBytes, - t.sentPacketsErrors, - t.unknownConnections, - } - - // if registration fails, that's too bad, but don't panic - for _, c := range all { - t.cfg.MetricsRegisterer.MustRegister(c.(prometheus.Collector)) - } } diff --git a/kv/multi.go b/kv/multi.go index e750e9aa4..233b3a1aa 100644 --- a/kv/multi.go +++ b/kv/multi.go @@ -10,35 +10,10 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" ) -var ( - primaryStoreGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "multikv_primary_store", - Help: "Selected primary KV store", - }, []string{"store"}) - - mirrorEnabledGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "multikv_mirror_enabled", - Help: "Is mirroring to secondary store enabled", - }) - - mirrorWritesCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "multikv_mirror_writes_total", - Help: "Number of mirror-writes to secondary store", - }) - - mirrorFailuresCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "multikv_mirror_write_errors_total", - Help: "Number of failures to mirror-write to secondary store", - }) -) - -func init() { - prometheus.MustRegister(primaryStoreGauge, mirrorEnabledGauge, mirrorWritesCounter, mirrorFailuresCounter) -} - // MultiConfig is a configuration for MultiClient. type MultiConfig struct { Primary string `yaml:"primary"` @@ -102,11 +77,16 @@ type MultiClient struct { // so we use this map instead. inProgress map[int]clientInProgress inProgressCnt int + + primaryStoreGauge *prometheus.GaugeVec + mirrorEnabledGauge prometheus.Gauge + mirrorWritesCounter prometheus.Counter + mirrorFailuresCounter prometheus.Counter } // NewMultiClient creates new MultiClient with given KV Clients. // First client in the slice is the primary client. -func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger) *MultiClient { +func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger, registerer prometheus.Registerer) *MultiClient { c := &MultiClient{ clients: clients, primaryID: atomic.NewInt32(0), @@ -125,6 +105,7 @@ func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger) *Mul go c.watchConfigChannel(ctx, cfg.ConfigProvider()) } + c.registerMetrics(registerer) c.updatePrimaryStoreGauge() c.updateMirrorEnabledGauge() return c @@ -201,6 +182,28 @@ func (m *MultiClient) setNewPrimaryClient(store string) (bool, error) { return true, nil } +func (m *MultiClient) registerMetrics(registerer prometheus.Registerer) { + m.primaryStoreGauge = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "multikv_primary_store", + Help: "Selected primary KV store", + }, []string{"store"}) + + m.mirrorEnabledGauge = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "multikv_mirror_enabled", + Help: "Is mirroring to secondary store enabled", + }) + + m.mirrorWritesCounter = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "multikv_mirror_writes_total", + Help: "Number of mirror-writes to secondary store", + }) + + m.mirrorFailuresCounter = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "multikv_mirror_write_errors_total", + Help: "Number of failures to mirror-write to secondary store", + }) +} + func (m *MultiClient) updatePrimaryStoreGauge() { _, pkv := m.getPrimaryClient() @@ -210,15 +213,15 @@ func (m *MultiClient) updatePrimaryStoreGauge() { value = 1 } - primaryStoreGauge.WithLabelValues(kv.name).Set(value) + m.primaryStoreGauge.WithLabelValues(kv.name).Set(value) } } func (m *MultiClient) updateMirrorEnabledGauge() { if m.mirroringEnabled.Load() { - mirrorEnabledGauge.Set(1) + m.mirrorEnabledGauge.Set(1) } else { - mirrorEnabledGauge.Set(0) + m.mirrorEnabledGauge.Set(0) } } @@ -345,14 +348,14 @@ func (m *MultiClient) writeToSecondary(ctx context.Context, primary kvclient, ke continue } - mirrorWritesCounter.Inc() + m.mirrorWritesCounter.Inc() err := kvc.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { // try once return newValue, false, nil }) if err != nil { - mirrorFailuresCounter.Inc() + m.mirrorFailuresCounter.Inc() level.Warn(m.logger).Log("msg", "failed to update value in secondary store", "key", key, "err", err, "primary", primary.name, "secondary", kvc.name) } else { level.Debug(m.logger).Log("msg", "stored updated value to secondary store", "key", key, "primary", primary.name, "secondary", kvc.name)