diff --git a/manager/manager.go b/manager/manager.go index b1c65aa14b..a1f084a136 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -71,7 +71,7 @@ type Config struct { // RemoteAPI is a listening address for serving the remote API, and // an optional advertise address. - RemoteAPI RemoteAddrs + RemoteAPI *RemoteAddrs // JoinRaft is an optional address of a node in an existing raft // cluster to join. @@ -111,8 +111,7 @@ type Config struct { // This is the high-level object holding and initializing all the manager // subsystems. type Manager struct { - config *Config - listeners []net.Listener + config Config caserver *ca.Server dispatcher *dispatcher.Dispatcher @@ -132,9 +131,15 @@ type Manager struct { cancelFunc context.CancelFunc - mu sync.Mutex + mu sync.Mutex + addrMu sync.Mutex + started chan struct{} stopped bool + + remoteListener chan net.Listener + controlListener chan net.Listener + errServe chan error } type closeOnceListener struct { @@ -152,11 +157,92 @@ func (l *closeOnceListener) Close() error { // New creates a Manager which has not started to accept requests yet. func New(config *Config) (*Manager, error) { - dispatcherConfig := dispatcher.DefaultConfig() + err := os.MkdirAll(config.StateDir, 0700) + if err != nil { + return nil, errors.Wrap(err, "failed to create state directory") + } + + raftStateDir := filepath.Join(config.StateDir, "raft") + err = os.MkdirAll(raftStateDir, 0700) + if err != nil { + return nil, errors.Wrap(err, "failed to create raft state directory") + } + + raftCfg := raft.DefaultNodeConfig() + + if config.ElectionTick > 0 { + raftCfg.ElectionTick = int(config.ElectionTick) + } + if config.HeartbeatTick > 0 { + raftCfg.HeartbeatTick = int(config.HeartbeatTick) + } + + dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter()) + if err != nil { + return nil, err + } + + newNodeOpts := raft.NodeOptions{ + ID: config.SecurityConfig.ClientTLSCreds.NodeID(), + JoinAddr: config.JoinRaft, + Config: raftCfg, + StateDir: raftStateDir, + ForceNewCluster: config.ForceNewCluster, + TLSCredentials: config.SecurityConfig.ClientTLSCreds, + KeyRotator: dekRotator, + } + raftNode := raft.NewNode(newNodeOpts) + + opts := []grpc.ServerOption{ + grpc.Creds(config.SecurityConfig.ServerTLSCreds)} + + m := &Manager{ + config: *config, + caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), + dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), + logbroker: logbroker.New(raftNode.MemoryStore()), + server: grpc.NewServer(opts...), + localserver: grpc.NewServer(opts...), + raftNode: raftNode, + started: make(chan struct{}), + dekRotator: dekRotator, + remoteListener: make(chan net.Listener, 1), + controlListener: make(chan net.Listener, 1), + errServe: make(chan error, 2), + } + + if config.ControlAPI != "" { + m.config.ControlAPI = "" + if err := m.BindControl(config.ControlAPI); err != nil { + return nil, err + } + } + + if config.RemoteAPI != nil { + m.config.RemoteAPI = nil + // The context isn't used in this case (before (*Manager).Run). + if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil { + l := <-m.controlListener + l.Close() + return nil, err + } + } + + return m, nil +} + +// BindRemote binds a port for the remote API. +func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error { + m.addrMu.Lock() + defer m.addrMu.Unlock() + + if m.config.RemoteAPI != nil { + return errors.New("manager already has remote API address") + } // If an AdvertiseAddr was specified, we use that as our // externally-reachable address. - advertiseAddr := config.RemoteAPI.AdvertiseAddr + advertiseAddr := addrs.AdvertiseAddr var advertiseAddrPort string if advertiseAddr == "" { @@ -164,9 +250,9 @@ func New(config *Config) (*Manager, error) { // wildcard address to trigger remote autodetection of our // address. var err error - _, advertiseAddrPort, err = net.SplitHostPort(config.RemoteAPI.ListenAddr) + _, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr) if err != nil { - return nil, fmt.Errorf("missing or invalid listen address %s", config.RemoteAPI.ListenAddr) + return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr) } // Even with an IPv6 listening address, it's okay to use @@ -175,28 +261,42 @@ func New(config *Config) (*Manager, error) { advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort) } - err := os.MkdirAll(config.StateDir, 0700) + l, err := net.Listen("tcp", addrs.ListenAddr) if err != nil { - return nil, errors.Wrap(err, "failed to create state directory") + return errors.Wrap(err, "failed to listen on remote API address") } - - raftStateDir := filepath.Join(config.StateDir, "raft") - err = os.MkdirAll(raftStateDir, 0700) - if err != nil { - return nil, errors.Wrap(err, "failed to create raft state directory") + if advertiseAddrPort == "0" { + advertiseAddr = l.Addr().String() + addrs.ListenAddr = advertiseAddr } - var listeners []net.Listener + m.config.RemoteAPI = &addrs + + m.raftNode.SetAddr(ctx, advertiseAddr) + m.remoteListener <- l + + return nil +} + +// BindControl binds a local socket for the control API. +func (m *Manager) BindControl(addr string) error { + m.addrMu.Lock() + defer m.addrMu.Unlock() + + if m.config.ControlAPI != "" { + return errors.New("manager already has a control API address") + } + m.config.ControlAPI = addr // don't create a socket directory if we're on windows. we used named pipe if runtime.GOOS != "windows" { - err := os.MkdirAll(filepath.Dir(config.ControlAPI), 0700) + err := os.MkdirAll(filepath.Dir(addr), 0700) if err != nil { - return nil, errors.Wrap(err, "failed to create socket directory") + return errors.Wrap(err, "failed to create socket directory") } } - l, err := xnet.ListenLocal(config.ControlAPI) + l, err := xnet.ListenLocal(addr) // A unix socket may fail to bind if the file already // exists. Try replacing the file. @@ -209,69 +309,16 @@ func New(config *Config) (*Manager, error) { unwrappedErr = sys.Err } if unwrappedErr == syscall.EADDRINUSE { - os.Remove(config.ControlAPI) - l, err = xnet.ListenLocal(config.ControlAPI) + os.Remove(addr) + l, err = xnet.ListenLocal(addr) } } if err != nil { - return nil, errors.Wrap(err, "failed to listen on control API address") - } - - listeners = append(listeners, l) - - l, err = net.Listen("tcp", config.RemoteAPI.ListenAddr) - if err != nil { - return nil, errors.Wrap(err, "failed to listen on remote API address") - } - if advertiseAddrPort == "0" { - advertiseAddr = l.Addr().String() - config.RemoteAPI.ListenAddr = advertiseAddr + return errors.Wrap(err, "failed to listen on control API address") } - listeners = append(listeners, l) - raftCfg := raft.DefaultNodeConfig() - - if config.ElectionTick > 0 { - raftCfg.ElectionTick = int(config.ElectionTick) - } - if config.HeartbeatTick > 0 { - raftCfg.HeartbeatTick = int(config.HeartbeatTick) - } - - dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter()) - if err != nil { - return nil, err - } - - newNodeOpts := raft.NodeOptions{ - ID: config.SecurityConfig.ClientTLSCreds.NodeID(), - Addr: advertiseAddr, - JoinAddr: config.JoinRaft, - Config: raftCfg, - StateDir: raftStateDir, - ForceNewCluster: config.ForceNewCluster, - TLSCredentials: config.SecurityConfig.ClientTLSCreds, - KeyRotator: dekRotator, - } - raftNode := raft.NewNode(newNodeOpts) - - opts := []grpc.ServerOption{ - grpc.Creds(config.SecurityConfig.ServerTLSCreds)} - - m := &Manager{ - config: config, - listeners: listeners, - caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), - dispatcher: dispatcher.New(raftNode, dispatcherConfig), - logbroker: logbroker.New(raftNode.MemoryStore()), - server: grpc.NewServer(opts...), - localserver: grpc.NewServer(opts...), - raftNode: raftNode, - started: make(chan struct{}), - dekRotator: dekRotator, - } - - return m, nil + m.controlListener <- l + return nil } // RemovedFromRaft returns a channel that's closed if the manager is removed @@ -282,6 +329,12 @@ func (m *Manager) RemovedFromRaft() <-chan struct{} { // Addr returns tcp address on which remote api listens. func (m *Manager) Addr() string { + m.addrMu.Lock() + defer m.addrMu.Unlock() + + if m.config.RemoteAPI == nil { + return "" + } return m.config.RemoteAPI.ListenAddr } @@ -353,12 +406,17 @@ func (m *Manager) Run(parent context.Context) error { // requests (it has no TLS information to put in the metadata map). forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil } handleRequestLocally := func(ctx context.Context) (context.Context, error) { - var remoteAddr string - if m.config.RemoteAPI.AdvertiseAddr != "" { - remoteAddr = m.config.RemoteAPI.AdvertiseAddr - } else { - remoteAddr = m.config.RemoteAPI.ListenAddr + remoteAddr := "127.0.0.1:0" + + m.addrMu.Lock() + if m.config.RemoteAPI != nil { + if m.config.RemoteAPI.AdvertiseAddr != "" { + remoteAddr = m.config.RemoteAPI.AdvertiseAddr + } else { + remoteAddr = m.config.RemoteAPI.ListenAddr + } } + m.addrMu.Unlock() creds := m.config.SecurityConfig.ClientTLSCreds @@ -404,10 +462,8 @@ func (m *Manager) Run(parent context.Context) error { healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING) localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING) - errServe := make(chan error, len(m.listeners)) - for _, lis := range m.listeners { - go m.serveListener(ctx, errServe, lis) - } + go m.serveListener(ctx, m.remoteListener) + go m.serveListener(ctx, m.controlListener) defer func() { m.server.Stop() @@ -455,7 +511,7 @@ func (m *Manager) Run(parent context.Context) error { } // wait for an error in serving. - err = <-errServe + err = <-m.errServe m.mu.Lock() if m.stopped { m.mu.Unlock() @@ -755,7 +811,13 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan } // serveListener serves a listener for local and non local connections. -func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.Listener) { +func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) { + var l net.Listener + select { + case l = <-lCh: + case <-ctx.Done(): + return + } ctx = log.WithLogger(ctx, log.G(ctx).WithFields( logrus.Fields{ "proto": l.Addr().Network(), @@ -766,10 +828,10 @@ func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net. // we need to disallow double closes because UnixListener.Close // can delete unix-socket file of newer listener. grpc calls // Close twice indeed: in Serve and in Stop. - errServe <- m.localserver.Serve(&closeOnceListener{Listener: l}) + m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l}) } else { log.G(ctx).Info("Listening for connections") - errServe <- m.server.Serve(l) + m.errServe <- m.server.Serve(l) } } diff --git a/manager/manager_test.go b/manager/manager_test.go index e7211eb154..1d0bd43d9d 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -56,7 +56,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) m, err := New(&Config{ - RemoteAPI: RemoteAddrs{ListenAddr: "127.0.0.1:0"}, + RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"}, ControlAPI: temp.Name(), StateDir: stateDir, SecurityConfig: managerSecurityConfig, @@ -218,7 +218,7 @@ func TestManagerLockUnlock(t *testing.T) { require.NoError(t, err) m, err := New(&Config{ - RemoteAPI: RemoteAddrs{ListenAddr: "127.0.0.1:0"}, + RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"}, ControlAPI: temp.Name(), StateDir: stateDir, SecurityConfig: managerSecurityConfig, diff --git a/manager/state/raft/membership/cluster.go b/manager/state/raft/membership/cluster.go index 84c9514066..656edb9a3e 100644 --- a/manager/state/raft/membership/cluster.go +++ b/manager/state/raft/membership/cluster.go @@ -236,6 +236,31 @@ func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *M newMember.Conn = newConn.Conn c.members[id] = &newMember + if oldMember.RaftMember.Addr != newAddr { + c.broadcastUpdate() + } + + return nil +} + +// SetNodeAddr changes the node's address. It does not initiate a connection. +func (c *Cluster) SetNodeAddr(id uint64, newAddr string) error { + c.mu.Lock() + defer c.mu.Unlock() + + oldMember, ok := c.members[id] + if !ok { + return ErrIDNotFound + } + + newMember := *oldMember + newMember.RaftMember.Addr = newAddr + c.members[id] = &newMember + + if oldMember.RaftMember.Addr != newAddr { + c.broadcastUpdate() + } + return nil } diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index c46727d937..4b78db021a 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -123,6 +123,9 @@ type Node struct { stopMu sync.RWMutex // used for membership management checks membershipLock sync.Mutex + // synchronizes access to n.opts.Addr, and makes sure the address is not + // updated concurrently with JoinAndStart. + addrLock sync.Mutex snapshotInProgress chan raftpb.SnapshotMetadata asyncTasks sync.WaitGroup @@ -240,6 +243,51 @@ func NewNode(opts NodeOptions) *Node { return n } +// SetAddr provides the raft node's address. This can be used in cases where +// opts.Addr was not provided to NewNode, for example when a port was not bound +// until after the raft node was created. +func (n *Node) SetAddr(ctx context.Context, addr string) error { + n.addrLock.Lock() + defer n.addrLock.Unlock() + + n.opts.Addr = addr + + if n.IsMember() { + if err := n.cluster.SetNodeAddr(n.Config.ID, addr); err != nil { + return err + } + + // If the raft node is running, submit a configuration change + // with the new address. + + // TODO(aaronl): Currently, this node must be the leader to + // submit this configuration change. This works for the initial + // use cases (single-node cluster late binding ports, or calling + // SetAddr before joining a cluster). In the future, we may want + // to support having a follower proactively change its remote + // address. + + leadershipCh, cancel := n.SubscribeLeadership() + defer cancel() + + isLeader := atomic.LoadUint32(&n.signalledLeadership) == 1 + for !isLeader { + select { + case leadershipChange := <-leadershipCh: + if leadershipChange == IsLeader { + isLeader = true + } + case <-ctx.Done(): + return ctx.Err() + } + } + + return n.updateMember(ctx, addr, n.Config.ID, n.opts.ID) + } + + return nil +} + // WithContext returns context which is cancelled when parent context cancelled // or node is stopped. func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) { @@ -281,8 +329,14 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { n.snapshotMeta = snapshot.Metadata n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error + n.addrLock.Lock() + defer n.addrLock.Unlock() + if loadAndStartErr == storage.ErrNoWAL { if n.opts.JoinAddr != "" { + if n.opts.Addr == "" { + return errors.New("attempted to join raft cluster without knowing own address") + } c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second) if err != nil { return err diff --git a/node/node.go b/node/node.go index be776dc151..0401aaa596 100644 --- a/node/node.go +++ b/node/node.go @@ -131,11 +131,11 @@ func (n *Node) RemoteAPIAddr() (string, error) { n.RLock() defer n.RUnlock() if n.manager == nil { - return "", errors.Errorf("node is not manager") + return "", errors.New("manager is not running") } addr := n.manager.Addr() if addr == "" { - return "", errors.Errorf("manager addr is not set") + return "", errors.New("manager addr is not set") } return addr, nil } @@ -182,6 +182,21 @@ func New(c *Config) (*Node, error) { return n, nil } +// BindRemote starts a listener that exposes the remote API. +func (n *Node) BindRemote(ctx context.Context, listenAddr string, advertiseAddr string) error { + n.RLock() + defer n.RUnlock() + + if n.manager == nil { + return errors.New("manager is not running") + } + + return n.manager.BindRemote(ctx, manager.RemoteAddrs{ + ListenAddr: listenAddr, + AdvertiseAddr: advertiseAddr, + }) +} + // Start starts a node instance. func (n *Node) Start(ctx context.Context) error { err := errNodeStarted @@ -666,13 +681,18 @@ func (n *Node) waitRole(ctx context.Context, role string) error { } func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}, workerRole <-chan struct{}) error { - remoteAddr, _ := n.remotes.Select(n.NodeID()) - m, err := manager.New(&manager.Config{ - ForceNewCluster: n.config.ForceNewCluster, - RemoteAPI: manager.RemoteAddrs{ + var remoteAPI *manager.RemoteAddrs + if n.config.ListenRemoteAPI != "" { + remoteAPI = &manager.RemoteAddrs{ ListenAddr: n.config.ListenRemoteAPI, AdvertiseAddr: n.config.AdvertiseRemoteAPI, - }, + } + } + + remoteAddr, _ := n.remotes.Select(n.NodeID()) + m, err := manager.New(&manager.Config{ + ForceNewCluster: n.config.ForceNewCluster, + RemoteAPI: remoteAPI, ControlAPI: n.config.ListenControlAPI, SecurityConfig: securityConfig, ExternalCAs: n.config.ExternalCAs,