Make verify_outgoing compatible with agents not configured for TLS #3001

Merged: 11 commits, May 10, 2017
7 changes: 6 additions & 1 deletion consul/client.go
@@ -115,12 +115,13 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),
}

c.connPool = NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap, c.LANMembers)
Contributor

I'd create the pool above and set it in the struct literal as connPool: connPool.

Contributor

👍

Contributor Author

We need to pass in the LANMembers function from the client as a callback, though, so the client has to exist before the pool is created.

Contributor

Check out the comments below - I think we can move the "should use TLS" logic onto https://github.com/hashicorp/consul/blob/master/consul/agent/server.go and then just pass the result of that into dial and friends.


// Start lan event handlers before lan Serf setup to prevent deadlock
go c.lanEventHandler()

@@ -150,6 +151,10 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
conf.Tags["rpc_port"] = fmt.Sprintf("%d", c.config.RPCAddr.Port)
Contributor

We never make inbound connections to a client agent, right? We probably don't need this here.

Contributor Author

I think that's right. I thought the keyring or event code might use it, but both of those gossip out through Serf, so we can remove this.

Contributor

Yeah - they don't have an inbound listener at this time.

if c.config.VerifyIncoming {
conf.Tags["tls_verify_incoming"] = "1"
}
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.EventCh = ch
55 changes: 45 additions & 10 deletions consul/pool.go
@@ -6,13 +6,15 @@ import (
"io"
"net"
"net/rpc"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/yamux"
)

@@ -129,6 +131,10 @@ type ConnPool struct {
// The maximum number of open streams to keep
maxStreams int

// memberFunc is a callback to get the serf members for determining
// when to initiate a TLS connection
memberFunc func() []serf.Member

// Pool maps an address to a open connection
pool map[string]*Conn

@@ -151,11 +157,13 @@ type ConnPool struct {
// Set maxTime to 0 to disable reaping. maxStreams is used to control
// the number of idle streams allowed.
// If TLS settings are provided outgoing connections use TLS.
func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper) *ConnPool {
func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper,
Contributor

Single line pls.

memberFunc func() []serf.Member) *ConnPool {
pool := &ConnPool{
logOutput: logOutput,
maxTime: maxTime,
maxStreams: maxStreams,
memberFunc: memberFunc,
pool: make(map[string]*Conn),
limiter: make(map[string]chan struct{}),
tlsWrap: tlsWrap,
@@ -280,24 +288,51 @@ func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration)

// Check if TLS is enabled
if p.tlsWrap != nil {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
conn.Close()
doWrap, err := memberExpectsTLS(addr.String(), p.memberFunc())
Contributor

This does a lot of magic. Please add a comment explaining what happens here and why.

if err != nil {
return nil, nil, err
}

// Wrap the connection in a TLS client
tlsConn, err := p.tlsWrap(dc, conn)
if err != nil {
conn.Close()
return nil, nil, err
if doWrap {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
conn.Close()
return nil, nil, err
}

// Wrap the connection in a TLS client
tlsConn, err := p.tlsWrap(dc, conn)
if err != nil {
conn.Close()
return nil, nil, err
}
conn = tlsConn
}
conn = tlsConn
}

return conn, hc, nil
}

// memberExpectsTLS returns true if the member with the specified address is
// verifying incoming TLS connections
func memberExpectsTLS(addr string, members []serf.Member) (bool, error) {
Contributor

This looks pretty expensive since it's scanning all the members. In other methods where we use the connPool, we actually have an agent.Server, which is essentially the Serf info - see https://github.com/hashicorp/consul/blob/master/consul/rpc.go#L259-L266. We could probably put the TLS logic in there, and pass that through.

Contributor

As a useTLS boolean, maybe.
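
The suggestion in this thread could look roughly like the sketch below. It assumes a hypothetical `serverInfo` struct standing in for `agent.Server`, a hypothetical `use_tls` tag, and that the existing `port` tag carries the RPC port; none of these names are taken from the PR. The point is only that the flag is derived once, when the member's tags are parsed, so dialing does not have to scan the member list.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// serverInfo is a hypothetical stand-in for agent.Server. The UseTLS field
// is an assumption for illustration, not a field taken from the PR.
type serverInfo struct {
	Name   string
	Addr   net.Addr
	UseTLS bool
}

// parseServer builds a serverInfo from a member's IP and tags. It assumes
// the "port" tag holds the RPC port and that a hypothetical "use_tls" tag
// marks members that can accept TLS connections.
func parseServer(name, ip string, tags map[string]string) (*serverInfo, error) {
	parsed := net.ParseIP(ip)
	if parsed == nil {
		return nil, fmt.Errorf("member %q has an invalid IP %q", name, ip)
	}
	port, err := strconv.Atoi(tags["port"])
	if err != nil {
		return nil, fmt.Errorf("member %q has a bad port tag: %v", name, err)
	}
	_, useTLS := tags["use_tls"]
	return &serverInfo{
		Name:   name,
		Addr:   &net.TCPAddr{IP: parsed, Port: port},
		UseTLS: useTLS,
	}, nil
}

// shouldWrap is the decision a dial function would make: wrap only when
// TLS is configured locally and the target advertised support for it.
func shouldWrap(localTLS bool, target *serverInfo) bool {
	return localTLS && target.UseTLS
}
```

A caller would parse each member once as it joins, keep the resulting `serverInfo`, and pass `shouldWrap(p.tlsWrap != nil, server)` into the dial path as a plain boolean.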

for _, member := range members {
port, err := strconv.Atoi(member.Tags["rpc_port"])
if err != nil {
return false, err
}
memberAddr := (&net.TCPAddr{IP: member.Addr, Port: port}).String()
if memberAddr == addr {
if _, ok := member.Tags["tls_verify_incoming"]; !ok {
return false, nil
}
break
}
}

return true, nil
}

// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
// Get a new, raw connection.
37 changes: 25 additions & 12 deletions consul/raft_rpc.go
@@ -8,6 +8,7 @@ import (

"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)

// RaftLayer implements the raft.StreamLayer interface,
@@ -26,17 +27,22 @@ type RaftLayer struct {
closed bool
closeCh chan struct{}
closeLock sync.Mutex

// memberFunc is a callback to get the serf members for determining
// when to initiate a TLS connection
memberFunc func() []serf.Member
}

// NewRaftLayer is used to initialize a new RaftLayer which can
// be used as a StreamLayer for Raft. If a tlsConfig is provided,
// then the connection will use TLS.
func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper, memberFunc func() []serf.Member) *RaftLayer {
layer := &RaftLayer{
addr: addr,
connCh: make(chan net.Conn),
tlsWrap: tlsWrap,
closeCh: make(chan struct{}),
addr: addr,
connCh: make(chan net.Conn),
tlsWrap: tlsWrap,
closeCh: make(chan struct{}),
memberFunc: memberFunc,
}
return layer
}
@@ -89,16 +95,23 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net

// Check for tls mode
if l.tlsWrap != nil {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
conn.Close()
doWrap, err := memberExpectsTLS(string(address), l.memberFunc())
Contributor

This looks identical to the TLS upgrade code in pool.go. Helper fn?
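
One possible shape for such a helper is sketched here. The `wrapFunc` type, the `upgradeToTLS` name, and the `rpcTLS` value are assumptions for illustration; the pool's datacenter-aware wrapper would need a small closure to fit this signature. `recordingConn`, `passthrough`, and `failingWrap` exist only to exercise the helper without a network.

```go
package main

import (
	"errors"
	"net"
)

// rpcTLS is a placeholder for the protocol byte that asks the remote side
// to switch to TLS. The value here is illustrative only.
const rpcTLS byte = 0x03

// wrapFunc is a hypothetical common signature for the TLS wrappers.
type wrapFunc func(net.Conn) (net.Conn, error)

// upgradeToTLS writes the TLS marker byte and then wraps the connection.
// It closes the connection on either failure so callers never leak it.
func upgradeToTLS(conn net.Conn, wrap wrapFunc) (net.Conn, error) {
	if _, err := conn.Write([]byte{rpcTLS}); err != nil {
		conn.Close()
		return nil, err
	}
	wrapped, err := wrap(conn)
	if err != nil {
		conn.Close()
		return nil, err
	}
	return wrapped, nil
}

// recordingConn is a test double. It embeds a nil net.Conn and overrides
// only Write and Close, the two methods the helper calls.
type recordingConn struct {
	net.Conn
	written []byte
	closed  bool
}

func (c *recordingConn) Write(p []byte) (int, error) {
	c.written = append(c.written, p...)
	return len(p), nil
}

func (c *recordingConn) Close() error {
	c.closed = true
	return nil
}

// passthrough is a stand-in wrapper that performs no handshake.
func passthrough(c net.Conn) (net.Conn, error) { return c, nil }

// failingWrap simulates a wrapper whose handshake fails.
func failingWrap(c net.Conn) (net.Conn, error) {
	return nil, errors.New("handshake failed")
}
```

Both dial paths could then reduce to a single call guarded by the "should wrap" decision, which also keeps the close-on-error handling consistent between them.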

if err != nil {
return nil, err
}

// Wrap the connection in a TLS client
conn, err = l.tlsWrap(conn)
if err != nil {
return nil, err
if doWrap {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
conn.Close()
return nil, err
}

// Wrap the connection in a TLS client
conn, err = l.tlsWrap(conn)
if err != nil {
return nil, err
}
}
}

9 changes: 7 additions & 2 deletions consul/server.go
@@ -257,7 +257,6 @@ func NewServer(config *Config) (*Server, error) {
autopilotRemoveDeadCh: make(chan struct{}),
autopilotShutdownCh: make(chan struct{}),
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*agent.Server),
@@ -270,6 +269,8 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}),
}

s.connPool = NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap, s.LANMembers)
Contributor

I'm with James on that one. Create first, then inject.


// Set up the autopilot policy
s.autopilotPolicy = &BasicAutopilot{server: s}

@@ -379,6 +380,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
conf.Tags["rpc_port"] = fmt.Sprintf("%d", s.config.RPCAddr.Port)
Contributor

I think this is the same as the existing port tag.

Contributor Author

The existing tag has the LAN gossip port, not the RPC port.

Contributor

I don't think so - it gets populated from the RPC listener above (sanity check on https://demo.consul.io/v1/agent/self?pretty shows 8300).

if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
@@ -388,6 +390,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.VerifyIncoming {
conf.Tags["tls_verify_incoming"] = "1"
Contributor

Don't we need to key this off of certs being configured and have a sense more like use_tls? During the transition we will have to have verify turned off, but the server set up to handle incoming TLS connections.

Contributor Author

You're right. This can be more permissive, since we only need to avoid attempting outgoing TLS to an agent that has no TLS configured at all.
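
A rough sketch of the more permissive check discussed in this thread follows. The `tlsSettings` struct, its field names, and the `use_tls` tag are assumptions for illustration and not the PR's final code. It keys the tag off whether a certificate and key are present, so a server with verification turned off during a transition still advertises that it can accept TLS.

```go
package main

// tlsSettings is a hypothetical subset of the agent configuration. The
// field names are assumptions for illustration.
type tlsSettings struct {
	CAFile         string
	CertFile       string
	KeyFile        string
	VerifyIncoming bool
}

// canAcceptTLS reports whether an agent could serve incoming TLS at all.
// That needs a certificate and key, whether or not verification is on.
func canAcceptTLS(s tlsSettings) bool {
	return s.CertFile != "" && s.KeyFile != ""
}

// advertiseTLS sets a hypothetical "use_tls" tag when TLS can be served,
// and leaves the tags untouched otherwise.
func advertiseTLS(tags map[string]string, s tlsSettings) {
	if canAcceptTLS(s) {
		tags["use_tls"] = "1"
	}
}
```

With this shape, `VerifyIncoming` no longer affects the tag, which matches the transition case described above: certificates deployed, verification not yet enforced.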

}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch
@@ -625,7 +630,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
s.raftLayer = NewRaftLayer(advertise, wrapper)
s.raftLayer = NewRaftLayer(advertise, wrapper, s.LANMembers)
return nil
}
