Skip to content

Commit

Permalink
manager: Allow API listeners to be bound after starting the manager
Browse files Browse the repository at this point in the history
This adds BindRemote and BindControl methods to Manager, which can be
used to specify the remote API and control API addresses after creating
or starting the manager. Raft has been updated to accept a new remote
address after being started. If The RemoteAPI and ControlAPI fields are
passed to manager.New, it is not necessary to call BindRemote or
BindControl explicitly.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Jan 23, 2017
1 parent 40647d3 commit cf71664
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 100 deletions.
247 changes: 157 additions & 90 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Config struct {

// RemoteAPI is a listening address for serving the remote API, and
// an optional advertise address.
RemoteAPI RemoteAddrs
RemoteAPI *RemoteAddrs

// JoinRaft is an optional address of a node in an existing raft
// cluster to join.
Expand Down Expand Up @@ -115,8 +115,7 @@ type Config struct {
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
config *Config
listeners []net.Listener
config Config

caserver *ca.Server
dispatcher *dispatcher.Dispatcher
Expand All @@ -136,9 +135,18 @@ type Manager struct {

cancelFunc context.CancelFunc

mu sync.Mutex
// mu is a general mutex used to coordinate starting/stopping and
// leadership events.
mu sync.Mutex
// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
addrMu sync.Mutex

started chan struct{}
stopped bool

remoteListener chan net.Listener
controlListener chan net.Listener
errServe chan error
}

type closeOnceListener struct {
Expand All @@ -156,29 +164,6 @@ func (l *closeOnceListener) Close() error {

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
dispatcherConfig := dispatcher.DefaultConfig()

// If an AdvertiseAddr was specified, we use that as our
// externally-reachable address.
advertiseAddr := config.RemoteAPI.AdvertiseAddr

var advertiseAddrPort string
if advertiseAddr == "" {
// Otherwise, we know we are joining an existing swarm. Use a
// wildcard address to trigger remote autodetection of our
// address.
var err error
_, advertiseAddrPort, err = net.SplitHostPort(config.RemoteAPI.ListenAddr)
if err != nil {
return nil, fmt.Errorf("missing or invalid listen address %s", config.RemoteAPI.ListenAddr)
}

// Even with an IPv6 listening address, it's okay to use
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
// be substituted with the actual source address.
advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
}

err := os.MkdirAll(config.StateDir, 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to create state directory")
Expand All @@ -190,17 +175,89 @@ func New(config *Config) (*Manager, error) {
return nil, errors.Wrap(err, "failed to create raft state directory")
}

var listeners []net.Listener
raftCfg := raft.DefaultNodeConfig()

if config.ElectionTick > 0 {
raftCfg.ElectionTick = int(config.ElectionTick)
}
if config.HeartbeatTick > 0 {
raftCfg.HeartbeatTick = int(config.HeartbeatTick)
}

dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter())
if err != nil {
return nil, err
}

newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
JoinAddr: config.JoinRaft,
Config: raftCfg,
StateDir: raftStateDir,
ForceNewCluster: config.ForceNewCluster,
TLSCredentials: config.SecurityConfig.ClientTLSCreds,
KeyRotator: dekRotator,
}
raftNode := raft.NewNode(newNodeOpts)

opts := []grpc.ServerOption{
grpc.Creds(config.SecurityConfig.ServerTLSCreds)}

m := &Manager{
config: *config,
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()),
logbroker: logbroker.New(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
started: make(chan struct{}),
dekRotator: dekRotator,
remoteListener: make(chan net.Listener, 1),
controlListener: make(chan net.Listener, 1),
errServe: make(chan error, 2),
}

if config.ControlAPI != "" {
m.config.ControlAPI = ""
if err := m.BindControl(config.ControlAPI); err != nil {
return nil, err
}
}

if config.RemoteAPI != nil {
m.config.RemoteAPI = nil
// The context isn't used in this case (before (*Manager).Run).
if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
if config.ControlAPI != "" {
l := <-m.controlListener
l.Close()
}
return nil, err
}
}

return m, nil
}

// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
m.addrMu.Lock()
defer m.addrMu.Unlock()

if m.config.ControlAPI != "" {
return errors.New("manager already has a control API address")
}

// don't create a socket directory if we're on windows. we used named pipe
if runtime.GOOS != "windows" {
err := os.MkdirAll(filepath.Dir(config.ControlAPI), 0700)
err := os.MkdirAll(filepath.Dir(addr), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to create socket directory")
return errors.Wrap(err, "failed to create socket directory")
}
}

l, err := xnet.ListenLocal(config.ControlAPI)
l, err := xnet.ListenLocal(addr)

// A unix socket may fail to bind if the file already
// exists. Try replacing the file.
Expand All @@ -213,69 +270,64 @@ func New(config *Config) (*Manager, error) {
unwrappedErr = sys.Err
}
if unwrappedErr == syscall.EADDRINUSE {
os.Remove(config.ControlAPI)
l, err = xnet.ListenLocal(config.ControlAPI)
os.Remove(addr)
l, err = xnet.ListenLocal(addr)
}
}
if err != nil {
return nil, errors.Wrap(err, "failed to listen on control API address")
return errors.Wrap(err, "failed to listen on control API address")
}

listeners = append(listeners, l)
m.config.ControlAPI = addr
m.controlListener <- l
return nil
}

l, err = net.Listen("tcp", config.RemoteAPI.ListenAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to listen on remote API address")
}
if advertiseAddrPort == "0" {
advertiseAddr = l.Addr().String()
config.RemoteAPI.ListenAddr = advertiseAddr
// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
m.addrMu.Lock()
defer m.addrMu.Unlock()

if m.config.RemoteAPI != nil {
return errors.New("manager already has remote API address")
}
listeners = append(listeners, l)

raftCfg := raft.DefaultNodeConfig()
// If an AdvertiseAddr was specified, we use that as our
// externally-reachable address.
advertiseAddr := addrs.AdvertiseAddr

if config.ElectionTick > 0 {
raftCfg.ElectionTick = int(config.ElectionTick)
}
if config.HeartbeatTick > 0 {
raftCfg.HeartbeatTick = int(config.HeartbeatTick)
var advertiseAddrPort string
if advertiseAddr == "" {
// Otherwise, we know we are joining an existing swarm. Use a
// wildcard address to trigger remote autodetection of our
// address.
var err error
_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
if err != nil {
return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
}

// Even with an IPv6 listening address, it's okay to use
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
// be substituted with the actual source address.
advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
}

dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter())
l, err := net.Listen("tcp", addrs.ListenAddr)
if err != nil {
return nil, err
return errors.Wrap(err, "failed to listen on remote API address")
}

newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
Addr: advertiseAddr,
JoinAddr: config.JoinRaft,
Config: raftCfg,
StateDir: raftStateDir,
ForceNewCluster: config.ForceNewCluster,
TLSCredentials: config.SecurityConfig.ClientTLSCreds,
KeyRotator: dekRotator,
if advertiseAddrPort == "0" {
advertiseAddr = l.Addr().String()
addrs.ListenAddr = advertiseAddr
}
raftNode := raft.NewNode(newNodeOpts)

opts := []grpc.ServerOption{
grpc.Creds(config.SecurityConfig.ServerTLSCreds)}
m.config.RemoteAPI = &addrs

m := &Manager{
config: config,
listeners: listeners,
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
dispatcher: dispatcher.New(raftNode, dispatcherConfig),
logbroker: logbroker.New(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
started: make(chan struct{}),
dekRotator: dekRotator,
}
m.raftNode.SetAddr(ctx, advertiseAddr)
m.remoteListener <- l

return m, nil
return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
Expand All @@ -286,6 +338,12 @@ func (m *Manager) RemovedFromRaft() <-chan struct{} {

// Addr returns tcp address on which remote api listens.
func (m *Manager) Addr() string {
m.addrMu.Lock()
defer m.addrMu.Unlock()

if m.config.RemoteAPI == nil {
return ""
}
return m.config.RemoteAPI.ListenAddr
}

Expand Down Expand Up @@ -357,12 +415,17 @@ func (m *Manager) Run(parent context.Context) error {
// requests (it has no TLS information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
handleRequestLocally := func(ctx context.Context) (context.Context, error) {
var remoteAddr string
if m.config.RemoteAPI.AdvertiseAddr != "" {
remoteAddr = m.config.RemoteAPI.AdvertiseAddr
} else {
remoteAddr = m.config.RemoteAPI.ListenAddr
remoteAddr := "127.0.0.1:0"

m.addrMu.Lock()
if m.config.RemoteAPI != nil {
if m.config.RemoteAPI.AdvertiseAddr != "" {
remoteAddr = m.config.RemoteAPI.AdvertiseAddr
} else {
remoteAddr = m.config.RemoteAPI.ListenAddr
}
}
m.addrMu.Unlock()

creds := m.config.SecurityConfig.ClientTLSCreds

Expand Down Expand Up @@ -408,10 +471,8 @@ func (m *Manager) Run(parent context.Context) error {
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

errServe := make(chan error, len(m.listeners))
for _, lis := range m.listeners {
go m.serveListener(ctx, errServe, lis)
}
go m.serveListener(ctx, m.remoteListener)
go m.serveListener(ctx, m.controlListener)

defer func() {
m.server.Stop()
Expand Down Expand Up @@ -459,7 +520,7 @@ func (m *Manager) Run(parent context.Context) error {
}

// wait for an error in serving.
err = <-errServe
err = <-m.errServe
m.mu.Lock()
if m.stopped {
m.mu.Unlock()
Expand Down Expand Up @@ -759,7 +820,13 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
}

// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.Listener) {
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
var l net.Listener
select {
case l = <-lCh:
case <-ctx.Done():
return
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"proto": l.Addr().Network(),
Expand All @@ -770,10 +837,10 @@ func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.
// we need to disallow double closes because UnixListener.Close
// can delete unix-socket file of newer listener. grpc calls
// Close twice indeed: in Serve and in Stop.
errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
} else {
log.G(ctx).Info("Listening for connections")
errServe <- m.server.Serve(l)
m.errServe <- m.server.Serve(l)
}
}

Expand Down
4 changes: 2 additions & 2 deletions manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestManager(t *testing.T) {
assert.NoError(t, err)

m, err := New(&Config{
RemoteAPI: RemoteAddrs{ListenAddr: "127.0.0.1:0"},
RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"},
ControlAPI: temp.Name(),
StateDir: stateDir,
SecurityConfig: managerSecurityConfig,
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestManagerLockUnlock(t *testing.T) {
require.NoError(t, err)

m, err := New(&Config{
RemoteAPI: RemoteAddrs{ListenAddr: "127.0.0.1:0"},
RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"},
ControlAPI: temp.Name(),
StateDir: stateDir,
SecurityConfig: managerSecurityConfig,
Expand Down
1 change: 1 addition & 0 deletions manager/state/raft/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
return nil
}
oldMember.RaftMember = m
c.broadcastUpdate()
return nil
}

Expand Down
Loading

0 comments on commit cf71664

Please sign in to comment.