Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows disabling WAN federation by setting serf WAN port to -1 #3984

Merged
merged 4 commits into from
Mar 27, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,16 +703,21 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.SerfLANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfLANProbeTimeout
base.SerfLANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfLANSuspicionMult

base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
if a.config.SerfBindAddrWAN != nil {
base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
} else {
// Disable serf WAN federation
base.SerfWANConfig = nil
}

base.RPCAddr = a.config.RPCBindAddr
base.RPCAdvertise = a.config.RPCAdvertiseAddr
Expand Down Expand Up @@ -1019,6 +1024,7 @@ func (a *Agent) setupNodeID(config *config.RuntimeConfig) error {
func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
// If the keyring file is disabled then just poke the provided key
// into the in-memory keyring.
fedarationEnabled := config.SerfWANConfig != nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be "federationEnabled"

if a.config.DisableKeyringFile {
if a.config.EncryptKey == "" {
return nil
Expand All @@ -1028,7 +1034,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
if err := loadKeyring(config.SerfLANConfig, keys); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if err := loadKeyring(config.SerfWANConfig, keys); err != nil {
return err
}
Expand All @@ -1048,7 +1054,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
return err
}
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if _, err := os.Stat(fileWAN); err != nil {
if err := initKeyring(fileWAN, a.config.EncryptKey); err != nil {
return err
Expand All @@ -1063,7 +1069,7 @@ LOAD:
if err := loadKeyringFile(config.SerfLANConfig); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if _, err := os.Stat(fileWAN); err == nil {
config.SerfWANConfig.KeyringFile = fileWAN
}
Expand Down
23 changes: 16 additions & 7 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,6 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
if ipaddr.IsAny(b.stringVal(c.AdvertiseAddrWAN)) {
return RuntimeConfig{}, fmt.Errorf("Advertise WAN address cannot be 0.0.0.0, :: or [::]")
}
if serfPortWAN < 0 {
return RuntimeConfig{}, fmt.Errorf("ports.serf_wan must be a valid port from 1 to 65535")
}

bindAddr := bindAddrs[0].(*net.IPAddr)
advertiseAddr := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), bindAddr)
Expand Down Expand Up @@ -411,14 +408,23 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
// derive other bind addresses from the bindAddr
rpcBindAddr := b.makeTCPAddr(bindAddr, nil, serverPort)
serfBindAddrLAN := b.makeTCPAddr(b.expandFirstIP("serf_lan", c.SerfBindAddrLAN), bindAddr, serfPortLAN)
serfBindAddrWAN := b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN)

// Only initialize serf WAN bind address when its enabled
var serfBindAddrWAN *net.TCPAddr
if serfPortWAN >= 0 {
serfBindAddrWAN = b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN)
}

// derive other advertise addresses from the advertise address
advertiseAddrLAN := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), advertiseAddr)
advertiseAddrWAN := b.makeIPAddr(b.expandFirstIP("advertise_addr_wan", c.AdvertiseAddrWAN), advertiseAddrLAN)
rpcAdvertiseAddr := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serverPort}
serfAdvertiseAddrLAN := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serfPortLAN}
serfAdvertiseAddrWAN := &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN}
// Only initialize serf WAN advertise address when its enabled
var serfAdvertiseAddrWAN *net.TCPAddr
if serfPortWAN >= 0 {
serfAdvertiseAddrWAN = &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN}
}

