Skip to content

Commit

Permalink
Switches to reliable Raft leader notifications.
Browse files Browse the repository at this point in the history
This fixes #2896 by switching to the `notifyCh` instead of the `leaderCh`,
so we get all up/down events from Raft regarding leadership. We also wait
for the old leader loop to shut down before we ever consider starting a
new one, which keeps the leader loop single-threaded and fixes the panic
reported in that issue.
  • Loading branch information
slackpad committed Apr 13, 2017
1 parent ef6a42a commit 9654ee6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
16 changes: 14 additions & 2 deletions consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/armon/go-metrics"
Expand All @@ -29,18 +30,29 @@ const (
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func (s *Server) monitorLeadership() {
leaderCh := s.raft.LeaderCh()
// We use the notify channel we configured Raft with, NOT Raft's
// leaderCh, which is only notified best-effort. Doing this ensures
// that we get all notifications in order, which is required for
// cleanup and to ensure we never run multiple leader loops.
leaderCh := s.leaderCh

var wg sync.WaitGroup
var stopCh chan struct{}
for {
select {
case isLeader := <-leaderCh:
if isLeader {
stopCh = make(chan struct{})
go s.leaderLoop(stopCh)
wg.Add(1)
go func() {
s.leaderLoop(stopCh)
wg.Done()
}()
s.logger.Printf("[INFO] consul: cluster leadership acquired")
} else if stopCh != nil {
close(stopCh)
stopCh = nil
wg.Wait()
s.logger.Printf("[INFO] consul: cluster leadership lost")
}
case <-s.shutdownCh:
Expand Down
9 changes: 9 additions & 0 deletions consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ type Server struct {
raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore

// leaderCh is set up by setupRaft() and ensures that we get reliable leader
// transition notifications from the Raft layer.
leaderCh <-chan bool

// reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be
// updated
Expand Down Expand Up @@ -554,6 +558,11 @@ func (s *Server) setupRaft() error {
}
}

// Set up a channel for reliable leader notifications.
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh

// Setup the Raft store.
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
Expand Down

0 comments on commit 9654ee6

Please sign in to comment.