Skip to content

Commit

Permalink
Connect Envoy Command (#4735)
Browse files Browse the repository at this point in the history
* Plumb xDS server and proxyxfg into the agent startup

* Add `consul connect envoy` command to allow running Envoy as a connect sidecar.

* Add test for help tabs; typos and style fixups from review
  • Loading branch information
banks committed Oct 4, 2018
1 parent eff260b commit 2197450
Show file tree
Hide file tree
Showing 17 changed files with 909 additions and 184 deletions.
156 changes: 127 additions & 29 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"google.golang.org/grpc"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
Expand All @@ -27,10 +29,12 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxyprocess"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -218,8 +222,23 @@ type Agent struct {
// proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxyprocess.Manager

// proxyLock protects proxy information in the local state from concurrent modification
// proxyLock protects _managed_ proxy information in the local state from
// concurrent modification. It is not needed to work with proxyConfig state.
proxyLock sync.Mutex

// proxyConfig is the manager for proxy service (Kind = connect-proxy)
// configuration state. This ensures all state needed by a proxy registration
// is maintained in cache and handles pushing updates to that state into XDS
// server to be pushed out to Envoy. This is NOT related to managed proxies
// directly.
proxyConfig *proxycfg.Manager

// xdsServer is the Server instance that serves xDS gRPC API.
xdsServer *xds.Server

// grpcServer is the server instance used currently to serve xDS API for
// Envoy.
grpcServer *grpc.Server
}

func New(c *config.RuntimeConfig) (*Agent, error) {
Expand Down Expand Up @@ -409,6 +428,21 @@ func (a *Agent) Start() error {
}
}

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache,
Logger: a.logger,
State: a.State,
Source: &structs.QuerySource{
Node: a.config.NodeName,
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
},
})
if err != nil {
return err
}

// Start watching for critical services to deregister, based on their
// checks.
go a.reapServices()
Expand Down Expand Up @@ -446,6 +480,11 @@ func (a *Agent) Start() error {
a.httpServers = append(a.httpServers, srv)
}

// Start gRPC server.
if err := a.listenAndServeGRPC(); err != nil {
return err
}

// register watches
if err := a.reloadWatches(a.config); err != nil {
return err
Expand All @@ -458,6 +497,43 @@ func (a *Agent) Start() error {
return nil
}

func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 {
return nil
}

a.xdsServer = &xds.Server{
Logger: a.logger,
CfgMgr: a.proxyConfig,
Authz: a,
ResolveToken: func(id string) (acl.ACL, error) {
return a.resolveToken(id)
},
}
var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
return err
}

ln, err := a.startListeners(a.config.GRPCAddrs)
if err != nil {
return err
}

for _, l := range ln {
go func(innerL net.Listener) {
a.logger.Printf("[INFO] agent: Started gRPC server on %s (%s)",
innerL.Addr().String(), innerL.Addr().Network())
err := a.grpcServer.Serve(innerL)
if err != nil {
a.logger.Printf("[ERR] gRPC server failed: %s", err)
}
}(l)
}
return nil
}