// determine client addresses
clientAddrs := b.expandIPs("client_addr", c.ClientAddr)
Expand Down Expand Up @@ -869,8 +875,11 @@ func (b *Builder) Validate(rt RuntimeConfig) error {
if err := addrUnique(inuse, "Serf Advertise LAN", rt.SerfAdvertiseAddrLAN); err != nil {
return err
}
if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil {
return err
// Validate serf WAN advertise address only when its set
if rt.SerfAdvertiseAddrWAN != nil {
if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil {
return err
}
}
if b.err != nil {
return b.err
Expand Down
14 changes: 12 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
{
desc: "serf wan port > 0",
desc: "allow disabling serf wan port",
args: []string{`-data-dir=` + dataDir},
json: []string{`{
"ports": {
Expand All @@ -1079,7 +1079,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
advertise_addr_wan = "1.2.3.4"
`},
err: "ports.serf_wan must be a valid port from 1 to 65535",
patch: func(rt *RuntimeConfig) {
rt.AdvertiseAddrWAN = ipAddr("1.2.3.4")
rt.SerfAdvertiseAddrWAN = nil
rt.SerfBindAddrWAN = nil
rt.TaggedAddresses = map[string]string{
"lan": "10.0.0.1",
"wan": "1.2.3.4",
}
rt.DataDir = dataDir
rt.SerfPortWAN = -1
},
},
{
desc: "serf bind address lan template",
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
return err
}

if len(dcs) == 0 { // no WAN federation, so return the local data center name
dcs = []string{c.srv.config.Datacenter}
}

*reply = dcs
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/internal_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Internal) KeyringOperation(
}

// Only perform WAN keyring querying and RPC forwarding once
if !args.Forwarded {
if !args.Forwarded && m.srv.serfWAN != nil {
args.Forwarded = true
m.executeKeyringOp(args, reply, true)
return m.srv.globalRPC("Internal.KeyringOperation", args, reply)
Expand Down
86 changes: 53 additions & 33 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -75,6 +74,10 @@ const (
raftRemoveGracePeriod = 5 * time.Second
)

var (
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)

// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
Expand Down Expand Up @@ -344,25 +347,28 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// created, so we can pull it out from there reliably, even though it's
// a little gross to be reading the updated config.

// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}

// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
// Initialize the WAN Serf if enabled
serfBindPortWAN := -1
if config.SerfWANConfig != nil {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}

// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
// TODO preetha: why is this passing WAN port to create segments?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like just a remnant of calling the same setupSerf function for segments; the servers are already in the default serf LAN before any segments start, so I don't think the segment Serfs need the WAN port tag set.

if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
Expand All @@ -380,20 +386,22 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.floodSegments(config)

// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
if s.serfWAN != nil {
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
}
return 0, false
}
return 0, false
go s.Flood(nil, portFn, s.serfWAN)
}
go s.Flood(nil, portFn, s.serfWAN)

// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
Expand Down Expand Up @@ -831,6 +839,9 @@ func (s *Server) JoinLAN(addrs []string) (int, error) {
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
if s.serfWAN == nil {
return 0, ErrWANFederationDisabled
}
return s.serfWAN.Join(addrs, true)
}

Expand All @@ -846,6 +857,9 @@ func (s *Server) LANMembers() []serf.Member {

// WANMembers is used to return the members of the LAN cluster
func (s *Server) WANMembers() []serf.Member {
if s.serfWAN == nil {
return nil
}
return s.serfWAN.Members()
}

Expand All @@ -854,8 +868,10 @@ func (s *Server) RemoveFailedNode(node string) error {
if err := s.serfLAN.RemoveFailedNode(node); err != nil {
return err
}
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
if s.serfWAN != nil {
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
}
}
return nil
}
Expand All @@ -872,12 +888,19 @@ func (s *Server) KeyManagerLAN() *serf.KeyManager {

// KeyManagerWAN returns the WAN Serf keyring manager
func (s *Server) KeyManagerWAN() *serf.KeyManager {
if s.serfWAN == nil {
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record I verified all call paths to this to check none will panic on a nil. It's only called from an internal state machine routine when working with WAN keyrings which should be disabled by code in this PR.

// executeKeyringOp executes the keyring-related operation in the request
// on either the WAN or LAN pools.
func (m *Internal) executeKeyringOp(
args *structs.KeyringRequest,
reply *structs.KeyringResponses,
wan bool) {
if wan {
mgr := m.srv.KeyManagerWAN()

called from

// Only perform WAN keyring querying and RPC forwarding once
if !args.Forwarded {
args.Forwarded = true
m.executeKeyringOp(args, reply, true)
return m.srv.globalRPC("Internal.KeyringOperation", args, reply)
}

which is disabled above.

return s.serfWAN.KeyManager()
}

// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
LANEncrypted := s.serfLAN.EncryptionEnabled()
if s.serfWAN == nil {
return LANEncrypted
}
return LANEncrypted && s.serfWAN.EncryptionEnabled()
}

// LANSegments returns a map of LAN segments by name
Expand Down Expand Up @@ -995,9 +1018,11 @@ func (s *Server) Stats() map[string]map[string]string {
},
"raft": s.raft.Stats(),
"serf_lan": s.serfLAN.Stats(),
"serf_wan": s.serfWAN.Stats(),
"runtime": runtimeStats(),
}
if s.serfWAN != nil {
stats["serf_wan"] = s.serfWAN.Stats()
}
return stats
}

Expand All @@ -1019,11 +1044,6 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return cs, nil
}

// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this not used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup


// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
Expand Down
4 changes: 3 additions & 1 deletion agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
if wanPort > 0 {
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, I checked and it seems the only consumer of that tag in our code already handles the case where it's not present:

wanJoinPortStr, ok := m.Tags["wan_join_port"]
if ok {
wanJoinPort, err = strconv.Atoi(wanJoinPortStr)
if err != nil {
return false, nil
}
}

}
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
Expand Down
3 changes: 3 additions & 0 deletions agent/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func (r *Router) Shutdown() {

// AddArea registers a new network area with the router.
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger, useTLS bool) error {
if cluster == nil {
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is returning an error appropriate here given that we guard the call in NewServerLogger when cluster is nil anyway? Maybe there are other callers that would fail if it were an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to be defensive here but its better to panic if there are new call sites added for this method in the future, will remove

r.Lock()
defer r.Unlock()

Expand Down