diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index fe485331486f..02b782960d7f 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -97,6 +97,7 @@ type raftNode struct {
 	term uint64
 	lead uint64
 
+	tickMu sync.Mutex
 	raftNodeConfig
 
 	// a chan to send/receive snapshot
@@ -151,6 +152,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
 	return r
 }
 
+// tick calls r.Tick() under tickMu, since raft.Node does not have locks in the Raft package.
+func (r *raftNode) tick() {
+	r.tickMu.Lock()
+	defer r.tickMu.Unlock()
+	r.Tick()
+}
+
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
 func (r *raftNode) start(rh *raftReadyHandler) {
@@ -163,7 +171,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 		for {
 			select {
 			case <-r.ticker.C:
-				r.Tick()
+				r.tick()
 			case rd := <-r.Ready():
 				if rd.SoftState != nil {
 					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
@@ -376,7 +384,7 @@ func (r *raftNode) resumeSending() {
 // speeding up election process.
 func (r *raftNode) advanceTicks(ticks int) {
 	for i := 0; i < ticks; i++ {
-		r.Tick()
+		r.tick()
 	}
 }
 
diff --git a/etcdserver/server.go b/etcdserver/server.go
index aa2321752de8..2ebef8373759 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -527,6 +527,41 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 	}
 	srv.r.transport = tr
 
+	srv.goAttach(func() {
+		clusterN := len(cl.Members())
+
+		// single-node fresh start, or single-node recovering from snapshot
+		if clusterN == 1 {
+			ticks := cfg.ElectionTicks - 1
+			plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", srv.ID(), ticks, cfg.ElectionTicks)
+			srv.r.advanceTicks(ticks)
+			return
+		}
+
+		// poll once per second, up to 5 times; 5 seconds matches "rafthttp.ConnReadTimeout"
+		for i := 0; i < 5; i++ {
+			select {
+			case <-time.After(time.Second):
+			case <-srv.stopping:
+				return
+			}
+
+			peerN := tr.ActivePeers()
+			if peerN > 1 {
+				// multi-node: peer connections have been reported;
+				// adjust ticks, leaving slack in case leader messages are received slowly
+				ticks := cfg.ElectionTicks - 2
+				plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", srv.ID(), ticks, cfg.ElectionTicks, peerN)
+				srv.r.advanceTicks(ticks)
+				return
+			}
+		}
+
+		// 1. all connections failed, or
+		// 2. no active peers, or
+		// 3. restarted single-node with no snapshot
+		// do nothing, because advancing ticks would have no effect
+	})
 	return srv, nil
 }
 