diff --git a/agent/consul/client_serf.go b/agent/consul/client_serf.go index 51ce0d4ef332..5b0c769fafb0 100644 --- a/agent/consul/client_serf.go +++ b/agent/consul/client_serf.go @@ -70,12 +70,11 @@ func (c *Client) lanEventHandler() { switch e.EventType() { case serf.EventMemberJoin: c.nodeJoin(e.(serf.MemberEvent)) - case serf.EventMemberLeave, serf.EventMemberFailed: + case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap: c.nodeFail(e.(serf.MemberEvent)) case serf.EventUser: c.localEvent(e.(serf.UserEvent)) case serf.EventMemberUpdate: // Ignore - case serf.EventMemberReap: // Ignore case serf.EventQuery: // Ignore default: c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index d28e3994ee81..3c82551b2cdf 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -99,6 +99,46 @@ func TestClient_JoinLAN(t *testing.T) { }) } +func TestClient_LANReap(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + + dir2, c1 := testClientWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.SerfFloodInterval = 100 * time.Millisecond + c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond + c.SerfLANConfig.ReapInterval = 500 * time.Millisecond + }) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + joinLAN(t, c1, s1) + testrpc.WaitForLeader(t, c1.RPC, "dc1") + + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.LANMembers(), 2) + require.Len(r, c1.LANMembers(), 2) + }) + + // Check the router has both + retry.Run(t, func(r *retry.R) { + server := c1.routers.FindServer() + require.NotNil(t, server) + require.Equal(t, s1.config.NodeName, server.Name) + }) + + // shutdown the second dc + s1.Shutdown() + + retry.Run(t, func(r *retry.R) { + require.Len(r, c1.LANMembers(), 1) + server := c1.routers.FindServer() + require.Nil(t, server) + }) +} + func TestClient_JoinLAN_Invalid(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 087188a4f21a..94020816de2a 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -137,12 +137,10 @@ func (s *Server) lanEventHandler() { s.lanNodeJoin(e.(serf.MemberEvent)) s.localMemberEvent(e.(serf.MemberEvent)) - case serf.EventMemberLeave, serf.EventMemberFailed: + case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap: s.lanNodeFailed(e.(serf.MemberEvent)) s.localMemberEvent(e.(serf.MemberEvent)) - case serf.EventMemberReap: - s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventUser: s.localEvent(e.(serf.UserEvent)) case serf.EventMemberUpdate: diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index d89db26193f2..ddb44d489dbc 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -20,6 +20,8 @@ import ( "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-uuid" + + "github.com/stretchr/testify/require" ) func configureTLS(config *Config) { @@ -235,6 +237,67 @@ func TestServer_JoinLAN(t *testing.T) { }) } +func TestServer_LANReap(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.SerfFloodInterval = 100 * time.Millisecond + c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond + c.SerfLANConfig.ReapInterval = 500 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.SerfFloodInterval = 100 * time.Millisecond + c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond + c.SerfLANConfig.ReapInterval = 500 * time.Millisecond + }) + defer os.RemoveAll(dir2) + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.SerfFloodInterval = 100 * time.Millisecond + c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond + c.SerfLANConfig.ReapInterval = 500 * time.Millisecond + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join + joinLAN(t, s2, s1) + joinLAN(t, s3, s1) + + testrpc.WaitForLeader(t, s3.RPC, "dc1") + + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.LANMembers(), 3) + require.Len(r, s2.LANMembers(), 3) + require.Len(r, s3.LANMembers(), 3) + }) + + // Check the router has both + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.serverLookup.Servers(), 3) + require.Len(r, s2.serverLookup.Servers(), 3) + require.Len(r, s3.serverLookup.Servers(), 3) + }) + + // shutdown the second dc + s2.Shutdown() + + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.LANMembers(), 2) + servers := s1.serverLookup.Servers() + require.Len(r, servers, 2) + // require.Equal(r, s1.config.NodeName, servers[0].Name) + }) +} + func TestServer_JoinWAN(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) @@ -267,6 +330,46 @@ func TestServer_JoinWAN(t *testing.T) { }) } +func TestServer_WANReap(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.SerfFloodInterval = 100 * time.Millisecond + c.SerfWANConfig.ReconnectTimeout = 250 * time.Millisecond + c.SerfWANConfig.ReapInterval = 500 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDC(t, "dc2") + defer os.RemoveAll(dir2) + + // Try to join + joinWAN(t, s2, s1) + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.WANMembers(), 2) + require.Len(r, s2.WANMembers(), 2) + }) + + // Check the router has both + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.router.GetDatacenters(), 2) + require.Len(r, s2.router.GetDatacenters(), 2) + }) + + // shutdown the second dc + s2.Shutdown() + + retry.Run(t, func(r *retry.R) { + require.Len(r, s1.WANMembers(), 1) + datacenters := s1.router.GetDatacenters() + require.Len(r, datacenters, 1) + require.Equal(r, "dc1", datacenters[0]) + }) + +} + func TestServer_JoinWAN_Flood(t *testing.T) { t.Parallel() // Set up two servers in a WAN. diff --git a/agent/router/serf_adapter.go b/agent/router/serf_adapter.go index 12941b140a21..55c711a43538 100644 --- a/agent/router/serf_adapter.go +++ b/agent/router/serf_adapter.go @@ -53,7 +53,7 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s case serf.EventMemberJoin: handleMemberEvent(logger, router.AddServer, areaID, e) - case serf.EventMemberLeave: + case serf.EventMemberLeave, serf.EventMemberReap: handleMemberEvent(logger, router.RemoveServer, areaID, e) case serf.EventMemberFailed: @@ -61,7 +61,6 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s // All of these event types are ignored. case serf.EventMemberUpdate: - case serf.EventMemberReap: case serf.EventUser: case serf.EventQuery: