diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go
index 42a27166a6a..bb2fa2db6b4 100644
--- a/etcdserver/metrics.go
+++ b/etcdserver/metrics.go
@@ -41,6 +41,18 @@ var (
 		Name:      "leader_changes_seen_total",
 		Help:      "The number of leader changes seen.",
 	})
+	heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "heartbeat_send_failures_total",
+		Help:      "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
+	})
+	slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "slow_apply_total",
+		Help:      "The total number of slow apply requests (likely overloaded from slow disk).",
+	})
 	proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "etcd",
 		Subsystem: "server",
@@ -96,6 +108,8 @@ func init() {
 	prometheus.MustRegister(hasLeader)
 	prometheus.MustRegister(isLeader)
 	prometheus.MustRegister(leaderChanges)
+	prometheus.MustRegister(heartbeatSendFailures)
+	prometheus.MustRegister(slowApplies)
 	prometheus.MustRegister(proposalsCommitted)
 	prometheus.MustRegister(proposalsApplied)
 	prometheus.MustRegister(proposalsPending)
diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index f6220791ee6..1080633b18b 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
 				// TODO: limit request rate.
 				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
 				plog.Warningf("server is likely overloaded")
+				heartbeatSendFailures.Inc()
 			}
 		}
 	}
diff --git a/etcdserver/util.go b/etcdserver/util.go
index 708fee4cb60..79bb6b859ca 100644
--- a/etcdserver/util.go
+++ b/etcdserver/util.go
@@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref
 			result = resp
 		}
 		plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
+		slowApplies.Inc()
 	}
 }
 
diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index c3459db8484..f7d9e60c2e7 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -291,6 +291,8 @@ func (b *backend) Defrag() error {
 }
 
 func (b *backend) defrag() error {
+	now := time.Now()
+
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
@@ -354,6 +356,9 @@ func (b *backend) defrag() error {
 	atomic.StoreInt64(&b.size, size)
 	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 
+	took := time.Since(now)
+	defragDurations.Observe(took.Seconds())
+
 	return nil
 }
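The two counters added above follow the standard client_golang pattern: declare the collector once at package scope, register it in init() (MustRegister panics on duplicate registration), and call Inc() at the point where the slow path is detected. Below is a minimal self-contained sketch of that pattern; the "example" namespace, the sendHeartbeat helper, the port, and the simplified slow-send check are illustrative only (etcd's real check uses a timeout detector around message sends, not this wrapper).

package main

import (
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Illustrative counter mirroring etcd's heartbeat_send_failures_total.
var heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "example",
	Subsystem: "server",
	Name:      "heartbeat_send_failures_total",
	Help:      "The total number of leader heartbeat send failures.",
})

func init() {
	// Register once; MustRegister panics if the collector is already registered.
	prometheus.MustRegister(heartbeatSendFailures)
}

// sendHeartbeat stands in for the send path in raftNode.processMessages:
// if the send took longer than the heartbeat interval, count a failure.
func sendHeartbeat(heartbeat time.Duration, send func()) {
	start := time.Now()
	send()
	if took := time.Since(start); took > heartbeat {
		log.Printf("failed to send out heartbeat on time (took %v)", took)
		heartbeatSendFailures.Inc()
	}
}

func main() {
	// Simulate one slow heartbeat, then expose /metrics for scraping.
	sendHeartbeat(100*time.Millisecond, func() { time.Sleep(150 * time.Millisecond) })
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}

Since counters are monotonic, dashboards typically graph a rate over these series (e.g. rate(...heartbeat_send_failures_total[5m])) rather than the raw value.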
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index 7bb5f8d4a05..aed6893e41a 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -136,15 +136,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(false)
+	t.Unlock()
 }
 
 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(true)
+	t.Unlock()
 }
 
 func (t *batchTx) Unlock() {
@@ -162,9 +162,11 @@ func (t *batchTx) commit(stop bool) {
 		}
 
 		start := time.Now()
+
 		// gofail: var beforeCommit struct{}
 		err := t.tx.Commit()
 		// gofail: var afterCommit struct{}
+
 		commitDurations.Observe(time.Since(start).Seconds())
 		atomic.AddInt64(&t.backend.commits, 1)
 
@@ -209,21 +211,21 @@ func (t *batchTxBuffered) Unlock() {
 
 func (t *batchTxBuffered) Commit() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(false)
+	t.Unlock()
 }
 
 func (t *batchTxBuffered) CommitAndStop() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(true)
+	t.Unlock()
 }
 
 func (t *batchTxBuffered) commit(stop bool) {
 	// all read txs must be closed to acquire boltdb commit rwlock
 	t.backend.readTx.mu.Lock()
-	defer t.backend.readTx.mu.Unlock()
 	t.unsafeCommit(stop)
+	t.backend.readTx.mu.Unlock()
 }
 
 func (t *batchTxBuffered) unsafeCommit(stop bool) {
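The Commit/CommitAndStop rewrites in this file swap defer t.Unlock() for an explicit Unlock after the commit. These are hot paths, and in the Go toolchains of that era a defer carried a fixed per-call cost (largely eliminated later by the Go 1.13/1.14 defer optimizations). A throwaway micro-benchmark, not from the etcd tree, that compares the two shapes; save it as e.g. unlock_bench_test.go and run go test -bench=.

package main

import (
	"sync"
	"testing"
)

var mu sync.Mutex

// lockWithDefer unlocks via defer, as batchTx.Commit did before this patch.
func lockWithDefer() {
	mu.Lock()
	defer mu.Unlock()
}

// lockExplicit unlocks directly, as the patched Commit does.
func lockExplicit() {
	mu.Lock()
	mu.Unlock()
}

func BenchmarkLockWithDefer(b *testing.B) {
	for i := 0; i < b.N; i++ {
		lockWithDefer()
	}
}

func BenchmarkLockExplicit(b *testing.B) {
	for i := 0; i < b.N; i++ {
		lockExplicit()
	}
}

The trade-off is that an explicit Unlock is skipped if the body panics; that is tolerable here because a failed backend commit is already fatal to the process.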
diff --git a/mvcc/backend/metrics.go b/mvcc/backend/metrics.go
index 30a38801476..34157080499 100644
--- a/mvcc/backend/metrics.go
+++ b/mvcc/backend/metrics.go
@@ -22,7 +22,22 @@ var (
 		Subsystem: "disk",
 		Name:      "backend_commit_duration_seconds",
 		Help:      "The latency distributions of commit called by backend.",
-		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 14),
+
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+
+	defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "backend_defrag_duration_seconds",
+		Help:      "The latency distribution of backend defragmentation.",
+
+		// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
+		Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
 	})
 
 	snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -30,12 +45,15 @@ var (
 		Subsystem: "disk",
 		Name:      "backend_snapshot_duration_seconds",
 		Help:      "The latency distribution of backend snapshots.",
-		// 10 ms -> 655 seconds
+
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
 		Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
 	})
 )
 
 func init() {
 	prometheus.MustRegister(commitDurations)
+	prometheus.MustRegister(defragDurations)
 	prometheus.MustRegister(snapshotDurations)
 }
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 61b0e402acd..dd9f04ae211 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -156,12 +156,18 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 }
 
 func (s *store) Hash() (hash uint32, revision int64, err error) {
+	start := time.Now()
+
 	s.b.ForceCommit()
 	h, err := s.b.Hash(DefaultIgnores)
+
+	hashDurations.Observe(time.Since(start).Seconds())
 	return h, s.currentRev, err
 }
 
 func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
+	start := time.Now()
+
 	s.mu.RLock()
 	s.revMu.RLock()
 	compactRev, currentRev = s.compactMainRev, s.currentRev
@@ -206,7 +212,10 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 		h.Write(v)
 		return nil
 	})
-	return h.Sum32(), currentRev, compactRev, err
+	hash = h.Sum32()
+
+	hashRevDurations.Observe(time.Since(start).Seconds())
+	return hash, currentRev, compactRev, err
 }
 
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {
diff --git a/mvcc/metrics.go b/mvcc/metrics.go
index 515884ad4ee..b753310cff0 100644
--- a/mvcc/metrics.go
+++ b/mvcc/metrics.go
@@ -181,7 +181,31 @@ var (
 	)
 	// overridden by mvcc initialization
 	reportDbTotalSizeInUseInBytesMu sync.RWMutex
-	reportDbTotalSizeInUseInBytes   = func() float64 { return 0 }
+	reportDbTotalSizeInUseInBytes   func() float64 = func() float64 { return 0 }
+
+	hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_duration_seconds",
+		Help:      "The latency distribution of storage hash operation.",
+
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
+
+	hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_rev_duration_seconds",
+		Help:      "The latency distribution of storage hash by revision operation.",
+
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
 )
 
 func init() {
@@ -202,6 +226,8 @@ func init() {
 	prometheus.MustRegister(dbTotalSizeDebugging)
 	prometheus.MustRegister(dbTotalSize)
 	prometheus.MustRegister(dbTotalSizeInUse)
+	prometheus.MustRegister(hashDurations)
+	prometheus.MustRegister(hashRevDurations)
 }
 
 // ReportEventReceived reports that an event is received.
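The bucket comments added throughout this diff can be checked mechanically: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds start, start*factor, ..., start*factor^(count-1), and observations beyond the last bound land in the implicit +Inf bucket. A small sketch that prints the lowest and highest bounds for the configurations used here; the package main wrapper is illustrative, only the (start, factor, count) arguments come from the diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same (start, factor, count) arguments as the histograms in this diff.
	configs := []struct {
		name    string
		buckets []float64
	}{
		{"backend_commit_duration_seconds", prometheus.ExponentialBuckets(0.001, 2, 14)},
		{"backend_defrag_duration_seconds", prometheus.ExponentialBuckets(.1, 2, 13)},
		{"backend_snapshot_duration_seconds", prometheus.ExponentialBuckets(.01, 2, 17)},
		{"hash_duration_seconds", prometheus.ExponentialBuckets(.01, 2, 15)},
	}
	for _, c := range configs {
		// The last element is the highest explicit upper bound, e.g.
		// 0.001 * 2^13 == 8.192 for the commit histogram.
		fmt.Printf("%-35s lowest=%gs highest=%gs (%d buckets)\n",
			c.name, c.buckets[0], c.buckets[len(c.buckets)-1], len(c.buckets))
	}
}

Running it reproduces the figures quoted in the comments: 8.192 sec for commits, 409.6 sec for defragmentation, 655.36 sec for snapshots, and 163.84 sec for the two hash histograms.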