From 2dea3e2bd7395c2ef92cd0563d398d62c8f99391 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Fri, 18 Jan 2019 15:00:54 -0600 Subject: [PATCH 1/2] Fix some test typos. --- agent/xds/server.go | 4 ++-- agent/xds/server_test.go | 30 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/agent/xds/server.go b/agent/xds/server.go index 0676bc37ecf3..70892b48b0e9 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -364,12 +364,12 @@ func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoya if err != nil { // Treat this as an auth error since Envoy has sent something it considers // valid, it's just not an identity we trust. - return deniedResponse("Destination Principal is not a valid Connect identitiy") + return deniedResponse("Destination Principal is not a valid Connect identity") } destID, ok := dest.(*connect.SpiffeIDService) if !ok { - return deniedResponse("Destination Principal is not a valid Service identitiy") + return deniedResponse("Destination Principal is not a valid Service identity") } // For now we don't validate the trust domain of the _destination_ at all - diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index b11904cce2bc..bdafbc3d240b 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -82,7 +82,7 @@ func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, pr // AssertWatchCancelled checks that the most recent call to a Watch cancel func // was from the specified proxyID and that one is made in a short time. This // probably won't work if you are running multiple Watches in parallel on -// multiple proxyIDS due to timing/ordering issues but I dont think we need to +// multiple proxyIDS due to timing/ordering issues but I don't think we need to // do that. func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID string) { t.Helper() @@ -162,11 +162,11 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) { // And should get a response immediately. assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 1, 3)) - // Now send Route request along with next listner one + // Now send Route request along with next listener one envoy.SendReq(t, RouteType, 0, 0) envoy.SendReq(t, ListenerType, 1, 3) - // We don't serve routes yet so this shoould block with no response + // We don't serve routes yet so this should block with no response assertChanBlocked(t, envoy.stream.sendCh) // WOOP! Envoy now has full connect config. Lets verify that if we update it, @@ -179,7 +179,7 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) { mgr.DeliverConfig(t, "web-sidecar-proxy", snap) // All 3 response that have something to return should return with new version - // note that the ordering is not determinisic in general. Trying to make this + // note that the ordering is not deterministic in general. Trying to make this // test order-agnostic though is a massive pain since we are comparing // non-identical JSON strings (so can simply sort by anything) and because we // don't know the order the nonces will be assigned. For now we rely and @@ -189,21 +189,21 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) { assertResponseSent(t, envoy.stream.sendCh, expectEndpointsJSON(t, snap, "", 2, 5)) assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 2, 6)) - // Let's pretent that Envoy doesn't like that new listener config. It will ACK + // Let's pretend that Envoy doesn't like that new listener config. It will ACK // all the others (same version) but NACK the listener. This is the most // subtle part of xDS and the server implementation so I'll elaborate. A full // description of the protocol can be found at // https://github.com/envoyproxy/data-plane-api/blob/master/XDS_PROTOCOL.md. - // Envoy delays making a followup reqeest for a type until after it has + // Envoy delays making a followup request for a type until after it has // processed and applied the last response. The next request then will include - // the nonce in the last response which acknowledges _recieving_ and handling + // the nonce in the last response which acknowledges _receiving_ and handling // that response. It also includes the currently applied version. If all is // good and it successfully applies the config, then the version in the next // response will be the same version just sent. This is considered to be an // ACK of that version for that type. If envoy fails to apply the config for // some reason, it will still acknowledge that it received it (still return // the responses nonce), but will show the previous version it's still using. - // This is considered a NACK. It's impotant that the server pay attention to + // This is considered a NACK. It's important that the server pay attention to // the _nonce_ and not the version when deciding what to send otherwise a bad // version that can't be applied in Envoy will cause a busy loop. // @@ -489,7 +489,7 @@ func assertChanBlocked(t *testing.T, ch chan *envoy.DiscoveryResponse) { t.Helper() select { case r := <-ch: - t.Fatalf("chan should block but recieved: %v", r) + t.Fatalf("chan should block but received: %v", r) case <-time.After(10 * time.Millisecond): return } @@ -501,12 +501,12 @@ func assertResponseSent(t *testing.T, ch chan *envoy.DiscoveryResponse, wantJSON case r := <-ch: assertResponse(t, r, wantJSON) case <-time.After(50 * time.Millisecond): - t.Fatalf("no response recieved after 50ms") + t.Fatalf("no response received after 50ms") } } // assertResponse is a helper to test a envoy.DiscoveryResponse matches the -// JSON representaion we expect. We use JSON because the responses use protobuf +// JSON representation we expect. We use JSON because the responses use protobuf // Any type which includes binary protobuf encoding and would make creating // expected structs require the same code that is under test! func assertResponse(t *testing.T, r *envoy.DiscoveryResponse, wantJSON string) { @@ -519,7 +519,7 @@ func assertResponse(t *testing.T, r *envoy.DiscoveryResponse, wantJSON string) { require.JSONEqf(t, wantJSON, gotJSON, "got:\n%s", gotJSON) } -func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) { +func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) { tests := []struct { name string @@ -529,7 +529,7 @@ func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) { wantDenied bool }{ // Note that although we've stubbed actual ACL checks in the testManager - // ConnectAuthorize mock, by asserting against specifc reason strings here + // ConnectAuthorize mock, by asserting against specific reason strings here // even in the happy case which can't match the default one returned by the // mock we are implicitly validating that the implementation used the // correct token from the context. @@ -685,7 +685,7 @@ func TestServer_Check(t *testing.T) { destPrincipal: "not-a-spiffe-id", // Should never make it to authz call. wantDenied: true, - wantReason: "Destination Principal is not a valid Connect identitiy", + wantReason: "Destination Principal is not a valid Connect identity", }, { name: "dest not a service URI", @@ -693,7 +693,7 @@ func TestServer_Check(t *testing.T) { destPrincipal: "spiffe://trust-domain.consul", // Should never make it to authz call. wantDenied: true, - wantReason: "Destination Principal is not a valid Service identitiy", + wantReason: "Destination Principal is not a valid Service identity", }, { name: "ACL not got permission for authz call", From d3eb781384d0b5a8abde110c75e47739b62ce634 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Fri, 11 Jan 2019 09:43:18 -0600 Subject: [PATCH 2/2] Check ACLs more often for xDS endpoints. For established xDS gRPC streams recheck ACLs for each DiscoveryRequest or DiscoveryResponse. If more than 5 minutes has elapsed since the last ACL check, recheck even without an incoming DiscoveryRequest or DiscoveryResponse. ACL failures will terminate the stream. --- agent/agent.go | 2 + agent/xds/server.go | 79 +++++++++++--- agent/xds/server_test.go | 215 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 280 insertions(+), 16 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 0ed27cb610e9..53d8091ec5e4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error { Authz: a, ResolveToken: a.resolveToken, } + a.xdsServer.Initialize() + var err error a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile) if err != nil { diff --git a/agent/xds/server.go b/agent/xds/server.go index 70892b48b0e9..1a32b1c7670d 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "sync/atomic" + "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -56,6 +57,10 @@ const ( // LocalAgentClusterName is the name we give the local agent "cluster" in // Envoy config. LocalAgentClusterName = "local_agent" + + // DefaultAuthCheckFrequency is the default value for + // Server.AuthCheckFrequency to use when the zero value is provided. + DefaultAuthCheckFrequency = 5 * time.Minute ) // ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far @@ -90,6 +95,18 @@ type Server struct { CfgMgr ConfigManager Authz ConnectAuthz ResolveToken ACLResolverFunc + // AuthCheckFrequency is how often we should re-check the credentials used + // during a long-lived gRPC Stream after it has been initially established. + // This is only used during idle periods of stream interactions (i.e. when + // there has been no recent DiscoveryRequest). + AuthCheckFrequency time.Duration +} + +// Initialize will finish configuring the Server for first use. +func (s *Server) Initialize() { + if s.AuthCheckFrequency == 0 { + s.AuthCheckFrequency = DefaultAuthCheckFrequency + } } // StreamAggregatedResources implements @@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error { const ( stateInit int = iota - statePendingAuth + statePendingInitialConfig stateRunning ) @@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) }, } + var authTimer <-chan time.Time + extendAuthTimer := func() { + authTimer = time.After(s.AuthCheckFrequency) + } + + checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error { + if cfgSnap == nil { + return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") + } + + token := tokenFromStream(stream) + rule, err := s.ResolveToken(token) + + if acl.IsErrNotFound(err) { + return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) + } else if acl.IsErrPermissionDenied(err) { + return status.Errorf(codes.PermissionDenied, "permission denied: %v", err) + } else if err != nil { + return err + } + + if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { + return status.Errorf(codes.PermissionDenied, "permission denied") + } + + // Authed OK! + return nil + } + for { select { + case <-authTimer: + // It's been too long since a Discovery{Request,Response} so recheck ACLs. + if err := checkStreamACLs(cfgSnap); err != nil { + return err + } + extendAuthTimer() + case req, ok = <-reqCh: if !ok { // reqCh is closed when stream.Recv errors which is how we detect client @@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) defer watchCancel() // Now wait for the config so we can check ACL - state = statePendingAuth - case statePendingAuth: + state = statePendingInitialConfig + case statePendingInitialConfig: if cfgSnap == nil { // Nothing we can do until we get the initial config continue } - // Got config, try to authenticate - token := tokenFromStream(stream) - rule, err := s.ResolveToken(token) - if err != nil { - return err - } - if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { - return status.Errorf(codes.PermissionDenied, "permission denied") - } - // Authed OK! + + // Got config, try to authenticate next. state = stateRunning // Lets actually process the config we just got or we'll mis responding fallthrough case stateRunning: + // Check ACLs on every Discovery{Request,Response}. + if err := checkStreamACLs(cfgSnap); err != nil { + return err + } + // For the first time through the state machine, this is when the + // timer is first started. + extendAuthTimer() + // See if any handlers need to have the current (possibly new) config // sent. Note the order here is actually significant so we can't just // range the map which has no determined order. It's important because: diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index bdafbc3d240b..694cbce2972b 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -9,6 +9,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "text/template" "time" @@ -115,7 +116,13 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) { envoy := NewTestEnvoy(t, "web-sidecar-proxy", "") defer envoy.Close() - s := Server{logger, mgr, mgr, aclResolve} + s := Server{ + Logger: logger, + CfgMgr: mgr, + Authz: mgr, + ResolveToken: aclResolve, + } + s.Initialize() go func() { err := s.StreamAggregatedResources(envoy.stream) @@ -589,7 +596,13 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) { envoy := NewTestEnvoy(t, "web-sidecar-proxy", tt.token) defer envoy.Close() - s := Server{logger, mgr, mgr, aclResolve} + s := Server{ + Logger: logger, + CfgMgr: mgr, + Authz: mgr, + ResolveToken: aclResolve, + } + s.Initialize() errCh := make(chan error, 1) go func() { @@ -632,6 +645,196 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) { } } +func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) { + aclRules := `service "web" { policy = "write" }` + token := "service-write-on-web" + + policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil) + require.NoError(t, err) + + var validToken atomic.Value + validToken.Store(token) + + logger := log.New(os.Stderr, "", log.LstdFlags) + mgr := newTestManager(t) + aclResolve := func(id string) (acl.Authorizer, error) { + if token := validToken.Load(); token == nil || id != token.(string) { + return nil, acl.ErrNotFound + } + + return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) + } + envoy := NewTestEnvoy(t, "web-sidecar-proxy", token) + defer envoy.Close() + + s := Server{ + Logger: logger, + CfgMgr: mgr, + Authz: mgr, + ResolveToken: aclResolve, + AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in + } + s.Initialize() + + errCh := make(chan error, 1) + go func() { + errCh <- s.StreamAggregatedResources(envoy.stream) + }() + + getError := func() (gotErr error, ok bool) { + select { + case err := <-errCh: + return err, true + default: + return nil, false + } + } + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, "web-sidecar-proxy") + + // Send initial cluster discover (OK) + envoy.SendReq(t, ClusterType, 0, 0) + { + err, ok := getError() + require.NoError(t, err) + require.False(t, ok) + } + + // Check no response sent yet + assertChanBlocked(t, envoy.stream.sendCh) + { + err, ok := getError() + require.NoError(t, err) + require.False(t, ok) + } + + // Deliver a new snapshot + snap := proxycfg.TestConfigSnapshot(t) + mgr.DeliverConfig(t, "web-sidecar-proxy", snap) + + assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1)) + + // Now nuke the ACL token. + validToken.Store("") + + // It also (in parallel) issues the next cluster request (which acts as an ACK + // of the version we sent) + envoy.SendReq(t, ClusterType, 1, 1) + + select { + case err := <-errCh: + require.Error(t, err) + gerr, ok := status.FromError(err) + require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err) + require.Equal(t, codes.Unauthenticated, gerr.Code()) + require.Equal(t, "unauthenticated: ACL not found", gerr.Message()) + + mgr.AssertWatchCancelled(t, "web-sidecar-proxy") + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + +func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) { + aclRules := `service "web" { policy = "write" }` + token := "service-write-on-web" + + policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil) + require.NoError(t, err) + + var validToken atomic.Value + validToken.Store(token) + + logger := log.New(os.Stderr, "", log.LstdFlags) + mgr := newTestManager(t) + aclResolve := func(id string) (acl.Authorizer, error) { + if token := validToken.Load(); token == nil || id != token.(string) { + return nil, acl.ErrNotFound + } + + return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) + } + envoy := NewTestEnvoy(t, "web-sidecar-proxy", token) + defer envoy.Close() + + s := Server{ + Logger: logger, + CfgMgr: mgr, + Authz: mgr, + ResolveToken: aclResolve, + AuthCheckFrequency: 100 * time.Millisecond, // Make this short. + } + s.Initialize() + + errCh := make(chan error, 1) + go func() { + errCh <- s.StreamAggregatedResources(envoy.stream) + }() + + getError := func() (gotErr error, ok bool) { + select { + case err := <-errCh: + return err, true + default: + return nil, false + } + } + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, "web-sidecar-proxy") + + // Send initial cluster discover (OK) + envoy.SendReq(t, ClusterType, 0, 0) + { + err, ok := getError() + require.NoError(t, err) + require.False(t, ok) + } + + // Check no response sent yet + assertChanBlocked(t, envoy.stream.sendCh) + { + err, ok := getError() + require.NoError(t, err) + require.False(t, ok) + } + + // Deliver a new snapshot + snap := proxycfg.TestConfigSnapshot(t) + mgr.DeliverConfig(t, "web-sidecar-proxy", snap) + + assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1)) + + // It also (in parallel) issues the next cluster request (which acts as an ACK + // of the version we sent) + envoy.SendReq(t, ClusterType, 1, 1) + + // Check no response sent yet + assertChanBlocked(t, envoy.stream.sendCh) + { + err, ok := getError() + require.NoError(t, err) + require.False(t, ok) + } + + // Now nuke the ACL token while there's no activity. + validToken.Store("") + + select { + case err := <-errCh: + require.Error(t, err) + gerr, ok := status.FromError(err) + require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err) + require.Equal(t, codes.Unauthenticated, gerr.Code()) + require.Equal(t, "unauthenticated: ACL not found", gerr.Message()) + + mgr.AssertWatchCancelled(t, "web-sidecar-proxy") + case <-time.After(200 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + // This tests the ext_authz service method that implements connect authz. func TestServer_Check(t *testing.T) { @@ -729,7 +932,13 @@ func TestServer_Check(t *testing.T) { envoy := NewTestEnvoy(t, "web-sidecar-proxy", token) defer envoy.Close() - s := Server{logger, mgr, mgr, aclResolve} + s := Server{ + Logger: logger, + CfgMgr: mgr, + Authz: mgr, + ResolveToken: aclResolve, + } + s.Initialize() // Create a context with the correct token ctx := metadata.NewIncomingContext(context.Background(),