diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 2a315ea58656..d6fb4e47013d 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -18,17 +18,19 @@ import ( "expvar" "fmt" "log" + "math" "sync" "time" "go.uber.org/zap" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/contention" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" serverstorage "go.etcd.io/etcd/server/v3/storage" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) const ( @@ -82,6 +84,7 @@ type raftNode struct { tickMu *sync.Mutex raftNodeConfig + tickElapsed uint64 // counter reset after each probe // a chan to send/receive snapshot msgSnapC chan raftpb.Message @@ -155,9 +158,21 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { func (r *raftNode) tick() { r.tickMu.Lock() r.Tick() + r.tickElapsed++ + if r.tickElapsed == math.MaxUint64 { + r.tickElapsed = 1 + } r.tickMu.Unlock() } +func (r *raftNode) safeReadTickElapsedAndClear() uint64 { + r.tickMu.Lock() + defer r.tickMu.Unlock() + tickElapsed := r.tickElapsed + r.tickElapsed = 0 + return tickElapsed +} + // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1d48fa6732cb..eed6478e080e 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -33,6 +33,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/api/v3/version" @@ -67,8 +70,6 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) const ( @@ -1643,6 +1644,10 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() } func (s *EtcdServer) Term() uint64 { return s.getTerm() } +// ProbeRaftLoopProgress probes if the etcdserver raft loop is deadlocked +// since last time the function is invoked. +func (s *EtcdServer) ProbeRaftLoopProgress() bool { return s.r.safeReadTickElapsedAndClear() != 0 } + type confChangeResponse struct { membs []*membership.Member raftAdvanceC <-chan struct{}