From cb4b2b157ab3e739ef4bca4aed5b66cfa9dab598 Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Thu, 5 Oct 2017 21:52:36 -0700
Subject: [PATCH] Fixes handling of stop channel and failed barrier attempts.

There were two issues here. First, we needed to not exit when there was a
timeout trying to write the barrier, because Raft might not step down, so
we'd be left as the leader but having run all the step down actions.

Second, we didn't close over the stopCh correctly, so it was possible to
nil that out and have the leaderLoop never exit. We close over it properly
AND sequence the nil-ing of it AFTER the leaderLoop exits for good measure,
so the code is more robust.

Fixes #3545
---
 agent/consul/leader.go | 35 ++++++++++++++++++++++++++++++-----
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index e4e84b4a4a2e..dc2281b63b1b 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -33,25 +33,41 @@ func (s *Server) monitorLeadership() {
 	// cleanup and to ensure we never run multiple leader loops.
 	raftNotifyCh := s.raftNotifyCh
 
+	// This wait group will ensure that only one leader loop will ever be
+	// running at any given time. We need to run that in a goroutine so we
+	// can look for other notifications (loss of leadership) which we pass
+	// along by closing the stopCh given to the current leader loop.
 	var wg sync.WaitGroup
 	var stopCh chan struct{}
 	for {
 		select {
 		case isLeader := <-raftNotifyCh:
 			if isLeader {
+				if stopCh != nil {
+					s.logger.Printf("[ERR] consul: attempted to start the leader loop while running")
+					continue
+				}
+
 				stopCh = make(chan struct{})
 				wg.Add(1)
-				go func() {
+				go func(stopCh chan struct{}) {
 					s.leaderLoop(stopCh)
 					wg.Done()
-				}()
+				}(stopCh)
 				s.logger.Printf("[INFO] consul: cluster leadership acquired")
-			} else if stopCh != nil {
+			} else {
+				if stopCh == nil {
+					s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running")
+					continue
+				}
+
+				s.logger.Printf("[DEBUG] consul: shutting down leader loop")
 				close(stopCh)
-				stopCh = nil
 				wg.Wait()
+				stopCh = nil
 				s.logger.Printf("[INFO] consul: cluster leadership lost")
 			}
+
 		case <-s.shutdownCh:
 			return
 		}
@@ -97,7 +113,7 @@ RECONCILE:
 	barrier := s.raft.Barrier(barrierWriteTimeout)
 	if err := barrier.Error(); err != nil {
 		s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
-		return
+		goto WAIT
 	}
 	metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
 	metrics.MeasureSince([]string{"leader", "barrier"}, start)
@@ -127,6 +143,15 @@ RECONCILE:
 	reconcileCh = s.reconcileCh
 
 WAIT:
+	// Poll the stop channel to give it priority so we don't waste time
+	// trying to perform the other operations if we have been asked to shut
+	// down.
+	select {
+	case <-stopCh:
+		return
+	default:
+	}
+
 	// Periodically reconcile as long as we are the leader,
 	// or when Serf events arrive
 	for {
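
To make the closure fix concrete, below is a minimal, self-contained Go
sketch of the pattern this patch establishes; it is not Consul code, and the
workerLoop function is a hypothetical stand-in for leaderLoop. The goroutine
takes the stop channel as a parameter so it captures the channel value rather
than the shared variable, which the monitor later sets back to nil only after
the loop has exited; the loop also polls the stop channel first, mirroring
the select added at the WAIT label, so shutdown takes priority over work.

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // workerLoop is a hypothetical stand-in for leaderLoop. It receives the
    // stop channel by value, so a later "stopCh = nil" in the caller cannot
    // affect it.
    func workerLoop(stopCh chan struct{}) {
    	for {
    		// Give the stop channel priority over other work, as the
    		// patch does at the WAIT label.
    		select {
    		case <-stopCh:
    			return
    		default:
    		}
    		time.Sleep(10 * time.Millisecond) // stand-in for real work
    	}
    }

    func main() {
    	var wg sync.WaitGroup
    	var stopCh chan struct{}

    	// "Leadership acquired": start exactly one worker loop, passing
    	// stopCh as an argument so the goroutine closes over the value.
    	stopCh = make(chan struct{})
    	wg.Add(1)
    	go func(stopCh chan struct{}) {
    		defer wg.Done()
    		workerLoop(stopCh)
    	}(stopCh)

    	// "Leadership lost": signal the loop, wait for it to exit, and
    	// only then nil out the variable, in that order, as in the patch.
    	close(stopCh)
    	wg.Wait()
    	stopCh = nil

    	fmt.Println("worker loop stopped cleanly")
    }

With the pre-patch ordering (stopCh = nil before wg.Wait()), a goroutine that
closed over the variable instead of the value could observe the nil channel
and block forever, which is the hang the second fix addresses.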