Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call RemoveServer for reap events #5317

Merged
merged 7 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agent/consul/client_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,11 @@ func (c *Client) lanEventHandler() {
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
Expand Down
40 changes: 40 additions & 0 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,46 @@ func TestClient_JoinLAN(t *testing.T) {
})
}

func TestClient_LANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

dir2, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer c1.Shutdown()

// Try to join
joinLAN(t, c1, s1)
testrpc.WaitForLeader(t, c1.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
require.Len(r, c1.LANMembers(), 2)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
server := c1.routers.FindServer()
require.NotNil(t, server)
require.Equal(t, s1.config.NodeName, server.Name)
})

// shutdown the second dc
s1.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, c1.LANMembers(), 1)
server := c1.routers.FindServer()
require.Nil(t, server)
})
}

func TestClient_JoinLAN_Invalid(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down
4 changes: 1 addition & 3 deletions agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ func (s *Server) lanEventHandler() {
s.lanNodeJoin(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))

case serf.EventMemberLeave, serf.EventMemberFailed:
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
s.lanNodeFailed(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))

case serf.EventMemberReap:
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventUser:
s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate:
Expand Down
103 changes: 103 additions & 0 deletions agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"

"github.com/stretchr/testify/require"
)

func configureTLS(config *Config) {
Expand Down Expand Up @@ -235,6 +237,67 @@ func TestServer_JoinLAN(t *testing.T) {
})
}

func TestServer_LANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir2)

dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()

// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)

testrpc.WaitForLeader(t, s3.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 3)
require.Len(r, s2.LANMembers(), 3)
require.Len(r, s3.LANMembers(), 3)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.serverLookup.Servers(), 3)
require.Len(r, s2.serverLookup.Servers(), 3)
require.Len(r, s3.serverLookup.Servers(), 3)
})

// shutdown the second dc
s2.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
servers := s1.serverLookup.Servers()
require.Len(r, servers, 2)
// require.Equal(r, s1.config.NodeName, servers[0].Name)
})
}

func TestServer_JoinWAN(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down Expand Up @@ -267,6 +330,46 @@ func TestServer_JoinWAN(t *testing.T) {
})
}

func TestServer_WANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfWANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfWANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)

// Try to join
joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.WANMembers(), 2)
require.Len(r, s2.WANMembers(), 2)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.router.GetDatacenters(), 2)
require.Len(r, s2.router.GetDatacenters(), 2)
})

// shutdown the second dc
s2.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.WANMembers(), 1)
datacenters := s1.router.GetDatacenters()
require.Len(r, datacenters, 1)
require.Equal(r, "dc1", datacenters[0])
})

}

func TestServer_JoinWAN_Flood(t *testing.T) {
t.Parallel()
// Set up two servers in a WAN.
Expand Down
3 changes: 1 addition & 2 deletions agent/router/serf_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)

case serf.EventMemberLeave:
case serf.EventMemberLeave, serf.EventMemberReap:
handleMemberEvent(logger, router.RemoveServer, areaID, e)

case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)

// All of these event types are ignored.
case serf.EventMemberUpdate:
case serf.EventMemberReap:
case serf.EventUser:
case serf.EventQuery:

Expand Down