Fixes handling of stop channel and failed barrier attempts.
There were two issues here. First, we must not exit the leader loop
when a timeout occurs while trying to write the barrier: Raft might not
step down, so we'd be left as the leader while having already run all
the step-down actions.
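
Roughly, the new shape of this part of leaderLoop, as a minimal
standalone sketch rather than the actual Consul code (raftBarrier and
the intervals below are made-up stand-ins for s.raft.Barrier and the
real reconcile interval):

// A minimal, self-contained sketch of the behavior described above; it is
// not the actual Consul code. raftBarrier stands in for
// s.raft.Barrier(timeout).Error(), and the intervals are illustrative.
package main

import (
	"errors"
	"log"
	"time"
)

func raftBarrier(timeout time.Duration) error {
	// Pretend the barrier write keeps timing out.
	return errors.New("timed out enqueuing operation")
}

func leaderLoop(stopCh <-chan struct{}) {
RECONCILE:
	if err := raftBarrier(2 * time.Minute); err != nil {
		log.Printf("[ERR] failed to wait for barrier: %v", err)
		// Previously this was a `return`, which left us as the Raft leader
		// with no leader loop running. Instead, fall through to WAIT and
		// retry on the next interval.
		goto WAIT
	}
	// ... establish leadership, reconcile members, etc. ...

WAIT:
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(time.Second):
			goto RECONCILE
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		leaderLoop(stopCh)
		close(done)
	}()

	time.Sleep(3 * time.Second) // watch a few failed barrier retries
	close(stopCh)               // loss of leadership: the loop exits cleanly
	<-done
}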

Second, we didn't close over stopCh correctly, so it was possible to
nil it out before the leader loop goroutine read it, leaving the
leaderLoop never able to exit. We now close over it properly by passing
it into the goroutine AND sequence the nil-ing of it AFTER the
leaderLoop exits, for good measure, so the code is more robust.
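
For illustration, a minimal standalone sketch (again not the Consul
code; worker and the timings are made-up stand-ins for s.leaderLoop)
contrasting closing over the channel variable with passing its value
into the goroutine and nil-ing it only after the worker has exited:

// A minimal, hypothetical sketch of the two launch shapes; worker stands in
// for s.leaderLoop and the names here are illustrative only.
package main

import (
	"fmt"
	"sync"
)

func worker(stopCh chan struct{}) {
	<-stopCh // a receive on a nil channel would block forever
	fmt.Println("worker exiting")
}

func main() {
	var wg sync.WaitGroup
	var stopCh chan struct{}

	// Buggy shape: the closure captures the *variable* stopCh, so
	// worker(stopCh) is evaluated only when the goroutine runs. If the
	// parent has already done `stopCh = nil` by then, worker receives on a
	// nil channel and never exits:
	//
	//	go func() {
	//		worker(stopCh)
	//		wg.Done()
	//	}()
	//	close(stopCh)
	//	stopCh = nil // may happen before the goroutine reads stopCh
	//	wg.Wait()    // can then hang forever

	// Fixed shape: capture the channel value at launch time by passing it
	// as an argument, and only nil the variable out after the worker has
	// actually exited.
	stopCh = make(chan struct{})
	wg.Add(1)
	go func(ch chan struct{}) {
		worker(ch)
		wg.Done()
	}(stopCh)

	close(stopCh)
	wg.Wait()
	stopCh = nil

	fmt.Println("leadership lost, loop cleaned up; stopCh is", stopCh)
}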

Fixes #3545
slackpad committed Oct 6, 2017
1 parent 8e19eb3 commit cb4b2b1
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions agent/consul/leader.go
@@ -33,25 +33,41 @@ func (s *Server) monitorLeadership() {
	// cleanup and to ensure we never run multiple leader loops.
	raftNotifyCh := s.raftNotifyCh

	// This wait group will ensure that only one leader loop will ever be
	// running at any given time. We need to run that in a goroutine so we
	// can look for other notifications (loss of leadership) which we pass
	// along by closing the stopCh given to the current leader loop.
	var wg sync.WaitGroup
	var stopCh chan struct{}
	for {
		select {
		case isLeader := <-raftNotifyCh:
			if isLeader {
				if stopCh != nil {
					s.logger.Printf("[ERR] consul: attempted to start the leader loop while running")
					continue
				}

				stopCh = make(chan struct{})
				wg.Add(1)
				go func() {
				go func(stopCh chan struct{}) {
					s.leaderLoop(stopCh)
					wg.Done()
				}()
				}(stopCh)
				s.logger.Printf("[INFO] consul: cluster leadership acquired")
			} else if stopCh != nil {
			} else {
				if stopCh == nil {
					s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running")
					continue
				}

				s.logger.Printf("[DEBUG] consul: shutting down leader loop")
				close(stopCh)
				stopCh = nil
				wg.Wait()
				stopCh = nil
				s.logger.Printf("[INFO] consul: cluster leadership lost")
			}

		case <-s.shutdownCh:
			return
		}
@@ -97,7 +113,7 @@ RECONCILE:
	barrier := s.raft.Barrier(barrierWriteTimeout)
	if err := barrier.Error(); err != nil {
		s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
		return
		goto WAIT
	}
	metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
	metrics.MeasureSince([]string{"leader", "barrier"}, start)
@@ -127,6 +143,15 @@ RECONCILE:
	reconcileCh = s.reconcileCh

WAIT:
	// Poll the stop channel to give it priority so we don't waste time
	// trying to perform the other operations if we have been asked to shut
	// down.
	select {
	case <-stopCh:
		return
	default:
	}

	// Periodically reconcile as long as we are the leader,
	// or when Serf events arrive
	for {
