Skip to content

Commit

Permalink
Check ACLs more often for xDS endpoints.
Browse files Browse the repository at this point in the history
For established xDS gRPC streams recheck ACLs for each DiscoveryRequest
or DiscoveryResponse. If more than 5 minutes has elapsed since the last
ACL check, recheck even without an incoming DiscoveryRequest or
DiscoveryResponse. ACL failures will terminate the stream.
  • Loading branch information
rboyer committed Jan 18, 2019
1 parent 2dea3e2 commit 7c34dbd
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 16 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
Authz: a,
ResolveToken: a.resolveToken,
}
a.xdsServer.Initialize()

var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
Expand Down
93 changes: 80 additions & 13 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,6 +57,10 @@ const (
// LocalAgentClusterName is the name we give the local agent "cluster" in
// Envoy config.
LocalAgentClusterName = "local_agent"

// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time.Minute
)

// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
Expand Down Expand Up @@ -90,6 +95,18 @@ type Server struct {
CfgMgr ConfigManager
Authz ConnectAuthz
ResolveToken ACLResolverFunc
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time.Duration
}

// Initialize will finish configuring the Server for first use.
func (s *Server) Initialize() {
if s.AuthCheckFrequency == 0 {
s.AuthCheckFrequency = DefaultAuthCheckFrequency
}
}

// StreamAggregatedResources implements
Expand Down Expand Up @@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

const (
stateInit int = iota
statePendingAuth
statePendingInitialConfig
stateRunning
)

Expand Down Expand Up @@ -176,8 +193,53 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
},
}

var timer *time.Timer
var authTimerChan <-chan time.Time
extendAuthTimer := func(timerReceived bool) {
if timer == nil {
return
}
if !timerReceived {
if !timer.Stop() {
<-timer.C
}
}
timer.Reset(s.AuthCheckFrequency)
}

checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
if cfgSnap == nil {
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
}

token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)

if acl.IsErrNotFound(err) {
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
} else if acl.IsErrPermissionDenied(err) {
return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
} else if err != nil {
return err
}

if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}

// Authed OK!
return nil
}

for {
select {
case <-authTimerChan:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer(true)

case req, ok = <-reqCh:
if !ok {
// reqCh is closed when stream.Recv errors which is how we detect client
Expand Down Expand Up @@ -218,27 +280,32 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
defer watchCancel()

// Now wait for the config so we can check ACL
state = statePendingAuth
case statePendingAuth:
state = statePendingInitialConfig
case statePendingInitialConfig:
if cfgSnap == nil {
// Nothing we can do until we get the initial config
continue
}
// Got config, try to authenticate
token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
// Authed OK!

// Got config, try to authenticate next.
state = stateRunning

// Lets actually process the config we just got or we'll mis responding
fallthrough
case stateRunning:
// Check ACLs on every Discovery{Request,Response}.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer(false)

if timer == nil {
// Opt this stream into periodic auth checks in the background.
timer = time.NewTimer(s.AuthCheckFrequency)
defer timer.Stop()
authTimerChan = timer.C
}

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
Expand Down
Loading

0 comments on commit 7c34dbd

Please sign in to comment.