From 0c25f04a0a026c5c4b8714437ab013676040194e Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 25 Aug 2021 08:51:52 -0500 Subject: [PATCH 1/7] WIP metrics refactor Signed-off-by: Tyler Reid --- kv/client.go | 2 +- kv/consul/metrics.go | 1 + kv/memberlist/kv_init_service.go | 8 ++- kv/memberlist/kv_init_service_test.go | 3 +- kv/memberlist/memberlist_client.go | 12 +++-- kv/memberlist/memberlist_client_test.go | 29 +++++------ kv/memberlist/metrics.go | 39 +++++++------- kv/memberlist/tcp_transport.go | 27 +++++----- kv/multi.go | 67 +++++++++++++------------ 9 files changed, 101 insertions(+), 87 deletions(-) diff --git a/kv/client.go b/kv/client.go index 2e9aa7ab4..b146a046a 100644 --- a/kv/client.go +++ b/kv/client.go @@ -205,5 +205,5 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registe {client: secondary, name: cfg.Multi.Secondary}, } - return NewMultiClient(cfg.Multi, clients, logger), nil + return NewMultiClient(cfg.Multi, clients, logger, reg), nil } diff --git a/kv/consul/metrics.go b/kv/consul/metrics.go index 0b2940567..364103adc 100644 --- a/kv/consul/metrics.go +++ b/kv/consul/metrics.go @@ -8,6 +8,7 @@ import ( "github.com/weaveworks/common/instrument" ) +// Needs a with call var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "consul_request_duration_seconds", Help: "Time spent on consul requests.", diff --git a/kv/memberlist/kv_init_service.go b/kv/memberlist/kv_init_service.go index 01002fd53..ac8b9a6db 100644 --- a/kv/memberlist/kv_init_service.go +++ b/kv/memberlist/kv_init_service.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "github.com/grafana/dskit/services" @@ -27,6 +28,7 @@ type KVInitService struct { // config used for initialization cfg *KVConfig logger log.Logger + registerer prometheus.Registerer // init function, to avoid multiple initializations. init sync.Once @@ -37,11 +39,13 @@ type KVInitService struct { watcher *services.FailureWatcher } -func NewKVInitService(cfg *KVConfig, logger log.Logger) *KVInitService { +func NewKVInitService(cfg *KVConfig, logger log.Logger, registerer prometheus.Registerer) *KVInitService { kvinit := &KVInitService{ cfg: cfg, watcher: services.NewFailureWatcher(), logger: logger, + registerer: registerer, + } kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping).WithName("memberlist KV service") return kvinit @@ -50,7 +54,7 @@ func NewKVInitService(cfg *KVConfig, logger log.Logger) *KVInitService { // This method will initialize Memberlist.KV on first call, and add it to service failure watcher. func (kvs *KVInitService) GetMemberlistKV() (*KV, error) { kvs.init.Do(func() { - kv := NewKV(*kvs.cfg, kvs.logger) + kv := NewKV(*kvs.cfg, kvs.logger, kvs.registerer) kvs.watcher.WatchService(kv) kvs.err = kv.StartAsync(context.Background()) diff --git a/kv/memberlist/kv_init_service_test.go b/kv/memberlist/kv_init_service_test.go index 3f6b39c36..a54b74f78 100644 --- a/kv/memberlist/kv_init_service_test.go +++ b/kv/memberlist/kv_init_service_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/dskit/flagext" @@ -56,6 +57,6 @@ func TestPage(t *testing.T) { func TestStop(t *testing.T) { var cfg KVConfig flagext.DefaultValues(&cfg) - kvinit := NewKVInitService(&cfg, nil) + kvinit := NewKVInitService(&cfg, nil, prometheus.NewRegistry()) require.NoError(t, kvinit.stopping(nil)) } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index d4f6d0fcb..d8ac52cea 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -214,8 +214,9 @@ func generateRandomSuffix(logger log.Logger) string { type KV struct { services.Service - cfg KVConfig - logger log.Logger + cfg KVConfig + logger log.Logger + registerer prometheus.Registerer // dns discovery provider provider *dns.Provider @@ -328,7 +329,7 @@ var ( // gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also // trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned // and service enters Failed state. -func NewKV(cfg KVConfig, logger log.Logger) *KV { +func NewKV(cfg KVConfig, logger log.Logger, registerer prometheus.Registerer) *KV { cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace @@ -340,6 +341,7 @@ func NewKV(cfg KVConfig, logger log.Logger) *KV { mlkv := &KV{ cfg: cfg, logger: logger, + registerer: registerer, provider: dns.NewProvider(logger, mr, dns.GolangResolverType), store: make(map[string]valueDesc), codecs: make(map[string]codec.Codec), @@ -349,7 +351,7 @@ func NewKV(cfg KVConfig, logger log.Logger) *KV { maxCasRetries: maxCasRetries, } - mlkv.createAndRegisterMetrics() + mlkv.createAndRegisterMetrics(mlkv.registerer) for _, c := range cfg.Codecs { mlkv.codecs[c.CodecID()] = c @@ -364,7 +366,7 @@ func defaultMemberlistConfig() *memberlist.Config { } func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { - tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger) + tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer) if err != nil { return nil, fmt.Errorf("failed to create transport: %v", err) } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 11876c4ab..08b95a664 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -9,13 +9,14 @@ import ( "math" "math/rand" "net" - reflect "reflect" + "reflect" "sort" "sync" "testing" "time" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -246,7 +247,7 @@ func TestBasicGetAndCas(t *testing.T) { } cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger()) + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -302,7 +303,7 @@ func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) { cfg.TCPTransport = TCPTransportConfig{} cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger()) + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -446,7 +447,7 @@ func TestMultipleCAS(t *testing.T) { flagext.DefaultValues(&cfg) cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger()) + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) mkv.maxCasRetries = 20 require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -548,7 +549,7 @@ func TestMultipleClients(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger()) + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) kv, err := NewClient(mkv, c) @@ -701,7 +702,7 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) { time.Sleep(1 * time.Second) } - mkv := NewKV(cfg, log.NewNopLogger()) // Not started yet. + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) // Not started yet. watcher.WatchService(mkv) kv, err := NewClient(mkv, c) @@ -775,7 +776,7 @@ func TestMemberlistFailsToJoin(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger()) + mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -946,7 +947,7 @@ func TestMultipleCodecs(t *testing.T) { distributedCounterCodec{}, } - mkv1 := NewKV(cfg, log.NewNopLogger()) + mkv1 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -989,7 +990,7 @@ func TestMultipleCodecs(t *testing.T) { require.NoError(t, err) // We will read values from second KV, which will join the first one - mkv2 := NewKV(cfg, log.NewNopLogger()) + mkv2 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1041,11 +1042,11 @@ func TestRejoin(t *testing.T) { cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])} cfg2.RejoinInterval = 1 * time.Second - mkv1 := NewKV(cfg1, log.NewNopLogger()) + mkv1 := NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck - mkv2 := NewKV(cfg2, log.NewNopLogger()) + mkv2 := NewKV(cfg2, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1062,7 +1063,7 @@ func TestRejoin(t *testing.T) { poll(t, 5*time.Second, 1, membersFunc) // Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining. - mkv1 = NewKV(cfg1, log.NewNopLogger()) + mkv1 = NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -1094,7 +1095,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger()) + kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck @@ -1157,7 +1158,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { cfg.LeftIngestersTimeout = 5 * time.Minute cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger()) + kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index 010288fba..1ca530829 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -7,63 +7,64 @@ import ( armonprometheus "github.com/armon/go-metrics/prometheus" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/dskit/services" ) -func (m *KV) createAndRegisterMetrics() { +func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { const subsystem = "memberlist_client" - m.numberOfReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_total", Help: "Number of received broadcast user messages", }) - m.totalSizeOfReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_bytes_total", Help: "Total size of received broadcast user messages", }) - m.numberOfInvalidReceivedMessages = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfInvalidReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_invalid_total", Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) - m.numberOfPushes = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfPushes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_total", Help: "How many times did this node push its full state to another node", }) - m.totalSizeOfPushes = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPushes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_bytes_total", Help: "Total size of pushed state", }) - m.numberOfPulls = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfPulls = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_total", Help: "How many times did this node pull full state from another node", }) - m.totalSizeOfPulls = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPulls = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_bytes_total", Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.numberOfBroadcastMessagesInQueue = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue", @@ -76,35 +77,35 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.totalSizeOfBroadcastMessagesInQueue = prometheus.NewGauge(prometheus.GaugeOpts{ + m.totalSizeOfBroadcastMessagesInQueue = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue_bytes", Help: "Total size of messages waiting in the broadcast queue", }) - m.numberOfBroadcastMessagesDropped = prometheus.NewCounter(prometheus.CounterOpts{ + m.numberOfBroadcastMessagesDropped = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_to_broadcast_dropped_total", Help: "Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big", }) - m.casAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + m.casAttempts = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_attempt_total", Help: "Attempted CAS operations", }) - m.casSuccesses = prometheus.NewCounter(prometheus.CounterOpts{ + m.casSuccesses = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_success_total", Help: "Successful CAS operations", }) - m.casFailures = prometheus.NewCounter(prometheus.CounterOpts{ + m.casFailures = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_failure_total", @@ -116,21 +117,21 @@ func (m *KV) createAndRegisterMetrics() { "Number of values in KV Store", nil, nil) - m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + m.storeTombstones = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones", Help: "Number of tombstones currently present in KV store values", }, []string{"key"}) - m.storeRemovedTombstones = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.storeRemovedTombstones = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones_removed_total", Help: "Total number of tombstones which have been removed from KV store values", }, []string{"key"}) - m.memberlistMembersCount = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistMembersCount = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_members_count", @@ -143,7 +144,7 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.memberlistHealthScore = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistHealthScore = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_node_health_score", @@ -156,7 +157,7 @@ func (m *KV) createAndRegisterMetrics() { return 0 }) - m.watchPrefixDroppedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.watchPrefixDroppedNotifications = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "watch_prefix_dropped_notifications", diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 1d50032cd..7c4962b1e 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" "github.com/grafana/dskit/flagext" @@ -106,7 +107,7 @@ type TCPTransport struct { // NewTCPTransport returns a new tcp-based transport with the given configuration. On // success all the network listeners will be created and listening. -func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTransport, error) { +func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer prometheus.Registerer) (*TCPTransport, error) { if len(config.BindAddrs) == 0 { config.BindAddrs = []string{zeroZeroZeroZero} } @@ -128,7 +129,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor } } - t.registerMetrics() + t.registerMetrics(registerer) // Clean up listeners if there's an error. defer func() { @@ -545,73 +546,73 @@ func (t *TCPTransport) Shutdown() error { return nil } -func (t *TCPTransport) registerMetrics() { +func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { const subsystem = "memberlist_tcp_transport" - t.incomingStreams = prometheus.NewCounter(prometheus.CounterOpts{ + t.incomingStreams = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "incoming_streams_total", Help: "Number of incoming memberlist streams", }) - t.outgoingStreams = prometheus.NewCounter(prometheus.CounterOpts{ + t.outgoingStreams = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "outgoing_streams_total", Help: "Number of outgoing streams", }) - t.outgoingStreamErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.outgoingStreamErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "outgoing_stream_errors_total", Help: "Number of errors when opening memberlist stream to another node", }) - t.receivedPackets = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_total", Help: "Number of received memberlist packets", }) - t.receivedPacketsBytes = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPacketsBytes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_bytes_total", Help: "Total bytes received as packets", }) - t.receivedPacketsErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.receivedPacketsErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_received_errors_total", Help: "Number of errors when receiving memberlist packets", }) - t.sentPackets = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_total", Help: "Number of memberlist packets sent", }) - t.sentPacketsBytes = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPacketsBytes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_bytes_total", Help: "Total bytes sent as packets", }) - t.sentPacketsErrors = prometheus.NewCounter(prometheus.CounterOpts{ + t.sentPacketsErrors = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "packets_sent_errors_total", Help: "Number of errors when sending memberlist packets", }) - t.unknownConnections = prometheus.NewCounter(prometheus.CounterOpts{ + t.unknownConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, Name: "unknown_connections_total", diff --git a/kv/multi.go b/kv/multi.go index e750e9aa4..233b3a1aa 100644 --- a/kv/multi.go +++ b/kv/multi.go @@ -10,35 +10,10 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" ) -var ( - primaryStoreGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "multikv_primary_store", - Help: "Selected primary KV store", - }, []string{"store"}) - - mirrorEnabledGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "multikv_mirror_enabled", - Help: "Is mirroring to secondary store enabled", - }) - - mirrorWritesCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "multikv_mirror_writes_total", - Help: "Number of mirror-writes to secondary store", - }) - - mirrorFailuresCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "multikv_mirror_write_errors_total", - Help: "Number of failures to mirror-write to secondary store", - }) -) - -func init() { - prometheus.MustRegister(primaryStoreGauge, mirrorEnabledGauge, mirrorWritesCounter, mirrorFailuresCounter) -} - // MultiConfig is a configuration for MultiClient. type MultiConfig struct { Primary string `yaml:"primary"` @@ -102,11 +77,16 @@ type MultiClient struct { // so we use this map instead. inProgress map[int]clientInProgress inProgressCnt int + + primaryStoreGauge *prometheus.GaugeVec + mirrorEnabledGauge prometheus.Gauge + mirrorWritesCounter prometheus.Counter + mirrorFailuresCounter prometheus.Counter } // NewMultiClient creates new MultiClient with given KV Clients. // First client in the slice is the primary client. -func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger) *MultiClient { +func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger, registerer prometheus.Registerer) *MultiClient { c := &MultiClient{ clients: clients, primaryID: atomic.NewInt32(0), @@ -125,6 +105,7 @@ func NewMultiClient(cfg MultiConfig, clients []kvclient, logger log.Logger) *Mul go c.watchConfigChannel(ctx, cfg.ConfigProvider()) } + c.registerMetrics(registerer) c.updatePrimaryStoreGauge() c.updateMirrorEnabledGauge() return c @@ -201,6 +182,28 @@ func (m *MultiClient) setNewPrimaryClient(store string) (bool, error) { return true, nil } +func (m *MultiClient) registerMetrics(registerer prometheus.Registerer) { + m.primaryStoreGauge = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "multikv_primary_store", + Help: "Selected primary KV store", + }, []string{"store"}) + + m.mirrorEnabledGauge = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "multikv_mirror_enabled", + Help: "Is mirroring to secondary store enabled", + }) + + m.mirrorWritesCounter = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "multikv_mirror_writes_total", + Help: "Number of mirror-writes to secondary store", + }) + + m.mirrorFailuresCounter = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "multikv_mirror_write_errors_total", + Help: "Number of failures to mirror-write to secondary store", + }) +} + func (m *MultiClient) updatePrimaryStoreGauge() { _, pkv := m.getPrimaryClient() @@ -210,15 +213,15 @@ func (m *MultiClient) updatePrimaryStoreGauge() { value = 1 } - primaryStoreGauge.WithLabelValues(kv.name).Set(value) + m.primaryStoreGauge.WithLabelValues(kv.name).Set(value) } } func (m *MultiClient) updateMirrorEnabledGauge() { if m.mirroringEnabled.Load() { - mirrorEnabledGauge.Set(1) + m.mirrorEnabledGauge.Set(1) } else { - mirrorEnabledGauge.Set(0) + m.mirrorEnabledGauge.Set(0) } } @@ -345,14 +348,14 @@ func (m *MultiClient) writeToSecondary(ctx context.Context, primary kvclient, ke continue } - mirrorWritesCounter.Inc() + m.mirrorWritesCounter.Inc() err := kvc.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { // try once return newValue, false, nil }) if err != nil { - mirrorFailuresCounter.Inc() + m.mirrorFailuresCounter.Inc() level.Warn(m.logger).Log("msg", "failed to update value in secondary store", "key", key, "err", err, "primary", primary.name, "secondary", kvc.name) } else { level.Debug(m.logger).Log("msg", "stored updated value to secondary store", "key", key, "primary", primary.name, "secondary", kvc.name) From 53d26a927987390ef3dc3b767cbde3878c50dd07 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 25 Aug 2021 16:22:27 -0500 Subject: [PATCH 2/7] Only check histograms for roles in unit test Signed-off-by: Tyler Reid --- kv/client_test.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/kv/client_test.go b/kv/client_test.go index a244cd7ce..4d52adeab 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) return })) - actual := typeToRoleMap(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg) require.Len(t, actual, 1) require.Equal(t, "primary", actual["mock"]) } @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { return })) - actual := typeToRoleMap(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg) // expected multi-primary, inmemory-primary and mock-secondary require.Len(t, actual, 3) require.Equal(t, "primary", actual["multi"]) @@ -97,24 +97,26 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { } -func typeToRoleMap(t *testing.T, reg prometheus.Gatherer) map[string]string { +func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer) map[string]string { mfs, err := reg.Gather() require.NoError(t, err) result := map[string]string{} for _, mf := range mfs { for _, m := range mf.GetMetric() { - backendType := "" - role := "" - for _, l := range m.GetLabel() { - if l.GetName() == "role" { - role = l.GetValue() - } else if l.GetName() == "type" { - backendType = l.GetValue() + if m.GetHistogram() != nil { + backendType := "" + role := "" + for _, l := range m.GetLabel() { + if l.GetName() == "role" { + role = l.GetValue() + } else if l.GetName() == "type" { + backendType = l.GetValue() + } } + require.NotEmpty(t, backendType) + require.NotEmpty(t, role) + result[backendType] = role } - require.NotEmpty(t, backendType) - require.NotEmpty(t, role) - result[backendType] = role } } return result From dd82f2214b1788f7c1dd55df7b0641972e2d1b77 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Wed, 25 Aug 2021 17:45:50 -0500 Subject: [PATCH 3/7] a better run at refactoring consul metrics Signed-off-by: Tyler Reid --- kv/client.go | 4 ++-- kv/client_test.go | 10 +++++----- kv/consul/client.go | 29 +++++++++++++++++++---------- kv/consul/client_test.go | 9 +++++---- kv/consul/metrics.go | 23 ++++++----------------- kv/consul/mock.go | 24 +++++++++++++++++------- kv/kv_test.go | 15 ++++++++------- 7 files changed, 62 insertions(+), 52 deletions(-) diff --git a/kv/client.go b/kv/client.go index 647e1e00f..3a8f410b8 100644 --- a/kv/client.go +++ b/kv/client.go @@ -129,7 +129,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co switch backend { case "consul": - client, err = consul.NewClient(cfg.Consul, codec, logger) + client, err = consul.NewClient(cfg.Consul, codec, logger, reg) case "etcd": client, err = etcd.New(cfg.Etcd, codec, logger) @@ -138,7 +138,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co // If we use the in-memory store, make sure everyone gets the same instance // within the same process. inmemoryStoreInit.Do(func() { - inmemoryStore, _ = consul.NewInMemoryClient(codec, logger) + inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) client = inmemoryStore diff --git a/kv/client_test.go b/kv/client_test.go index 4d52adeab..a56ba89ed 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) return })) - actual := typeToRoleMapHistogramLabels(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") require.Len(t, actual, 1) require.Equal(t, "primary", actual["mock"]) } @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { return })) - actual := typeToRoleMapHistogramLabels(t, reg) + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") // expected multi-primary, inmemory-primary and mock-secondary require.Len(t, actual, 3) require.Equal(t, "primary", actual["multi"]) @@ -97,13 +97,13 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { } -func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer) map[string]string { +func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string { mfs, err := reg.Gather() require.NoError(t, err) result := map[string]string{} for _, mf := range mfs { - for _, m := range mf.GetMetric() { - if m.GetHistogram() != nil { + if mf.GetName() == histogramWithRoleLabels { + for _, m := range mf.GetMetric() { backendType := "" role := "" for _, l := range m.GetLabel() { diff --git a/kv/consul/client.go b/kv/consul/client.go index 8117e37af..d8a69c833 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -13,6 +13,8 @@ import ( "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" @@ -61,9 +63,10 @@ type kv interface { // Client is a KV.Client for Consul. type Client struct { kv - codec codec.Codec - cfg Config - logger log.Logger + codec codec.Codec + cfg Config + logger log.Logger + consulRequestDuration *instrument.HistogramCollector } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -78,7 +81,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { } // NewClient returns a new Client. -func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error) { +func NewClient(cfg Config, codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) { client, err := consul.NewClient(&consul.Config{ Address: cfg.Host, Token: cfg.ACLToken, @@ -92,11 +95,17 @@ func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error if err != nil { return nil, err } + consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Name: "consul_request_duration_seconds", + Help: "Time spent on consul requests.", + Buckets: prometheus.DefBuckets, + }, []string{"operation", "status_code"})) c := &Client{ - kv: consulMetrics{client.KV()}, - codec: codec, - cfg: cfg, - logger: logger, + kv: consulMetrics{client.KV(), consulRequestDurationCollector}, + codec: codec, + cfg: cfg, + logger: logger, + consulRequestDuration: consulRequestDurationCollector, } return c, nil } @@ -108,7 +117,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "Put", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := c.kv.Put(&consul.KVPair{ Key: key, Value: bytes, @@ -120,7 +129,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "CAS loop", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return c.cas(ctx, key, f) }) } diff --git a/kv/consul/client_test.go b/kv/consul/client_test.go index 52e429dfc..1f1f15df9 100644 --- a/kv/consul/client_test.go +++ b/kv/consul/client_test.go @@ -8,6 +8,7 @@ import ( "time" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,7 +34,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 5.0, WatchKeyBurstSize: 1, - }, testLogger{}) + }, testLogger{}, prometheus.NewRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -67,7 +68,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { func TestWatchKeyNoRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 0, - }, testLogger{}) + }, testLogger{}, prometheus.NewRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -89,7 +90,7 @@ func TestWatchKeyNoRateLimit(t *testing.T) { } func TestReset(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -148,7 +149,7 @@ func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout t } func TestWatchKeyWithNoStartValue(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/kv/consul/metrics.go b/kv/consul/metrics.go index 364103adc..e53523ad2 100644 --- a/kv/consul/metrics.go +++ b/kv/consul/metrics.go @@ -4,29 +4,18 @@ import ( "context" consul "github.com/hashicorp/consul/api" - "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/instrument" ) -// Needs a with call -var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "consul_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, -}, []string{"operation", "status_code"})) - -func init() { - consulRequestDuration.Register() -} - type consulMetrics struct { kv + consulRequestDuration *instrument.HistogramCollector } func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { var ok bool var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "CAS", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error ok, result, err = c.kv.CAS(p, options) @@ -38,7 +27,7 @@ func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { var kvp *consul.KVPair var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Get", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvp, meta, err = c.kv.Get(key, options) @@ -50,7 +39,7 @@ func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KV func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { var kvps consul.KVPairs var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "List", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvps, meta, err = c.kv.List(path, options) @@ -61,7 +50,7 @@ func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.K func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { var meta *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Delete", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error meta, err = c.kv.Delete(key, options) @@ -72,7 +61,7 @@ func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Put", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error result, err = c.kv.Put(p, options) diff --git a/kv/consul/mock.go b/kv/consul/mock.go index d22381bfe..e1e50b62a 100644 --- a/kv/consul/mock.go +++ b/kv/consul/mock.go @@ -10,6 +10,9 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" "github.com/grafana/dskit/closer" "github.com/grafana/dskit/kv/codec" @@ -28,12 +31,18 @@ type mockKV struct { } // NewInMemoryClient makes a new mock consul client. -func NewInMemoryClient(codec codec.Codec, logger log.Logger) (*Client, io.Closer) { - return NewInMemoryClientWithConfig(codec, Config{}, logger) +func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { + return NewInMemoryClientWithConfig(codec, Config{}, logger, registerer) } // NewInMemoryClientWithConfig makes a new mock consul client with supplied Config. -func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger) (*Client, io.Closer) { +func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { + consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Name: "consul_request_duration_seconds", + Help: "Time spent on consul requests.", + Buckets: prometheus.DefBuckets, + }, []string{"operation", "status_code"})) + m := mockKV{ kvps: map[string]*consul.KVPair{}, // Always start from 1, we NEVER want to report back index 0 in the responses. @@ -58,10 +67,11 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge go m.loop() return &Client{ - kv: &m, - codec: codec, - cfg: cfg, - logger: logger, + kv: &m, + codec: codec, + cfg: cfg, + logger: logger, + consulRequestDuration: consulRequestDurationCollector, }, closer } diff --git a/kv/kv_test.go b/kv/kv_test.go index fe57d3882..8ce50fd27 100644 --- a/kv/kv_test.go +++ b/kv/kv_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/dskit/kv/codec" @@ -17,7 +18,7 @@ import ( "github.com/grafana/dskit/kv/etcd" ) -func withFixtures(t *testing.T, f func(*testing.T, Client)) { +func withFixtures(t *testing.T, f func(*testing.T, Client), registerer prometheus.Registerer) { t.Helper() for _, fixture := range []struct { @@ -25,7 +26,7 @@ func withFixtures(t *testing.T, f func(*testing.T, Client)) { factory func() (Client, io.Closer, error) }{ {"consul", func() (Client, io.Closer, error) { - client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}) + client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, registerer) return client, closer, nil }}, {"etcd", func() (Client, io.Closer, error) { @@ -69,7 +70,7 @@ func TestCAS(t *testing.T) { value, err := client.Get(ctx, key) require.NoError(t, err) require.EqualValues(t, "10", value) - }) + }, prometheus.NewRegistry()) } // TestNilCAS ensures we can return nil from the CAS callback when we don't @@ -93,7 +94,7 @@ func TestNilCAS(t *testing.T) { value, err := client.Get(ctx, key) require.NoError(t, err) require.EqualValues(t, "0", value) - }) + }, prometheus.NewRegistry()) } func TestWatchKey(t *testing.T) { @@ -165,7 +166,7 @@ func TestWatchKey(t *testing.T) { if observedCount < expectedFactor*max { t.Errorf("expected at least %.0f%% observed values, got %.0f%% (observed count: %d)", 100*expectedFactor, 100*float64(observedCount)/max, observedCount) } - }) + }, prometheus.NewRegistry()) } func TestWatchPrefix(t *testing.T) { @@ -258,7 +259,7 @@ func TestWatchPrefix(t *testing.T) { if len(observedKeys) > 0 { t.Errorf("unexpected keys reported: %v", observedKeys) } - }) + }, prometheus.NewRegistry()) } // TestList makes sure stored keys are listed back. @@ -278,5 +279,5 @@ func TestList(t *testing.T) { sort.Strings(storedKeys) require.Equal(t, keysToCreate, storedKeys) - }) + }, prometheus.NewRegistry()) } From 59e1c092c8e53d5b81cb6aa3f41e2e62966cbd0b Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Thu, 26 Aug 2021 10:33:55 -0500 Subject: [PATCH 4/7] Metrics cleanup Signed-off-by: Tyler Reid --- kv/client_test.go | 8 +-- kv/consul/client.go | 30 +++++------- kv/consul/client_test.go | 8 +-- kv/consul/metrics.go | 38 ++++++++++----- kv/consul/mock.go | 17 ++----- kv/kv_test.go | 15 +++--- kv/kvtls/test/tls_integration_test.go | 2 +- kv/memberlist/kv_init_service.go | 4 +- kv/memberlist/kv_init_service_test.go | 2 +- kv/memberlist/memberlist_client.go | 6 +-- kv/memberlist/memberlist_client_test.go | 26 +++++----- kv/memberlist/metrics.go | 65 ++++++++----------------- kv/memberlist/tcp_transport.go | 22 +-------- 13 files changed, 101 insertions(+), 142 deletions(-) diff --git a/kv/client_test.go b/kv/client_test.go index a56ba89ed..88fd94bfe 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -39,7 +39,7 @@ multi: func Test_createClient_multiBackend_withSingleRing(t *testing.T) { storeCfg, testCodec := newConfigsForTest() require.NotPanics(t, func() { - _, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewRegistry(), testLogger{}) + _, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewPedanticRegistry(), testLogger{}) require.NoError(t, err) }) } @@ -47,7 +47,7 @@ func Test_createClient_multiBackend_withSingleRing(t *testing.T) { func Test_createClient_multiBackend_withMultiRing(t *testing.T) { storeCfg1, testCodec := newConfigsForTest() storeCfg2 := StoreConfig{} - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() require.NotPanics(t, func() { _, err := createClient("multi", "/test", storeCfg1, testCodec, Primary, reg, testLogger{}) @@ -61,7 +61,7 @@ func Test_createClient_multiBackend_withMultiRing(t *testing.T) { func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) { storeCfg, testCodec := newConfigsForTest() - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() client, err := createClient("mock", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) require.NoError(t, err) require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { @@ -79,7 +79,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { storeCfg, testCodec := newConfigsForTest() storeCfg.Multi.MirrorEnabled = true storeCfg.Multi.MirrorTimeout = 10 * time.Second - reg := prometheus.NewRegistry() + reg := prometheus.NewPedanticRegistry() client, err := createClient("multi", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) require.NoError(t, err) require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { diff --git a/kv/consul/client.go b/kv/consul/client.go index d8a69c833..9ce6d91c5 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -14,7 +14,6 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" @@ -63,10 +62,10 @@ type kv interface { // Client is a KV.Client for Consul. type Client struct { kv - codec codec.Codec - cfg Config - logger log.Logger - consulRequestDuration *instrument.HistogramCollector + codec codec.Codec + cfg Config + logger log.Logger + consulMetrics *consulMetrics } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -95,17 +94,14 @@ func NewClient(cfg Config, codec codec.Codec, logger log.Logger, registerer prom if err != nil { return nil, err } - consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ - Name: "consul_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, - }, []string{"operation", "status_code"})) + consulMetrics := newConsulMetrics(registerer) + c := &Client{ - kv: consulMetrics{client.KV(), consulRequestDurationCollector}, - codec: codec, - cfg: cfg, - logger: logger, - consulRequestDuration: consulRequestDurationCollector, + kv: consulInstrumentation{client.KV(), consulMetrics}, + codec: codec, + cfg: cfg, + logger: logger, + consulMetrics: consulMetrics, } return c, nil } @@ -117,7 +113,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - return instrument.CollectedRequest(ctx, "Put", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := c.kv.Put(&consul.KVPair{ Key: key, Value: bytes, @@ -129,7 +125,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return instrument.CollectedRequest(ctx, "CAS loop", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "CAS loop", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return c.cas(ctx, key, f) }) } diff --git a/kv/consul/client_test.go b/kv/consul/client_test.go index 1f1f15df9..74c188996 100644 --- a/kv/consul/client_test.go +++ b/kv/consul/client_test.go @@ -34,7 +34,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 5.0, WatchKeyBurstSize: 1, - }, testLogger{}, prometheus.NewRegistry()) + }, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -68,7 +68,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) { func TestWatchKeyNoRateLimit(t *testing.T) { c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ WatchKeyRateLimit: 0, - }, testLogger{}, prometheus.NewRegistry()) + }, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -90,7 +90,7 @@ func TestWatchKeyNoRateLimit(t *testing.T) { } func TestReset(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewRegistry()) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -149,7 +149,7 @@ func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout t } func TestWatchKeyWithNoStartValue(t *testing.T) { - c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewRegistry()) + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/kv/consul/metrics.go b/kv/consul/metrics.go index e53523ad2..52a1d4e84 100644 --- a/kv/consul/metrics.go +++ b/kv/consul/metrics.go @@ -4,18 +4,34 @@ import ( "context" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" ) +type consulInstrumentation struct { + kv kv + consulMetrics *consulMetrics +} + type consulMetrics struct { - kv consulRequestDuration *instrument.HistogramCollector } -func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { +func newConsulMetrics(registerer prometheus.Registerer) *consulMetrics { + consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Name: "consul_request_duration_seconds", + Help: "Time spent on consul requests.", + Buckets: prometheus.DefBuckets, + }, []string{"operation", "status_code"})) + consulMetrics := consulMetrics{consulRequestDurationCollector} + return &consulMetrics +} + +func (c consulInstrumentation) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { var ok bool var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "CAS", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "CAS", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error ok, result, err = c.kv.CAS(p, options) @@ -24,10 +40,10 @@ func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool return ok, result, err } -func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { +func (c consulInstrumentation) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { var kvp *consul.KVPair var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "Get", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Get", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvp, meta, err = c.kv.Get(key, options) @@ -36,10 +52,10 @@ func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KV return kvp, meta, err } -func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { +func (c consulInstrumentation) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { var kvps consul.KVPairs var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "List", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "List", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error kvps, meta, err = c.kv.List(path, options) @@ -48,9 +64,9 @@ func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.K return kvps, meta, err } -func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { +func (c consulInstrumentation) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { var meta *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Delete", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Delete", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error meta, err = c.kv.Delete(key, options) @@ -59,9 +75,9 @@ func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul return meta, err } -func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { +func (c consulInstrumentation) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Put", c.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(options.Context(), "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { options = options.WithContext(ctx) var err error result, err = c.kv.Put(p, options) diff --git a/kv/consul/mock.go b/kv/consul/mock.go index e1e50b62a..500430af3 100644 --- a/kv/consul/mock.go +++ b/kv/consul/mock.go @@ -11,8 +11,6 @@ import ( "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/weaveworks/common/instrument" "github.com/grafana/dskit/closer" "github.com/grafana/dskit/kv/codec" @@ -37,11 +35,6 @@ func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer promethe // NewInMemoryClientWithConfig makes a new mock consul client with supplied Config. func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { - consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ - Name: "consul_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, - }, []string{"operation", "status_code"})) m := mockKV{ kvps: map[string]*consul.KVPair{}, @@ -67,11 +60,11 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge go m.loop() return &Client{ - kv: &m, - codec: codec, - cfg: cfg, - logger: logger, - consulRequestDuration: consulRequestDurationCollector, + kv: &m, + codec: codec, + cfg: cfg, + logger: logger, + consulMetrics: newConsulMetrics(registerer), }, closer } diff --git a/kv/kv_test.go b/kv/kv_test.go index 8ce50fd27..f2a23ac61 100644 --- a/kv/kv_test.go +++ b/kv/kv_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/dskit/kv/codec" @@ -18,7 +17,7 @@ import ( "github.com/grafana/dskit/kv/etcd" ) -func withFixtures(t *testing.T, f func(*testing.T, Client), registerer prometheus.Registerer) { +func withFixtures(t *testing.T, f func(*testing.T, Client)) { t.Helper() for _, fixture := range []struct { @@ -26,7 +25,7 @@ func withFixtures(t *testing.T, f func(*testing.T, Client), registerer prometheu factory func() (Client, io.Closer, error) }{ {"consul", func() (Client, io.Closer, error) { - client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, registerer) + client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, nil) return client, closer, nil }}, {"etcd", func() (Client, io.Closer, error) { @@ -70,7 +69,7 @@ func TestCAS(t *testing.T) { value, err := client.Get(ctx, key) require.NoError(t, err) require.EqualValues(t, "10", value) - }, prometheus.NewRegistry()) + }) } // TestNilCAS ensures we can return nil from the CAS callback when we don't @@ -94,7 +93,7 @@ func TestNilCAS(t *testing.T) { value, err := client.Get(ctx, key) require.NoError(t, err) require.EqualValues(t, "0", value) - }, prometheus.NewRegistry()) + }) } func TestWatchKey(t *testing.T) { @@ -166,7 +165,7 @@ func TestWatchKey(t *testing.T) { if observedCount < expectedFactor*max { t.Errorf("expected at least %.0f%% observed values, got %.0f%% (observed count: %d)", 100*expectedFactor, 100*float64(observedCount)/max, observedCount) } - }, prometheus.NewRegistry()) + }) } func TestWatchPrefix(t *testing.T) { @@ -259,7 +258,7 @@ func TestWatchPrefix(t *testing.T) { if len(observedKeys) > 0 { t.Errorf("unexpected keys reported: %v", observedKeys) } - }, prometheus.NewRegistry()) + }) } // TestList makes sure stored keys are listed back. @@ -279,5 +278,5 @@ func TestList(t *testing.T) { sort.Strings(storedKeys) require.Equal(t, keysToCreate, storedKeys) - }, prometheus.NewRegistry()) + }) } diff --git a/kv/kvtls/test/tls_integration_test.go b/kv/kvtls/test/tls_integration_test.go index e68de95c4..23ca7069e 100644 --- a/kv/kvtls/test/tls_integration_test.go +++ b/kv/kvtls/test/tls_integration_test.go @@ -78,7 +78,7 @@ func newIntegrationClientServer( ) { // server registers some metrics to default registry savedRegistry := prometheus.DefaultRegisterer - prometheus.DefaultRegisterer = prometheus.NewRegistry() + prometheus.DefaultRegisterer = prometheus.NewPedanticRegistry() defer func() { prometheus.DefaultRegisterer = savedRegistry }() diff --git a/kv/memberlist/kv_init_service.go b/kv/memberlist/kv_init_service.go index 186054dd0..fa858f9c1 100644 --- a/kv/memberlist/kv_init_service.go +++ b/kv/memberlist/kv_init_service.go @@ -40,7 +40,7 @@ type KVInitService struct { watcher *services.FailureWatcher } -func NewKVInitService(cfg *KVConfig, logger log.Logger, registerer prometheus.Registerer, dnsProvider DNSProvider) *KVInitService { +func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KVInitService { kvinit := &KVInitService{ cfg: cfg, @@ -56,7 +56,7 @@ func NewKVInitService(cfg *KVConfig, logger log.Logger, registerer prometheus.Re // GetMemberlistKV will initialize Memberlist.KV on first call, and add it to service failure watcher. func (kvs *KVInitService) GetMemberlistKV() (*KV, error) { kvs.init.Do(func() { - kv := NewKV(*kvs.cfg, kvs.logger, kvs.registerer, kvs.dnsProvider) + kv := NewKV(*kvs.cfg, kvs.logger, kvs.dnsProvider, kvs.registerer) kvs.watcher.WatchService(kv) kvs.err = kv.StartAsync(context.Background()) diff --git a/kv/memberlist/kv_init_service_test.go b/kv/memberlist/kv_init_service_test.go index 24f46b329..35c668433 100644 --- a/kv/memberlist/kv_init_service_test.go +++ b/kv/memberlist/kv_init_service_test.go @@ -57,6 +57,6 @@ func TestPage(t *testing.T) { func TestStop(t *testing.T) { var cfg KVConfig flagext.DefaultValues(&cfg) - kvinit := NewKVInitService(&cfg, nil, prometheus.NewRegistry(), &dnsProviderMock{}) + kvinit := NewKVInitService(&cfg, nil, &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, kvinit.stopping(nil)) } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 806617b8f..d5c1c7e74 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -327,7 +327,7 @@ var ( // gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also // trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned // and service enters Failed state. -func NewKV(cfg KVConfig, logger log.Logger, registerer prometheus.Registerer, dnsProvider DNSProvider) *KV { +func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV { cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace @@ -345,7 +345,7 @@ func NewKV(cfg KVConfig, logger log.Logger, registerer prometheus.Registerer, dn maxCasRetries: maxCasRetries, } - mlkv.createAndRegisterMetrics(mlkv.registerer) + mlkv.createAndRegisterMetrics() for _, c := range cfg.Codecs { mlkv.codecs[c.CodecID()] = c @@ -360,7 +360,7 @@ func defaultMemberlistConfig() *memberlist.Config { } func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { - tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer) + tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger) if err != nil { return nil, fmt.Errorf("failed to create transport: %v", err) } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 92a96d177..2b0386b65 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -247,7 +247,7 @@ func TestBasicGetAndCas(t *testing.T) { } cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewRegistry(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -303,7 +303,7 @@ func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) { cfg.TCPTransport = TCPTransportConfig{} cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -447,7 +447,7 @@ func TestMultipleCAS(t *testing.T) { flagext.DefaultValues(&cfg) cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) mkv.maxCasRetries = 20 require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck @@ -549,7 +549,7 @@ func TestMultipleClients(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) kv, err := NewClient(mkv, c) @@ -702,7 +702,7 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) { time.Sleep(1 * time.Second) } - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) // Not started yet. + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) // Not started yet. watcher.WatchService(mkv) kv, err := NewClient(mkv, c) @@ -776,7 +776,7 @@ func TestMemberlistFailsToJoin(t *testing.T) { cfg.Codecs = []codec.Codec{c} - mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -947,7 +947,7 @@ func TestMultipleCodecs(t *testing.T) { distributedCounterCodec{}, } - mkv1 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -990,7 +990,7 @@ func TestMultipleCodecs(t *testing.T) { require.NoError(t, err) // We will read values from second KV, which will join the first one - mkv2 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1042,11 +1042,11 @@ func TestRejoin(t *testing.T) { cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])} cfg2.RejoinInterval = 1 * time.Second - mkv1 := NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck - mkv2 := NewKV(cfg2, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck @@ -1063,7 +1063,7 @@ func TestRejoin(t *testing.T) { poll(t, 5*time.Second, 1, membersFunc) // Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining. - mkv1 = NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck @@ -1095,7 +1095,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck @@ -1158,7 +1158,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { cfg.LeftIngestersTimeout = 5 * time.Minute cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index 1ca530829..26fd905fb 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -12,59 +12,59 @@ import ( "github.com/grafana/dskit/services" ) -func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { +func (m *KV) createAndRegisterMetrics() { const subsystem = "memberlist_client" - m.numberOfReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.numberOfReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_total", Help: "Number of received broadcast user messages", }) - m.totalSizeOfReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.totalSizeOfReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_bytes_total", Help: "Total size of received broadcast user messages", }) - m.numberOfInvalidReceivedMessages = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.numberOfInvalidReceivedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "received_broadcasts_invalid_total", Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) - m.numberOfPushes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_total", Help: "How many times did this node push its full state to another node", }) - m.totalSizeOfPushes = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pushes_bytes_total", Help: "Total size of pushed state", }) - m.numberOfPulls = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.numberOfPulls = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_total", Help: "How many times did this node pull full state from another node", }) - m.totalSizeOfPulls = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.totalSizeOfPulls = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "state_pulls_bytes_total", Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue", @@ -77,35 +77,35 @@ func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { return 0 }) - m.totalSizeOfBroadcastMessagesInQueue = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + m.totalSizeOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGauge(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_in_broadcast_queue_bytes", Help: "Total size of messages waiting in the broadcast queue", }) - m.numberOfBroadcastMessagesDropped = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.numberOfBroadcastMessagesDropped = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "messages_to_broadcast_dropped_total", Help: "Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big", }) - m.casAttempts = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.casAttempts = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_attempt_total", Help: "Attempted CAS operations", }) - m.casSuccesses = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.casSuccesses = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_success_total", Help: "Successful CAS operations", }) - m.casFailures = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + m.casFailures = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cas_failure_total", @@ -117,21 +117,21 @@ func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { "Number of values in KV Store", nil, nil) - m.storeTombstones = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + m.storeTombstones = promauto.With(m.registerer).NewGaugeVec(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones", Help: "Number of tombstones currently present in KV store values", }, []string{"key"}) - m.storeRemovedTombstones = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + m.storeRemovedTombstones = promauto.With(m.registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "kv_store_value_tombstones_removed_total", Help: "Total number of tombstones which have been removed from KV store values", }, []string{"key"}) - m.memberlistMembersCount = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistMembersCount = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_members_count", @@ -144,7 +144,7 @@ func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { return 0 }) - m.memberlistHealthScore = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + m.memberlistHealthScore = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "cluster_node_health_score", @@ -157,7 +157,7 @@ func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { return 0 }) - m.watchPrefixDroppedNotifications = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + m.watchPrefixDroppedNotifications = promauto.With(m.registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, Name: "watch_prefix_dropped_notifications", @@ -168,33 +168,6 @@ func (m *KV) createAndRegisterMetrics(registerer prometheus.Registerer) { return } - all := []prometheus.Collector{ - m.numberOfReceivedMessages, - m.totalSizeOfReceivedMessages, - m.numberOfInvalidReceivedMessages, - m.numberOfBroadcastMessagesInQueue, - m.numberOfPushes, - m.numberOfPulls, - m.totalSizeOfPushes, - m.totalSizeOfPulls, - m.totalSizeOfBroadcastMessagesInQueue, - m.numberOfBroadcastMessagesDropped, - m.casAttempts, - m.casFailures, - m.casSuccesses, - m.watchPrefixDroppedNotifications, - m.storeTombstones, - m.storeRemovedTombstones, - m.memberlistMembersCount, - m.memberlistHealthScore, - } - - for _, c := range all { - m.cfg.MetricsRegisterer.MustRegister(c) - } - - m.cfg.MetricsRegisterer.MustRegister(m) - // memberlist uses armonmetrics package for internal usage // here we configure armonmetrics to use prometheus sink, err := armonprometheus.NewPrometheusSink() // there is no option to pass registrerer, this uses default diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 7c4962b1e..0a548b876 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -107,7 +107,7 @@ type TCPTransport struct { // NewTCPTransport returns a new tcp-based transport with the given configuration. On // success all the network listeners will be created and listening. -func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer prometheus.Registerer) (*TCPTransport, error) { +func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTransport, error) { if len(config.BindAddrs) == 0 { config.BindAddrs = []string{zeroZeroZeroZero} } @@ -129,7 +129,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr } } - t.registerMetrics(registerer) + t.registerMetrics(config.MetricsRegisterer) // Clean up listeners if there's an error. defer func() { @@ -622,22 +622,4 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { if t.cfg.MetricsRegisterer == nil { return } - - all := []prometheus.Metric{ - t.incomingStreams, - t.outgoingStreams, - t.outgoingStreamErrors, - t.receivedPackets, - t.receivedPacketsBytes, - t.receivedPacketsErrors, - t.sentPackets, - t.sentPacketsBytes, - t.sentPacketsErrors, - t.unknownConnections, - } - - // if registration fails, that's too bad, but don't panic - for _, c := range all { - t.cfg.MetricsRegisterer.MustRegister(c.(prometheus.Collector)) - } } From f607ecd6d1e3be8f700b8bf5412884296421c1b6 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Thu, 26 Aug 2021 16:39:27 -0500 Subject: [PATCH 5/7] fix formatting in initservice --- kv/memberlist/kv_init_service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/kv/memberlist/kv_init_service.go b/kv/memberlist/kv_init_service.go index fa858f9c1..c137bc786 100644 --- a/kv/memberlist/kv_init_service.go +++ b/kv/memberlist/kv_init_service.go @@ -42,7 +42,6 @@ type KVInitService struct { func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KVInitService { kvinit := &KVInitService{ - cfg: cfg, watcher: services.NewFailureWatcher(), logger: logger, From a78c168a09316a7131b98de7dd9bc856e20a22b1 Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Fri, 27 Aug 2021 09:18:52 -0500 Subject: [PATCH 6/7] review fixes Signed-off-by: Tyler Reid --- kv/client_test.go | 27 ++++++++++++++------------- kv/consul/mock.go | 1 - kv/memberlist/tcp_transport.go | 4 ---- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/kv/client_test.go b/kv/client_test.go index 88fd94bfe..22d394b70 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -102,21 +102,22 @@ func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogr require.NoError(t, err) result := map[string]string{} for _, mf := range mfs { - if mf.GetName() == histogramWithRoleLabels { - for _, m := range mf.GetMetric() { - backendType := "" - role := "" - for _, l := range m.GetLabel() { - if l.GetName() == "role" { - role = l.GetValue() - } else if l.GetName() == "type" { - backendType = l.GetValue() - } + if mf.GetName() != histogramWithRoleLabels { + continue + } + for _, m := range mf.GetMetric() { + backendType := "" + role := "" + for _, l := range m.GetLabel() { + if l.GetName() == "role" { + role = l.GetValue() + } else if l.GetName() == "type" { + backendType = l.GetValue() } - require.NotEmpty(t, backendType) - require.NotEmpty(t, role) - result[backendType] = role } + require.NotEmpty(t, backendType) + require.NotEmpty(t, role) + result[backendType] = role } } return result diff --git a/kv/consul/mock.go b/kv/consul/mock.go index 500430af3..a5ed43df9 100644 --- a/kv/consul/mock.go +++ b/kv/consul/mock.go @@ -35,7 +35,6 @@ func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer promethe // NewInMemoryClientWithConfig makes a new mock consul client with supplied Config. func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) { - m := mockKV{ kvps: map[string]*consul.KVPair{}, // Always start from 1, we NEVER want to report back index 0 in the responses. diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 0a548b876..4c6badee0 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -618,8 +618,4 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { Name: "unknown_connections_total", Help: "Number of unknown TCP connections (not a packet or stream)", }) - - if t.cfg.MetricsRegisterer == nil { - return - } } From 195732468ecc6c9c243c396d1b50a43b85be54cd Mon Sep 17 00:00:00 2001 From: Tyler Reid Date: Fri, 27 Aug 2021 09:58:25 -0500 Subject: [PATCH 7/7] Update changelog entry for this pr Signed-off-by: Tyler Reid --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c3e56c91..c0b0a4ca3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ ## Changelog +* [CHANGE] Removed global metrics for KV package. Making a KV object will now require a prometheus registerer that will + be used to register all relevant KV class metrics. #22 * [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog \ No newline at end of file