Transfer leadership when establishLeadership fails #5247

Merged Jun 19, 2019 · 16 commits · Changes from all commits
1 change: 1 addition & 0 deletions agent/agent.go
@@ -1131,6 +1131,7 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
	}

	// Setup the loggers
	base.LogLevel = a.config.LogLevel
	base.LogOutput = a.LogOutput

	// This will set up the LAN keyring, as well as the WAN and any segments
3 changes: 3 additions & 0 deletions agent/consul/config.go
@@ -147,6 +147,9 @@ type Config struct {
	// leader election.
	ReconcileInterval time.Duration

	// LogLevel is the level of the logs to write. Defaults to "INFO".
	LogLevel string

	// LogOutput is the location to write logs to. If this is not set,
	// logs will go to stderr.
	LogOutput io.Writer
99 changes: 72 additions & 27 deletions agent/consul/leader.go
@@ -126,6 +126,21 @@ func (s *Server) monitorLeadership() {
	}
}

// leadershipTransfer tries to hand leadership over to another voter,
// retrying a few times before giving up.
func (s *Server) leadershipTransfer() error {
	retryCount := 3
	for i := 0; i < retryCount; i++ {
		future := s.raft.LeadershipTransfer()
		if err := future.Error(); err != nil {
			s.logger.Printf("[ERR] consul: failed to transfer leadership attempt %d/%d: %v", i+1, retryCount, err)
		} else {
			s.logger.Printf("[INFO] consul: successfully transferred leadership attempt %d/%d", i+1, retryCount)
			return nil
		}
	}
	return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
}

// leaderLoop runs as long as we are the leader to run various
// maintenance activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
@@ -142,19 +157,6 @@ func (s *Server) leaderLoop(stopCh chan struct{}) {
	var reconcileCh chan serf.Member
	establishedLeader := false

	reassert := func() error {
		if !establishedLeader {
			return fmt.Errorf("leadership has not been established")
		}
		if err := s.revokeLeadership(); err != nil {
			return err
		}
		if err := s.establishLeadership(); err != nil {
			return err
		}
		return nil
	}

RECONCILE:
	// Setup a reconciliation timer
	reconcileCh = nil
@@ -175,17 +177,22 @@ RECONCILE:
			s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err)
			// Immediately revoke leadership since we didn't successfully
			// establish leadership.
			if err := s.revokeLeadership(); err != nil {
				s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
			}
			s.revokeLeadership()

			// attempt to transfer leadership. If successful, it is
			// time to leave the leaderLoop since this node is no
			// longer the leader. If leadershipTransfer() fails, we
			// will try to acquire leadership again after 5 seconds.
			if err := s.leadershipTransfer(); err != nil {
				s.logger.Printf("[ERR] consul: %v", err)
				interval = time.After(5 * time.Second)
				goto WAIT
			}
			goto WAIT
			return
		}
		establishedLeader = true
		defer func() {
			if err := s.revokeLeadership(); err != nil {
				s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
			}
		}()
		defer s.revokeLeadership()
	}

	// Reconcile any missing data
@@ -223,7 +230,47 @@ WAIT:
		case index := <-s.tombstoneGC.ExpireCh():
			go s.reapTombstones(index)
		case errCh := <-s.reassertLeaderCh:
			errCh <- reassert()
			// we can get into this state when the initial
			// establishLeadership has failed as well as the
			// follow-up leadershipTransfer. Afterwards we will be
			// waiting for the interval to trigger a reconciliation
			// and can potentially end up here. There is no point in
			// reasserting, because this agent was never leader in
			// the first place.
			if !establishedLeader {
				errCh <- fmt.Errorf("leadership has not been established")
				continue
			}

			// continue to reassert only if we previously were the
			// leader, which means revokeLeadership followed by an
			// establishLeadership().
			s.revokeLeadership()
			err := s.establishLeadership()
			errCh <- err

			// in case establishLeadership failed, we will try to
			// transfer leadership. At this point raft thinks we are
			// the leader, but consul disagrees.
			if err != nil {
				if err := s.leadershipTransfer(); err != nil {
					// establishedLeader was true before, but
					// this node just revoked leadership and
					// the leadership transfer failed as well,
					// so it stays in the leaderLoop and
					// establishedLeader has to be reset to
					// false.
					establishedLeader = false
					interval = time.After(5 * time.Second)
Member commented:

I think this is OK because we'll wait for 5 seconds in the next wait loop, then time out and goto RECONCILE. Right after that label we will hit interval := time.After(s.config.ReconcileInterval) again, which presumably will set the interval back.

I must admit I'm a little confused about how := works after a goto: it's reassigning the same variable name, in the same scope, on subsequent jumps? I wonder if this is some strange variant of variable shadowing even though they are in the same scope. Maybe Go just has a special case to allow this when using goto but not in serial code? If it works I guess it's fine 😄

Member Author commented:

I am not sure what you mean. WAIT is well after RECONCILE and the interval variable declaration, and the code should just be using the same variable.

I reproduced your question in a playground: https://play.golang.org/p/6AqssHXg3Wt. If you jump back to a point before a variable declaration, Go will create a new variable. Or did you mean something else?

banks (Member) commented on Jun 14, 2019:

Yeah, I did a similar repro and it's fine, it was just a strange one.

The path here is:

  • we set interval to be a timer chan that will go off in 5 seconds
  • we goto WAIT, which enters a select on a few things, including that chan
  • when the chan fires, that select branch does goto RECONCILE, which immediately re-assigns a timer chan for the original ReconcileInterval (using :=)

My original concern was that we might end up regaining leadership and then doing a reconcile every 5 seconds after that, but that's not the case due to the path mentioned above.

It also occurs to me that we have always had a re-assignment after goto RECONCILE, so it's not really any different than before; it's just that it was the only assignment before, and I wondered if some strange form of shadowing might cause issues. That appears not to be the case, so I think this is fine!

Member commented:

For fun: Go does create a new variable with the same name.

You can see that here: https://play.golang.org/p/FU0ZxictDXE. Capturing the variable in a lambda and then looping with goto leaves the lambda holding the original value, not the redefined one after the goto jump.

It's just weird to me because it's in the same scope. Shadowing across scopes seems fine, but this seems to be a special case you can't normally do outside of goto.

Member Author commented:

Wow, now I see what you mean.
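
To make the semantics concrete, here is a minimal standalone sketch (an illustration in the spirit of the playgrounds linked above, not the linked code itself): jumping backward over a := re-executes the declaration, so each pass creates a fresh variable, and each closure keeps the instance from its own pass.

package main

import "fmt"

func main() {
	i := 0
	var fns []func() int

LOOP:
	x := i // re-executed on every jump: each pass declares a brand-new x
	fns = append(fns, func() int { return x })
	i++
	if i < 3 {
		goto LOOP // legal: a backward jump takes x out of scope rather than into it
	}

	for _, f := range fns {
		fmt.Println(f()) // prints 0, 1, 2: each closure captured its own x
	}
}

This is also why the interval := time.After(s.config.ReconcileInterval) re-assignment after the RECONCILE label is safe: every jump simply creates a fresh timer channel.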

					goto WAIT
Member commented:

So now we attempt the transfer 3 times, but if it fails we still hang out in non-leader limbo land for a bit before retrying?

I guess this is what I mentioned as "retry indefinitely", and it should really work immediately if the rest of the cluster is in an OK state, so I think this is good.

Member Author commented:

My thinking was that since leadershipTransfer failed, we try to establishLeadership again. I think in general establishLeadership is more likely to succeed than leadershipTransfer. Making the interval smaller (like 5 seconds) before it retries seems like a better solution than trying to transfer leadership indefinitely.

				}

				// leadershipTransfer was successful and it is
				// time to leave the leaderLoop.
				return
			}

		}
	}
}
@@ -290,15 +337,13 @@ func (s *Server) establishLeadership() error {

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
func (s *Server) revokeLeadership() {
	// Disable the tombstone GC, since it is only useful as a leader
	s.tombstoneGC.SetEnabled(false)

	// Clear the session timers on either shutdown or step down, since we
	// are no longer responsible for session expirations.
	if err := s.clearAllSessionTimers(); err != nil {
		return err
	}
	s.clearAllSessionTimers()

	s.stopConfigReplication()

@@ -313,8 +358,8 @@ func (s *Server) revokeLeadership() error {
	s.stopACLUpgrade()

	s.resetConsistentReadReady()

	s.autopilot.Stop()
	return nil
}

// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed
9 changes: 8 additions & 1 deletion agent/consul/server.go
@@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/sentinel"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
@@ -540,7 +541,13 @@ func (s *Server) setupRaft() error {

	// Make sure we set the LogOutput.
	s.config.RaftConfig.LogOutput = s.config.LogOutput
	s.config.RaftConfig.Logger = s.logger
	raftLogger := hclog.New(&hclog.LoggerOptions{
		Name:       "raft",
		Level:      hclog.LevelFromString(s.config.LogLevel),
		Output:     s.config.LogOutput,
		TimeFormat: `2006/01/02 15:04:05`,
	})
	s.config.RaftConfig.Logger = raftLogger
Member Author commented:

Raft's logging changed, so we need to provide an hclog logger now.
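
As an aside, a self-contained sketch of the same wiring (illustrative only, not part of this diff; the option values mirror the code above): raft v1.1.0 expects an hclog.Logger on its Config, and hclog.LevelFromString maps the textual level, returning hclog.NoLevel for strings it does not recognize.

package main

import (
	"os"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

func main() {
	// Build an hclog.Logger at the desired level, mirroring the diff above.
	raftLogger := hclog.New(&hclog.LoggerOptions{
		Name:       "raft",
		Level:      hclog.LevelFromString("INFO"),
		Output:     os.Stderr,
		TimeFormat: `2006/01/02 15:04:05`,
	})

	// raft v1.1.0 changed Config.Logger from *log.Logger to hclog.Logger,
	// which is why the PR constructs an hclog logger explicitly.
	conf := raft.DefaultConfig()
	conf.Logger = raftLogger
	_ = conf
}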


	// Versions of the Raft protocol below 3 require the LocalID to match the network
	// address of the transport.
10 changes: 2 additions & 8 deletions agent/consul/server_test.go
@@ -962,14 +962,8 @@ func TestServer_RevokeLeadershipIdempotent(t *testing.T) {

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	err := s1.revokeLeadership()
	if err != nil {
		t.Fatal(err)
	}
	err = s1.revokeLeadership()
	if err != nil {
		t.Fatal(err)
	}
	s1.revokeLeadership()
	s1.revokeLeadership()
}

func TestServer_Reload(t *testing.T) {
3 changes: 1 addition & 2 deletions agent/consul/session_ttl.go
@@ -122,9 +122,8 @@ func (s *Server) clearSessionTimer(id string) error {

// clearAllSessionTimers is used when a leader is stepping
// down and we no longer need to track any session timers.
func (s *Server) clearAllSessionTimers() error {
func (s *Server) clearAllSessionTimers() {
	s.sessionTimers.StopAll()
	return nil
}

// sessionStats is a long running routine used to capture
5 changes: 1 addition & 4 deletions agent/consul/session_ttl_test.go
@@ -281,10 +281,7 @@ func TestClearAllSessionTimers(t *testing.T) {
	s1.createSessionTimer("bar", 10*time.Millisecond)
	s1.createSessionTimer("baz", 10*time.Millisecond)

	err := s1.clearAllSessionTimers()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	s1.clearAllSessionTimers()

	// sessionTimers is guarded by the lock
	if s1.sessionTimers.Len() != 0 {
21 changes: 6 additions & 15 deletions go.mod
@@ -9,7 +9,6 @@ replace github.com/hashicorp/consul/sdk => ./sdk
require (
	github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
	github.com/Azure/go-autorest v10.15.3+incompatible // indirect
	github.com/DataDog/datadog-go v0.0.0-20160329135253-cc2f4770f4d6 // indirect
	github.com/Jeffail/gabs v1.1.0 // indirect
	github.com/Microsoft/go-winio v0.4.3 // indirect
	github.com/NYTimes/gziphandler v1.0.1
@@ -18,16 +17,13 @@ require (
	github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc // indirect
	github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
	github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
	github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
	github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
	github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310
	github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f // indirect
	github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
	github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
	github.com/boltdb/bolt v1.3.1 // indirect
	github.com/cenkalti/backoff v2.1.1+incompatible // indirect
	github.com/circonus-labs/circonus-gometrics v0.0.0-20161109192337-d17a8420c36e // indirect
	github.com/circonus-labs/circonusllhist v0.0.0-20161110002650-365d370cc145 // indirect
	github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect
	github.com/coredns/coredns v1.1.2
	github.com/denisenkom/go-mssqldb v0.0.0-20180620032804-94c9c97e8c9f // indirect
@@ -58,12 +54,11 @@ require (
	github.com/hashicorp/go-checkpoint v0.0.0-20171009173528-1545e56e46de
	github.com/hashicorp/go-cleanhttp v0.5.1
	github.com/hashicorp/go-discover v0.0.0-20190403160810-22221edb15cd
	github.com/hashicorp/go-hclog v0.0.0-20180402200405-69ff559dc25f // indirect
	github.com/hashicorp/go-hclog v0.9.1
	github.com/hashicorp/go-memdb v0.0.0-20180223233045-1289e7fffe71
	github.com/hashicorp/go-msgpack v0.5.4
	github.com/hashicorp/go-msgpack v0.5.5
	github.com/hashicorp/go-multierror v1.0.0
	github.com/hashicorp/go-plugin v0.0.0-20180331002553-e8d22c780116
	github.com/hashicorp/go-retryablehttp v0.0.0-20180531211321-3b087ef2d313 // indirect
	github.com/hashicorp/go-rootcerts v1.0.0
	github.com/hashicorp/go-sockaddr v1.0.0
	github.com/hashicorp/go-syslog v1.0.0
@@ -76,7 +71,7 @@
	github.com/hashicorp/mdns v1.0.1 // indirect
	github.com/hashicorp/memberlist v0.1.4
	github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
	github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
	github.com/hashicorp/raft v1.1.0
	github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1
	github.com/hashicorp/serf v0.8.2
	github.com/hashicorp/vault v0.10.3
@@ -89,7 +84,6 @@ require (
	github.com/kr/text v0.1.0
	github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 // indirect
	github.com/lyft/protoc-gen-validate v0.0.0-20180911180927-64fcb82c878e // indirect
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
	github.com/miekg/dns v1.0.14
	github.com/mitchellh/cli v1.0.0
	github.com/mitchellh/copystructure v0.0.0-20160804032330-cdac8253d00f
@@ -103,13 +97,10 @@
	github.com/opencontainers/image-spec v1.0.1 // indirect
	github.com/opencontainers/runc v0.1.1 // indirect
	github.com/ory/dockertest v3.3.4+incompatible // indirect
	github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c
	github.com/pascaldekloe/goe v0.1.0
	github.com/patrickmn/go-cache v0.0.0-20180527043350-9f6ff22cfff8 // indirect
	github.com/pkg/errors v0.8.1
	github.com/prometheus/client_golang v0.0.0-20180328130430-f504d69affe1
	github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5 // indirect
	github.com/prometheus/common v0.0.0-20180326160409-38c53a9f4bfc // indirect
	github.com/prometheus/procfs v0.0.0-20180408092902-8b1c2da0d56d // indirect
	github.com/prometheus/client_golang v0.9.2
	github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f
	github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 // indirect
	github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880