Skip to content

Commit

Permalink
Manually backport etcdserver/raft.go tickMu fix to 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Mar 28, 2018
1 parent 8eeab58 commit b02a96b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
13 changes: 11 additions & 2 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type raftNode struct {
lead uint64

mu sync.Mutex

tickMu sync.Mutex
// last lead elected time
lt time.Time

Expand Down Expand Up @@ -129,6 +131,13 @@ type raftNode struct {
done chan struct{}
}

// raft.Node does not have locks in Raft package
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick()
r.tickMu.Unlock()
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
Expand All @@ -144,7 +153,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
for {
select {
case <-r.ticker:
r.Tick()
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
Expand Down Expand Up @@ -327,7 +336,7 @@ func (r *raftNode) resumeSending() {
// speeding up election process.
func (r *raftNode) advanceTicks(ticks int) {
for i := 0; i < ticks; i++ {
r.Tick()
r.tick()
}
}

Expand Down
39 changes: 39 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,50 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}

func (s *EtcdServer) adjustTicks() {
clusterN := len(s.cluster.Members())

// single-node fresh start, or single-node recovers from snapshot
if clusterN == 1 {
ticks := s.Cfg.ElectionTicks - 1
plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
s.r.advanceTicks(ticks)
return
}

// retry up to "rafthttp.ConnReadTimeout", which is 5-sec
// until peer connection reports; otherwise:
// 1. all connections failed, or
// 2. no active peers, or
// 3. restarted single-node with no snapshot
// then, do nothing, because advancing ticks would have no effect
waitTime := rafthttp.ConnReadTimeout
itv := 50 * time.Millisecond
for i := int64(0); i < int64(waitTime/itv); i++ {
select {
case <-time.After(itv):
case <-s.stopping:
return
}

peerN := s.r.transport.ActivePeers()
if peerN > 1 {
// multi-node received peer connection reports
// adjust ticks, in case slow leader message receive
ticks := s.Cfg.ElectionTicks - 2
plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
s.r.advanceTicks(ticks)
return
}
}
}

// Start prepares and starts server in a new goroutine. It is no longer safe to
// modify a server's fields after it has been sent to Start.
// It also starts a goroutine to publish its server information.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
Expand Down

0 comments on commit b02a96b

Please sign in to comment.