Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check ACLs more often for xDS endpoints. #5237

Merged
merged 2 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
Authz: a,
ResolveToken: a.resolveToken,
}
a.xdsServer.Initialize()

var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
Expand Down
83 changes: 68 additions & 15 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,6 +57,10 @@ const (
// LocalAgentClusterName is the name we give the local agent "cluster" in
// Envoy config.
LocalAgentClusterName = "local_agent"

// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time.Minute
)

// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
Expand Down Expand Up @@ -90,6 +95,18 @@ type Server struct {
CfgMgr ConfigManager
Authz ConnectAuthz
ResolveToken ACLResolverFunc
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time.Duration
}

// Initialize will finish configuring the Server for first use.
func (s *Server) Initialize() {
if s.AuthCheckFrequency == 0 {
s.AuthCheckFrequency = DefaultAuthCheckFrequency
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
}
}

// StreamAggregatedResources implements
Expand Down Expand Up @@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

const (
stateInit int = iota
statePendingAuth
statePendingInitialConfig
stateRunning
)

Expand Down Expand Up @@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
},
}

var authTimer <-chan time.Time
extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency)
}

checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
if cfgSnap == nil {
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
}

token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)

if acl.IsErrNotFound(err) {
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
rboyer marked this conversation as resolved.
Show resolved Hide resolved
} else if acl.IsErrPermissionDenied(err) {
return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
} else if err != nil {
return err
}

if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}

// Authed OK!
return nil
}

for {
select {
case <-authTimer:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer()

case req, ok = <-reqCh:
if !ok {
// reqCh is closed when stream.Recv errors which is how we detect client
Expand Down Expand Up @@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
defer watchCancel()

// Now wait for the config so we can check ACL
state = statePendingAuth
case statePendingAuth:
state = statePendingInitialConfig
case statePendingInitialConfig:
if cfgSnap == nil {
// Nothing we can do until we get the initial config
continue
}
// Got config, try to authenticate
token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
// Authed OK!

// Got config, try to authenticate next.
state = stateRunning

// Lets actually process the config we just got or we'll mis responding
fallthrough
case stateRunning:
// Check ACLs on every Discovery{Request,Response}.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
// For the first time through the state machine, this is when the
// timer is first started.
extendAuthTimer()

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
Expand Down Expand Up @@ -364,12 +417,12 @@ func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoya
if err != nil {
// Treat this as an auth error since Envoy has sent something it considers
// valid, it's just not an identity we trust.
return deniedResponse("Destination Principal is not a valid Connect identitiy")
return deniedResponse("Destination Principal is not a valid Connect identity")
}

destID, ok := dest.(*connect.SpiffeIDService)
if !ok {
return deniedResponse("Destination Principal is not a valid Service identitiy")
return deniedResponse("Destination Principal is not a valid Service identity")
}

// For now we don't validate the trust domain of the _destination_ at all -
Expand Down
Loading