func (a *Agent) listenAndServeDNS() error {
notif := make(chan net.Addr, len(a.config.DNSAddrs))
errCh := make(chan error, len(a.config.DNSAddrs))
Expand Down Expand Up @@ -497,6 +573,34 @@ func (a *Agent) listenAndServeDNS() error {
return merr.ErrorOrNil()
}

func (a *Agent) startListeners(addrs []net.Addr) ([]net.Listener, error) {
var ln []net.Listener
for _, addr := range addrs {
var l net.Listener
var err error

switch x := addr.(type) {
case *net.UnixAddr:
l, err = a.listenSocket(x.Name)
if err != nil {
return nil, err
}

case *net.TCPAddr:
l, err = net.Listen("tcp", x.String())
if err != nil {
return nil, err
}
l = &tcpKeepAliveListener{l.(*net.TCPListener)}

default:
return nil, fmt.Errorf("unsupported address type %T", addr)
}
ln = append(ln, l)
}
return ln, nil
}

// listenHTTP binds listeners to the provided addresses and also returns
// pre-configured HTTP servers which are not yet started. The motivation is
// that in the current startup/shutdown setup we de-couple the listener
Expand All @@ -516,38 +620,21 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
var ln []net.Listener
var servers []*HTTPServer
start := func(proto string, addrs []net.Addr) error {
for _, addr := range addrs {
var l net.Listener
var tlscfg *tls.Config
var err error

switch x := addr.(type) {
case *net.UnixAddr:
l, err = a.listenSocket(x.Name)
if err != nil {
return err
}
listeners, err := a.startListeners(addrs)
if err != nil {
return err
}

case *net.TCPAddr:
l, err = net.Listen("tcp", x.String())
for _, l := range listeners {
var tlscfg *tls.Config
_, isTCP := l.(*tcpKeepAliveListener)
if isTCP && proto == "https" {
tlscfg, err = a.config.IncomingHTTPSConfig()
if err != nil {
return err
}
l = &tcpKeepAliveListener{l.(*net.TCPListener)}

if proto == "https" {
tlscfg, err = a.config.IncomingHTTPSConfig()
if err != nil {
return err
}
l = tls.NewListener(l, tlscfg)
}

default:
return fmt.Errorf("unsupported address type %T", addr)
l = tls.NewListener(l, tlscfg)
}
ln = append(ln, l)

srv := &HTTPServer{
Server: &http.Server{
Addr: l.Addr().String(),
Expand All @@ -569,6 +656,7 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
}
}

ln = append(ln, l)
servers = append(servers, srv)
}
return nil
Expand Down Expand Up @@ -1341,7 +1429,17 @@ func (a *Agent) ShutdownAgent() error {
chk.Stop()
}

// Stop the proxy manager
// Stop gRPC
if a.grpcServer != nil {
a.grpcServer.Stop()
}

// Stop the proxy config manager
if a.proxyConfig != nil {
a.proxyConfig.Close()
}

// Stop the proxy process manager
if a.proxyManager != nil {
// If persistence is disabled (implies DevMode but a subset of DevMode) then
// don't leave the proxies running since the agent will not be able to
Expand Down
126 changes: 3 additions & 123 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
Expand Down Expand Up @@ -1432,132 +1430,14 @@ func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.R
// Decode the request from the request body
var authReq structs.ConnectAuthorizeRequest
if err := decodeBody(req, &authReq, nil); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}

// We need to have a target to check intentions
if authReq.Target == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Target service must be specified")
return nil, nil
}

// Parse the certificate URI from the client ID
uriRaw, err := url.Parse(authReq.ClientCertURI)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Client ID must be a URI: %s", err),
}, nil
}
uri, err := connect.ParseCertURI(uriRaw)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Invalid client ID: %s", err),
}, nil
}

uriService, ok := uri.(*connect.SpiffeIDService)
if !ok {
return &connectAuthorizeResp{
Authorized: false,
Reason: "Client ID must be a valid SPIFFE service URI",
}, nil
}

// We need to verify service:write permissions for the given token.
// We do this manually here since the RPC request below only verifies
// service:read.
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.ServiceWrite(authReq.Target, nil) {
return nil, acl.ErrPermissionDenied
}

// Validate the trust domain matches ours. Later we will support explicit
// external federation but not built yet.
rootArgs := &structs.DCSpecificRequest{Datacenter: s.agent.config.Datacenter}
raw, _, err := s.agent.cache.Get(cachetype.ConnectCARootName, rootArgs)
if err != nil {
return nil, err
}

roots, ok := raw.(*structs.IndexedCARoots)
if !ok {
return nil, fmt.Errorf("internal error: roots response type not correct")
return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)}
}
if roots.TrustDomain == "" {
return nil, fmt.Errorf("connect CA not bootstrapped yet")
}
if roots.TrustDomain != strings.ToLower(uriService.Host) {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Identity from an external trust domain: %s",
uriService.Host),
}, nil
}

// TODO(banks): Implement revocation list checking here.

// Get the intentions for this target service.
args := &structs.IntentionQueryRequest{
Datacenter: s.agent.config.Datacenter,
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{
Namespace: structs.IntentionDefaultNamespace,
Name: authReq.Target,
},
},
},
}
args.Token = token

raw, m, err := s.agent.cache.Get(cachetype.IntentionMatchName, args)
authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq)
if err != nil {
return nil, err
}
setCacheMeta(resp, &m)

reply, ok := raw.(*structs.IndexedIntentionMatches)
if !ok {
return nil, fmt.Errorf("internal error: response type not correct")
}
if len(reply.Matches) != 1 {
return nil, fmt.Errorf("Internal error loading matches")
}

// Test the authorization for each match
for _, ixn := range reply.Matches[0] {
if auth, ok := uriService.Authorize(ixn); ok {
return &connectAuthorizeResp{
Authorized: auth,
Reason: fmt.Sprintf("Matched intention: %s", ixn.String()),
}, nil
}
}

// No match, we need to determine the default behavior. We do this by
// specifying the anonymous token token, which will get that behavior.
// The default behavior if ACLs are disabled is to allow connections
// to mimic the behavior of Consul itself: everything is allowed if
// ACLs are disabled.
rule, err = s.agent.resolveToken("")
if err != nil {
return nil, err
}
authz := true
reason := "ACLs disabled, access is allowed by default"
if rule != nil {
authz = rule.IntentionDefaultAllow()
reason = "Default behavior configured by ACLs"
}
setCacheMeta(resp, cacheMeta)

return &connectAuthorizeResp{
Authorized: authz,
Expand Down
Loading

0 comments on commit 2197450

Please sign in to comment.