Skip to content

Commit

Permalink
Merge pull request #5237 from hashicorp/term-grpc-stream-on-token-failure
Browse files Browse the repository at this point in the history

Check ACLs more often for xDS endpoints.
  • Loading branch information
rboyer authored Jan 29, 2019
2 parents 14aefea + d3eb781 commit db8a871
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 33 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
Authz: a,
ResolveToken: a.resolveToken,
}
a.xdsServer.Initialize()

var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
Expand Down
83 changes: 68 additions & 15 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,6 +57,10 @@ const (
// LocalAgentClusterName is the name we give the local agent "cluster" in
// Envoy config.
LocalAgentClusterName = "local_agent"

// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time.Minute
)

// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
Expand Down Expand Up @@ -90,6 +95,18 @@ type Server struct {
CfgMgr ConfigManager
Authz ConnectAuthz
ResolveToken ACLResolverFunc
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time.Duration
}

// Initialize will finish configuring the Server for first use.
//
// Any AuthCheckFrequency that is not strictly positive is replaced with
// DefaultAuthCheckFrequency. Zero means "not configured"; a negative value
// is treated the same way because a timer armed with a non-positive duration
// fires immediately, which would re-check credentials in a tight loop
// instead of only during idle periods.
func (s *Server) Initialize() {
	if s.AuthCheckFrequency <= 0 {
		s.AuthCheckFrequency = DefaultAuthCheckFrequency
	}
}

// StreamAggregatedResources implements
Expand Down Expand Up @@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

// States for the stream-processing state machine, numbered from zero by iota.
// NOTE(review): statePendingAuth and statePendingInitialConfig both label the
// "wait for the first config snapshot" step further down — confirm only one of
// them is meant to remain.
const (
// stateInit is the zero value of the enumeration.
stateInit int = iota
statePendingAuth
// statePendingInitialConfig: nothing can be done until a config snapshot
// is available.
statePendingInitialConfig
// stateRunning: ACLs are checked on every pass through this state before
// any config is handled.
stateRunning
)

Expand Down Expand Up @@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
},
}

var authTimer <-chan time.Time
extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency)
}

// checkStreamACLs resolves the token carried by the stream and verifies it
// may write the destination service named in cfgSnap.
//
// It returns nil when the stream is authorized. A missing snapshot or an
// unknown token yields codes.Unauthenticated; a denied token or a rule
// without service-write access yields codes.PermissionDenied. Any other
// resolver error is returned unchanged.
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
	// Without a snapshot there is no destination service to authorize against.
	if cfgSnap == nil {
		return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
	}

	rule, err := s.ResolveToken(tokenFromStream(stream))
	switch {
	case acl.IsErrNotFound(err):
		return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
	case acl.IsErrPermissionDenied(err):
		return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
	case err != nil:
		return err
	}

	// A nil rule is let through without a ServiceWrite check.
	if rule == nil || rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
		return nil
	}
	return status.Errorf(codes.PermissionDenied, "permission denied")
}

for {
select {
case <-authTimer:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer()

case req, ok = <-reqCh:
if !ok {
// reqCh is closed when stream.Recv errors which is how we detect client
Expand Down Expand Up @@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
defer watchCancel()

// Now wait for the config so we can check ACL
state = statePendingAuth
case statePendingAuth:
state = statePendingInitialConfig
case statePendingInitialConfig:
if cfgSnap == nil {
// Nothing we can do until we get the initial config
continue
}
// Got config, try to authenticate
token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
// Authed OK!

// Got config, try to authenticate next.
state = stateRunning

// Let's actually process the config we just got or we'll miss responding
fallthrough
case stateRunning:
// Check ACLs on every Discovery{Request,Response}.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
// For the first time through the state machine, this is when the
// timer is first started.
extendAuthTimer()

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
Expand Down Expand Up @@ -364,12 +417,12 @@ func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoya
if err != nil {
// Treat this as an auth error since Envoy has sent something it considers
// valid, it's just not an identity we trust.
return deniedResponse("Destination Principal is not a valid Connect identitiy")
return deniedResponse("Destination Principal is not a valid Connect identity")
}

destID, ok := dest.(*connect.SpiffeIDService)
if !ok {
return deniedResponse("Destination Principal is not a valid Service identitiy")
return deniedResponse("Destination Principal is not a valid Service identity")
}

// For now we don't validate the trust domain of the _destination_ at all -
Expand Down
Loading

0 comments on commit db8a871

Please sign in to comment.