diff --git a/etcdserver/raft.go b/etcdserver/raft.go index d7ec176eb3a7..8e46f5e68891 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -95,6 +95,8 @@ type raftNode struct { lead uint64 mu sync.Mutex + + tickMu sync.Mutex // last lead elected time lt time.Time @@ -129,6 +131,13 @@ type raftNode struct { done chan struct{} } +// raft.Node does not have locks in Raft package +func (r *raftNode) tick() { + r.tickMu.Lock() + r.Tick() + r.tickMu.Unlock() +} + // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { @@ -144,7 +153,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { for { select { case <-r.ticker: - r.Tick() + r.tick() case rd := <-r.Ready(): if rd.SoftState != nil { if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead { @@ -321,13 +330,13 @@ func (r *raftNode) resumeSending() { p.Resume() } -// advanceTicksForElection advances ticks to the node for fast election. -// This reduces the time to wait for first leader election if bootstrapping the whole -// cluster, while leaving at least 1 heartbeat for possible existing leader -// to contact it. -func advanceTicksForElection(n raft.Node, electionTicks int) { - for i := 0; i < electionTicks-1; i++ { - n.Tick() +// advanceTicks advances ticks of Raft node. +// This can be used for fast-forwarding election +// ticks in multi data-center deployments, thus +// speeding up election process. +func (r *raftNode) advanceTicks(ticks int) { + for i := 0; i < ticks; i++ { + r.tick() } } @@ -368,8 +377,7 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - advanceTicksForElection(n, c.ElectionTick) - return + return id, n, s, w } func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { @@ -402,7 +410,6 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membe raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - advanceTicksForElection(n, c.ElectionTick) return id, cl, n, s, w } diff --git a/etcdserver/server.go b/etcdserver/server.go index 0734f8c59249..b437f1081695 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -506,11 +506,50 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { return srv, nil } +func (s *EtcdServer) adjustTicks() { + clusterN := len(s.cluster.Members()) + + // single-node fresh start, or single-node recovers from snapshot + if clusterN == 1 { + ticks := s.Cfg.ElectionTicks - 1 + plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks) + s.r.advanceTicks(ticks) + return + } + + // retry up to "rafthttp.ConnReadTimeout", which is 5-sec + // until peer connection reports; otherwise: + // 1. all connections failed, or + // 2. no active peers, or + // 3. restarted single-node with no snapshot + // then, do nothing, because advancing ticks would have no effect + waitTime := rafthttp.ConnReadTimeout + itv := 50 * time.Millisecond + for i := int64(0); i < int64(waitTime/itv); i++ { + select { + case <-time.After(itv): + case <-s.stopping: + return + } + + peerN := s.r.transport.ActivePeers() + if peerN > 1 { + // multi-node received peer connection reports + // adjust ticks, in case slow leader message receive + ticks := s.Cfg.ElectionTicks - 2 + plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN) + s.r.advanceTicks(ticks) + return + } + } +} + // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. // It also starts a goroutine to publish its server information. func (s *EtcdServer) Start() { s.start() + s.goAttach(func() { s.adjustTicks() }) s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.goAttach(s.purgeFile) s.goAttach(func() { monitorFileDescriptor(s.stopping) }) diff --git a/etcdserver/util_test.go b/etcdserver/util_test.go index 79edabd1220f..b1cbe236c550 100644 --- a/etcdserver/util_test.go +++ b/etcdserver/util_test.go @@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {} func (s *nopTransporterWithActiveTime) RemoveAllPeers() {} func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {} func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] } +func (s *nopTransporterWithActiveTime) ActivePeers() int { return 0 } func (s *nopTransporterWithActiveTime) Stop() {} func (s *nopTransporterWithActiveTime) Pause() {} func (s *nopTransporterWithActiveTime) Resume() {} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 1f0b46836e66..f96149aa393e 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -83,6 +83,8 @@ type Transporter interface { // If the connection is active since peer was added, it returns the adding time. // If the connection is currently inactive, it returns zero time. ActiveSince(id types.ID) time.Time + // ActivePeers returns the number of active peers. + ActivePeers() int // Stop closes the connections and stops the transporter. Stop() } @@ -362,6 +364,20 @@ func (t *Transport) Resume() { } } +// ActivePeers returns a channel that closes when an initial +// peer connection has been established. Use this to wait until the +// first peer connection becomes active. +func (t *Transport) ActivePeers() (cnt int) { + t.mu.RLock() + defer t.mu.RUnlock() + for _, p := range t.peers { + if !p.activeSince().IsZero() { + cnt++ + } + } + return cnt +} + type nopTransporter struct{} func NewNopTransporter() Transporter { @@ -378,6 +394,7 @@ func (s *nopTransporter) RemovePeer(id types.ID) {} func (s *nopTransporter) RemoveAllPeers() {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } +func (s *nopTransporter) ActivePeers() int { return 0 } func (s *nopTransporter) Stop() {} func (s *nopTransporter) Pause() {} func (s *nopTransporter) Resume() {}