From a36df793fffbbbedb91c00b29f03fc4aa5b43f5e Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Wed, 24 Aug 2022 22:06:29 -0400 Subject: [PATCH 1/6] Rework Cache interface to isolate it from streamState and make it more uniform between sotw and delta Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 26 +++++++- pkg/cache/v3/delta.go | 15 ++--- pkg/cache/v3/delta_test.go | 46 ++++++------- pkg/cache/v3/linear.go | 21 +++--- pkg/cache/v3/linear_test.go | 118 ++++++++++++++++----------------- pkg/cache/v3/mux.go | 6 +- pkg/cache/v3/simple.go | 27 ++++---- pkg/cache/v3/simple_test.go | 43 +++++++----- pkg/cache/v3/status.go | 19 +++++- pkg/server/delta/v3/server.go | 8 +-- pkg/server/sotw/v3/server.go | 24 ++++--- pkg/server/stream/v3/stream.go | 51 +------------- pkg/server/v3/delta_test.go | 37 +++++------ pkg/server/v3/server.go | 7 +- pkg/server/v3/server_test.go | 3 +- 15 files changed, 222 insertions(+), 229 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 35e963343b..be124cb484 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -37,6 +36,27 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest +// ClientState provides additional data on the client knowledge for the type matching the request +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources) +// Though the methods may return mutable parts of the state for performance reasons, +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation +type ClientState interface { + // GetKnownResources returns the list of resources the clients has ACKed and their associated version. + // The versions are: + // - delta protocol: version of the specific resource set in the response + // - sotw protocol: version of the global response when the resource was last ACKed + GetKnownResources() map[string]string + + // GetSubscribedResources returns the list of resources currently subscribed to by the client for the type. + // For delta it keeps track across requests + // For sotw it is a normalized view of the request resources + GetSubscribedResources() map[string]struct{} + + // IsWildcard returns whether the client has a wildcard watch. + // This considers subtilities related to the current migration of wildcard definition within the protocol. + IsWildcard() bool +} + // ConfigWatcher requests watches for configuration resources by a node, last // applied version identifier, and resource names hint. The watch should send // the responses when they are ready. The watch can be canceled by the @@ -50,7 +70,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) + CreateWatch(*Request, ClientState, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. // @@ -59,7 +79,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func()) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index deaeeb7ed1..c5adf9fb8b 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,7 +18,6 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function @@ -28,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.GetKnownResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -46,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.GetKnownResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -54,17 +53,17 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { + for name := range state.GetKnownResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.GetSubscribedResources() { + prevVersion, found := state.GetKnownResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 4999cad603..e171abad69 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -3,12 +3,12 @@ package cache_test import ( "context" "fmt" - "reflect" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -35,13 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) + state := stream.NewStreamState(true, nil) c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(true, nil), watches[typ]) + }, &state, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -69,7 +70,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { watches[typ] = make(chan cache.DeltaResponse, 1) state := stream.NewStreamState(false, versionMap[typ]) for resource := range versionMap[typ] { - state.GetSubscribedResourceNames()[resource] = struct{}{} + state.GetSubscribedResources()[resource] = struct{}{} } c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ @@ -77,7 +78,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, &state, watches[typ]) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -123,12 +124,10 @@ func TestDeltaRemoveResources(t *testing.T) { Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, streams[typ], watches[typ]) } - if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { - t.Fatal(err) - } + require.NoError(t, c.SetSnapshot(context.Background(), key, fixture.snapshot())) for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { @@ -139,7 +138,7 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() streams[typ].SetResourceVersions(nextVersionMap) case <-time.After(time.Second): - t.Fatal("failed to receive a snapshot response") + require.Fail(t, "failed to receive a snapshot response") } }) } @@ -152,20 +151,17 @@ func TestDeltaRemoveResources(t *testing.T) { Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - }, *streams[typ], watches[typ]) + TypeUrl: typ, + ResponseNonce: "nonce", + }, streams[typ], watches[typ]) } - if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { - t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes)) - } + assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version") // set a partially versioned snapshot with no endpoints snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil { - t.Fatal(err) - } + require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2)) // validate response for endpoints select { @@ -176,11 +172,9 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) { - t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap) - } + require.Equal(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): - t.Fatal("failed to receive snapshot response") + assert.Fail(t, "failed to receive snapshot response") } } @@ -203,13 +197,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { + state := stream.NewStreamState(false, make(map[string]string)) cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, &state, responses) defer cancel() } @@ -226,14 +221,14 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) + state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, &state, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -269,13 +264,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) + state := stream.NewStreamState(false, make(map[string]string)) cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, &state, responses) // Cancel the watch cancel() diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index ae55105d59..8b4b4b1652 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -24,7 +24,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type watches = map[chan Response]struct{} @@ -164,11 +163,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.StreamState.WatchesResources(modified) { + if !watch.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + res := cache.respondDelta(watch.Request, watch.Response, watch.clientState) if res != nil { delete(cache.deltaWatches, id) } @@ -176,8 +175,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -187,7 +186,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, clientState.IsWildcard()) } value <- resp return resp @@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() { if request.TypeUrl != cache.typeURL { value <- nil return nil @@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() { cache.mu.Lock() defer cache.mu.Unlock() @@ -388,7 +387,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, state) + response := cache.respondDelta(request, value, clientState) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly @@ -396,10 +395,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S watchID := cache.nextDeltaWatchID() if cache.log != nil { cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) + cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion()) } - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState} return cache.cancelDeltaWatch(watchID) } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 617d90366e..8f8db7fe04 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -191,19 +191,19 @@ func hashResource(t *testing.T, resource types.Resource) string { func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) { state := stream.NewStreamState(true, nil) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) resp := <-w state.SetResourceVersions(resp.GetNextVersionMap()) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values } func TestLinearInitialResources(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, &streamState, w) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType}, &streamState, w) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } @@ -217,7 +217,7 @@ func TestLinearCornerCases(t *testing.T) { } // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: "test"}, &streamState, w) select { case r := <-w: if r != nil { @@ -234,12 +234,12 @@ func TestLinearBasic(t *testing.T) { // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) checkVersionMapNotSet(t, c) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -250,19 +250,19 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) @@ -274,10 +274,10 @@ func TestLinearSetResources(t *testing.T) { // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -287,9 +287,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -300,9 +300,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -334,14 +334,14 @@ func TestLinearVersionPrefix(t *testing.T) { c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } @@ -350,17 +350,17 @@ func TestLinearDeletion(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } @@ -369,10 +369,10 @@ func TestLinearWatchTwo(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -387,14 +387,14 @@ func TestLinearCancel(t *testing.T) { // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -404,10 +404,10 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w3) + cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w4) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -449,7 +449,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + }, &streamState, value) // wait until all updates apply verifyResponse(t, value, "", 1) } @@ -462,11 +462,11 @@ func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) state1 := stream.NewStreamState(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1) mustBlockDelta(t, w1) state2 := stream.NewStreamState(true, map[string]string{}) w2 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state2, w2) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) @@ -491,16 +491,16 @@ func TestLinearDeltaExistingResources(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + state.SetSubscribedResources(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) state = stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -517,16 +517,16 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, map[string]string{"b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b @@ -551,17 +551,17 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { checkVersionMapNotSet(t, c) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -587,16 +587,16 @@ func TestLinearDeltaResourceDelete(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -612,13 +612,13 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) // The version map should now be created, even if empty @@ -636,7 +636,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Multiple updates - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update @@ -656,7 +656,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Update/add/delete - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update @@ -675,7 +675,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource @@ -738,7 +738,7 @@ func TestLinearMixedWatches(t *testing.T) { sotwState := stream.NewStreamState(false, nil) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -752,16 +752,16 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + deltaState.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) wd := make(chan DeltaResponse, 1) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaState, wd) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) checkVersionMapSet(t, c) diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index db5a65d0a7..5b5db4b489 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -17,8 +17,6 @@ package cache import ( "context" "errors" - - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // MuxCache multiplexes across several caches using a classification function. @@ -37,7 +35,7 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, state ClientState, value chan Response) func() { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { @@ -47,7 +45,7 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val return cache.CreateWatch(request, state, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state ClientState, value chan DeltaResponse) func() { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 5fe36c5b9e..bb2c5a313a 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -23,7 +23,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // ResourceSnapshot is an abstract snapshot of a collection of resources that @@ -265,7 +264,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.clientState, ) if err != nil { return err @@ -322,7 +321,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) } // CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() @@ -347,7 +346,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + knownResourceNames := clientState.GetKnownResources() diff := []string{} for _, r := range request.ResourceNames { if _, ok := knownResourceNames[r]; !ok { @@ -461,7 +460,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() { nodeID := cache.hash.ID(request.Node) t := request.GetTypeUrl() @@ -490,7 +489,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, clientState) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -502,12 +501,12 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, clientState.GetSubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, clientState.GetSubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, clientState: clientState}) return cache.cancelDeltaWatch(nodeID, watchID) } @@ -515,20 +514,20 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, clientState ClientState) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, clientState, resourceContainer{ resourceMap: snapshot.GetResources(request.TypeUrl), versionMap: snapshot.GetVersionMap(request.TypeUrl), systemVersion: snapshot.GetVersion(request.TypeUrl), }) // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // We want to respond immediately for the first request in a stream if it is wildcard, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (clientState.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, clientState.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index d5e1f92711..1f5a626239 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -125,7 +125,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version { @@ -135,7 +135,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } // Update streamState - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -155,7 +157,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { for { value := make(chan cache.Response, 1) cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + &streamState, value) select { case out := <-value: @@ -172,7 +174,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().ResourceNames) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-end: cancel() return @@ -216,7 +220,7 @@ func TestSnapshotCache(t *testing.T) { value := make(chan cache.Response, 1) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + &streamState, value) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -228,7 +232,7 @@ func TestSnapshotCache(t *testing.T) { value := make(chan cache.Response, 1) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + &streamState, value) select { case out := <-value: snapshot := fixture.snapshot() @@ -282,7 +286,7 @@ func TestSnapshotCacheWatch(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) @@ -298,7 +302,9 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -309,7 +315,7 @@ func TestSnapshotCacheWatch(t *testing.T) { for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + &streamState, watches[typ]) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -357,7 +363,7 @@ func TestConcurrentSetWatch(t *testing.T) { cancel := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, &streamState, value) defer cancel() } @@ -370,7 +376,7 @@ func TestSnapshotCacheWatchCancel(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, value) cancel() } // should be status info for the node @@ -396,7 +402,7 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { watchCh := make(chan cache.Response) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + &streamState, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -450,8 +456,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { + state := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewStreamState(false, map[string]string{}), watch) + &state, watch) }() select { @@ -470,9 +477,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) + state.SetResourceVersions(map[string]string{clusterName: fixture.version}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch) + ResourceNames: []string{clusterName, clusterName2}}, &state, watch) }() select { @@ -489,9 +496,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Repeat request for with same version and make sure a watch is created state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) + state.SetResourceVersions(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch); cancel == nil { + ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil { t.Fatal("Should create a watch") } else { cancel() @@ -617,7 +624,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { } ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) - c.CreateWatch(req, ss, responder) + c.CreateWatch(req, &ss, responder) go func() { // Wait for at least one heartbeat to occur, then set snapshot. diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index 84db1f9821..43c6d109db 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -19,7 +19,6 @@ import ( "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // NodeHash computes string identifiers for Envoy nodes. @@ -99,7 +98,23 @@ type DeltaResponseWatch struct { Response chan DeltaResponse // VersionMap for the stream - StreamState stream.StreamState + clientState ClientState +} + +// WatchesResources returns whether at least one of the resource provided is currently watch by the stream +// It is currently only applicable to delta-xds +// If the request is wildcard, it will always return true +// Otherwise it will compare the provided resources to the list of resources currently subscribed +func (w *DeltaResponseWatch) WatchesResources(resourceNames map[string]struct{}) bool { + if w.clientState.IsWildcard() { + return true + } + for resourceName := range resourceNames { + if _, ok := w.clientState.GetSubscribedResources()[resourceName]; ok { + return true + } + } + return false } // newStatusInfo initializes a status info data structure. diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 5f10266aba..e556e54a74 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -174,7 +174,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) watch.responses = make(chan cache.DeltaResponse, 1) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) + watch.cancel = s.cache.CreateDeltaWatch(req, &watch.state, watch.responses) watches.deltaWatches[typeURL] = watch go func() { @@ -216,7 +216,7 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro // When we subscribe, we just want to make the cache know we are subscribing to a resource. // Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. func (s *server) subscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() + sv := streamState.GetSubscribedResources() for _, resource := range resources { if resource == "*" { streamState.SetWildcard(true) @@ -229,7 +229,7 @@ func (s *server) subscribe(resources []string, streamState *stream.StreamState) // Unsubscriptions remove resources from the stream's subscribed resource list. // If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() + sv := streamState.GetSubscribedResources() for _, resource := range resources { if resource == "*" { streamState.SetWildcard(false) @@ -244,7 +244,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response - streamState.GetResourceVersions()[resource] = "" + streamState.GetKnownResources()[resource] = "" } delete(sv, resource) } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 91681237c9..fcde7d89b2 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -69,7 +69,7 @@ type server struct { // regardless current snapshot version (even if it is not changed yet) type lastDiscoveryResponse struct { nonce string - resources map[string]struct{} + resources map[string]string } // process handles a bi-di stream request @@ -81,7 +81,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) + streamStates := map[string]stream.StreamState{} lastDiscoveryResponses := map[string]lastDiscoveryResponse{} // a collection of stack allocated watches per request type @@ -112,12 +112,17 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) + version, err := resp.GetVersion() + if err != nil { + return "", err + } + lastResponse := lastDiscoveryResponse{ nonce: out.Nonce, - resources: make(map[string]struct{}), + resources: make(map[string]string), } for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} + lastResponse.resources[r] = version } lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse @@ -183,14 +188,15 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } + typeURL := req.GetTypeUrl() + state := streamStates[typeURL] if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + state.SetResourceVersions(lastResponse.resources) } } - typeURL := req.GetTypeUrl() responder := make(chan cache.Response, 1) if w, ok := watches.responders[typeURL]; ok { // We've found a pre-existing watch, lets check and update if needed. @@ -199,7 +205,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq w.close() watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), + cancel: s.cache.CreateWatch(req, &state, responder), response: responder, }) } @@ -207,11 +213,13 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // No pre-existing watch exists, let's create one. // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), + cancel: s.cache.CreateWatch(req, &state, responder), response: responder, }) } + streamStates[typeURL] = state + // Recompute the dynamic select cases for this stream. watches.recompute(s.ctx, reqCh) default: diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index b5832b7d58..2147dc46e0 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -33,57 +33,30 @@ type StreamState struct { // nolint:golint,revive // ResourceVersions contains a hash of the resource as the value and the resource name as the key. // This field stores the last state sent to the client. resourceVersions map[string]string - - // knownResourceNames contains resource names that a client has received previously - knownResourceNames map[string]map[string]struct{} - - // indicates whether the object has been modified since its creation - first bool } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to // If the request is set to wildcard it may be empty // Currently populated only when using delta-xds -func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { +func (s *StreamState) GetSubscribedResources() map[string]struct{} { return s.subscribedResourceNames } // SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to // It is decorrelated from the wildcard state of the stream // Currently used only when using delta-xds -func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { +func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string]struct{}) { s.subscribedResourceNames = subscribedResourceNames } -// WatchesResources returns whether at least one of the resource provided is currently watch by the stream -// It is currently only applicable to delta-xds -// If the request is wildcard, it will always return true -// Otherwise it will compare the provided resources to the list of resources currently subscribed -func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { - if s.IsWildcard() { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - -func (s *StreamState) GetResourceVersions() map[string]string { +func (s *StreamState) GetKnownResources() map[string]string { return s.resourceVersions } func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { - s.first = false s.resourceVersions = resourceVersions } -func (s *StreamState) IsFirst() bool { - return s.first -} - func (s *StreamState) SetWildcard(wildcard bool) { s.wildcard = wildcard } @@ -92,30 +65,12 @@ func (s *StreamState) IsWildcard() bool { return s.wildcard } -func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { - s.knownResourceNames[url] = names -} - -func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { - m := map[string]struct{}{} - for _, name := range names { - m[name] = struct{}{} - } - s.knownResourceNames[url] = m -} - -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - // NewStreamState initializes a stream state. func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { state := StreamState{ wildcard: wildcard, subscribedResourceNames: map[string]struct{}{}, resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, } if initialResourceVersions == nil { diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 204599613c..66ddc5bd1a 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,12 +15,11 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.ClientState, out chan cache.DeltaResponse) func() { config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -37,7 +36,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.GetKnownResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -46,24 +45,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.GetKnownResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetResourceVersions() { + for name := range state.GetKnownResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.GetSubscribedResources() { + prevVersion, found := state.GetKnownResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { @@ -155,36 +154,36 @@ func makeMockDeltaStream(t *testing.T) *mockDeltaStream { func makeDeltaResources() map[string]map[string]types.Resource { return map[string]map[string]types.Resource{ - rsrc.EndpointType: map[string]types.Resource{ + rsrc.EndpointType: { endpoint.GetClusterName(): endpoint, }, - rsrc.ClusterType: map[string]types.Resource{ + rsrc.ClusterType: { cluster.Name: cluster, }, - rsrc.RouteType: map[string]types.Resource{ + rsrc.RouteType: { route.Name: route, }, - rsrc.ScopedRouteType: map[string]types.Resource{ + rsrc.ScopedRouteType: { scopedRoute.Name: scopedRoute, }, - rsrc.VirtualHostType: map[string]types.Resource{ + rsrc.VirtualHostType: { virtualHost.Name: virtualHost, }, - rsrc.ListenerType: map[string]types.Resource{ + rsrc.ListenerType: { httpListener.Name: httpListener, httpScopedListener.Name: httpScopedListener, }, - rsrc.SecretType: map[string]types.Resource{ + rsrc.SecretType: { secret.Name: secret, }, - rsrc.RuntimeType: map[string]types.Resource{ + rsrc.RuntimeType: { runtime.Name: runtime, }, - rsrc.ExtensionConfigType: map[string]types.Resource{ + rsrc.ExtensionConfigType: { extensionConfig.Name: extensionConfig, }, // Pass-through type (types without explicit handling) - opaqueType: map[string]types.Resource{ + opaqueType: { "opaque": opaque, }, } @@ -460,7 +459,7 @@ func TestDeltaCallbackError(t *testing.T) { func TestDeltaWildcardSubscriptions(t *testing.T) { config := makeMockConfigWatcher() config.deltaResources = map[string]map[string]types.Resource{ - rsrc.EndpointType: map[string]types.Resource{ + rsrc.EndpointType: { "endpoints0": resource.MakeEndpoint("endpoints0", 1234), "endpoints1": resource.MakeEndpoint("endpoints1", 1234), "endpoints2": resource.MakeEndpoint("endpoints2", 1234), diff --git a/pkg/server/v3/server.go b/pkg/server/v3/server.go index ed0b0cb6be..56c683a2de 100644 --- a/pkg/server/v3/server.go +++ b/pkg/server/v3/server.go @@ -29,7 +29,6 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" extensionconfigservice "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" @@ -48,7 +47,7 @@ type Server interface { routeservice.ScopedRoutesDiscoveryServiceServer routeservice.VirtualHostDiscoveryServiceServer listenerservice.ListenerDiscoveryServiceServer - discoverygrpc.AggregatedDiscoveryServiceServer + discovery.AggregatedDiscoveryServiceServer secretservice.SecretDiscoveryServiceServer runtimeservice.RuntimeDiscoveryServiceServer extensionconfigservice.ExtensionConfigDiscoveryServiceServer @@ -184,7 +183,7 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.sotw.StreamHandler(stream, typeURL) } -func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { +func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { return s.StreamHandler(stream, resource.AnyType) } @@ -297,7 +296,7 @@ func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) e return s.delta.DeltaStreamHandler(stream, typeURL) } -func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { +func (s *server) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return s.DeltaStreamHandler(stream, resource.AnyType) } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 02078bc3ad..8668e46296 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -32,7 +32,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -48,7 +47,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state stream.StreamState, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0] From af7a06d28cde21320c6b25dd118d91f28009dd75 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Wed, 24 Aug 2022 23:09:50 -0400 Subject: [PATCH 2/6] Fix unittest update wrongly inverted an assert Signed-off-by: Valerian Roche --- pkg/cache/v3/delta_test.go | 23 ++++++++++++++--------- pkg/cache/v3/simple_test.go | 20 ++++++++++++++++---- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index e171abad69..c46cb17be7 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -113,6 +113,7 @@ func TestDeltaRemoveResources(t *testing.T) { watches := make(map[string]chan cache.DeltaResponse) streams := make(map[string]*stream.StreamState) + // At this stage the cache is empty, so a watch is opened for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) state := stream.NewStreamState(true, make(map[string]string)) @@ -127,13 +128,17 @@ func TestDeltaRemoveResources(t *testing.T) { }, streams[typ], watches[typ]) } - require.NoError(t, c.SetSnapshot(context.Background(), key, fixture.snapshot())) + snapshot := fixture.snapshot() + snapshot.Resources[types.Endpoint] = cache.NewResources(fixture.version, []types.Resource{ + testEndpoint, + resource.MakeEndpoint("otherCluster", 8080), + }) + require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot)) for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { select { case out := <-watches[typ]: - snapshot := fixture.snapshot() assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() streams[typ].SetResourceVersions(nextVersionMap) @@ -158,21 +163,21 @@ func TestDeltaRemoveResources(t *testing.T) { assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version") - // set a partially versioned snapshot with no endpoints + // set a partially versioned snapshot with only one endpoint snapshot2 := fixture.snapshot() - snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) + snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{ + testEndpoint, // this cluster is not changed, we do not expect it back in "resources" + }) require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2)) // validate response for endpoints select { case out := <-watches[testTypes[0]]: - snapshot2 := fixture.snapshot() - snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assert.Empty(t, out.(*cache.RawDeltaResponse).Resources) + assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources) nextVersionMap := out.GetNextVersionMap() - // make sure the version maps are different since we no longer are tracking any endpoint resources - require.Equal(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change") + assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): assert.Fail(t, "failed to receive snapshot response") } diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 1f5a626239..e7f44d1895 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -91,10 +91,22 @@ type logger struct { t *testing.T } -func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Debugf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Infof(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Warnf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Errorf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} func TestSnapshotCacheWithTTL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) From 4e00420ad55245446655ec66b739039ab7c06aa5 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Wed, 24 Aug 2022 23:55:33 -0400 Subject: [PATCH 3/6] [#583][Sotw] Ensure resources are properly sent again if envoy unsubscribes then subscribes again to a resource Fix potential deadlock in sotw-ads related to improper cleanup of watches in Linear cache when using delta in non-wildcard Fix improper request set on sotw responses in Linear cache Replaced lastResponse in sotw server by staged resources pending ACK Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 20 ++++ pkg/cache/v3/linear.go | 85 +++++++++---- pkg/cache/v3/linear_test.go | 162 +++++++++++++++++++++---- pkg/cache/v3/simple.go | 16 ++- pkg/server/sotw/v3/server.go | 74 +++++++----- pkg/server/stream/v3/stream.go | 29 ++++- pkg/server/v3/server_test.go | 210 ++++++++++++++++++++++++++++----- 7 files changed, 480 insertions(+), 116 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index be124cb484..5f382ecb1e 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -105,6 +105,9 @@ type Response interface { // Get the version in the Response. GetVersion() (string, error) + // Get the list of resources part of the response without having to cast resources + GetResourceNames() []string + // Get the context provided during response creation. GetContext() context.Context } @@ -142,6 +145,9 @@ type RawResponse struct { // Resources to be included in the response. Resources []types.ResourceWithTTL + // Names of the resources included in the response + ResourceNames []string + // Whether this is a heartbeat response. For xDS versions that support TTL, this // will be converted into a response that doesn't contain the actual resource protobuf. // This allows for more lightweight updates that server only to update the TTL timer. @@ -191,6 +197,9 @@ type PassthroughResponse struct { // The discovery response that needs to be sent as is, without any marshaling transformations. DiscoveryResponse *discovery.DiscoveryResponse + // Names of the resources set in the response + ResourceNames []string + ctx context.Context } @@ -288,6 +297,12 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil } +// GetResourceNames returns the list of resources returned within the response +// without having to decode the resources +func (r *RawResponse) GetResourceNames() []string { + return r.ResourceNames +} + // GetRequest returns the original Discovery Request. func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest { return r.Request @@ -350,6 +365,11 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon return r.DiscoveryResponse, nil } +// GetResourceNames returns the list of resources included within the response +func (r *PassthroughResponse) GetResourceNames() []string { + return r.ResourceNames +} + // GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response. func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) { return r.DeltaDiscoveryResponse, nil diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 8b4b4b1652..2a334e5472 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -26,7 +26,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/log" ) -type watches = map[chan Response]struct{} +type watches = map[ResponseWatch]struct{} // LinearCache supports collections of opaque resources. This cache has a // single collection indexed by resource names and manages resource versions @@ -112,45 +112,57 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { return out } -func (cache *LinearCache) respond(value chan Response, staleResources []string) { +func (cache *LinearCache) respond(req *Request, value chan Response, staleResources []string) { var resources []types.ResourceWithTTL + var resourceNames []string // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resource := range cache.resources { + resourceNames = make([]string, 0, len(cache.resources)) + for name, resource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + resourceNames = append(resourceNames, name) } } else { resources = make([]types.ResourceWithTTL, 0, len(staleResources)) + resourceNames = make([]string, 0, len(staleResources)) for _, name := range staleResources { resource := cache.resources[name] if resource != nil { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + resourceNames = append(resourceNames, name) } } } value <- &RawResponse{ - Request: &Request{TypeUrl: cache.typeURL}, - Resources: resources, - Version: cache.getVersion(), - Ctx: context.Background(), + Request: &Request{TypeUrl: cache.typeURL}, + Resources: resources, + ResourceNames: resourceNames, + Version: cache.getVersion(), + Ctx: context.Background(), } } func (cache *LinearCache) notifyAll(modified map[string]struct{}) { // de-duplicate watches that need to be responded - notifyList := make(map[chan Response][]string) + notifyList := make(map[ResponseWatch][]string) for name := range modified { for watch := range cache.watches[name] { notifyList[watch] = append(notifyList[watch], name) + + // Make sure we clean the watch for ALL resources it might be associated with, + // as the channel will no longer be listened to + for _, resource := range watch.Request.ResourceNames { + delete(cache.watches[resource], watch) + } } delete(cache.watches, name) } - for value, stale := range notifyList { - cache.respond(value, stale) + for watch, stale := range notifyList { + cache.respond(watch.Request, watch.Response, stale) } - for value := range cache.watchAll { - cache.respond(value, nil) + for watch := range cache.watchAll { + cache.respond(watch.Request, watch.Response, nil) } cache.watchAll = make(watches) @@ -306,7 +318,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, // been updated between the last version and the current version. This avoids the problem // of sending empty updates whenever an irrelevant resource changes. stale := false - staleResources := []string{} // empty means all + var staleResources []string // empty means all // strip version prefix if it is present var lastVersion uint64 @@ -323,37 +335,66 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, if err != nil { stale = true staleResources = request.ResourceNames - } else if len(request.ResourceNames) == 0 { + if cache.log != nil { + cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error()) + } + } else if clientState.IsWildcard() { stale = lastVersion != cache.version + if cache.log != nil { + cache.log.Debugf("Watch is stale as version differs for wildcard watch") + } } else { + // Non wildcard case, we only reply resources that have effectively changed since the version set in the request + // This is used for instance in EDS for _, name := range request.ResourceNames { - // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastVersion < cache.versionVector[name] { + // The resource does not exist currently, we won't reply for it + if resourceVersion, ok := cache.versionVector[name]; !ok { + continue + } else if lastVersion < resourceVersion { + // The version of the request is older than the last change for the resource, return it + stale = true + staleResources = append(staleResources, name) + } else if _, ok := clientState.GetKnownResources()[name]; !ok { + // Resource is not currently known by the client (e.g. a resource is added in the resourceNames) stale = true staleResources = append(staleResources, name) } } + if cache.log != nil && stale { + cache.log.Debugf("Watch is stale with stale resources %v", staleResources) + } } if stale { - cache.respond(value, staleResources) + cache.respond(request, value, staleResources) return nil } // Create open watches since versions are up to date. - if len(request.ResourceNames) == 0 { - cache.watchAll[value] = struct{}{} + watch := ResponseWatch{request, value} + if clientState.IsWildcard() { + if cache.log != nil { + cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", + cache.typeURL, cache.getVersion()) + } + cache.watchAll[watch] = struct{}{} return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.watchAll, value) + delete(cache.watchAll, watch) } } + + // Non-wildcard case + if cache.log != nil { + cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", + cache.typeURL, request.ResourceNames, cache.getVersion()) + } for _, name := range request.ResourceNames { set, exists := cache.watches[name] if !exists { set = make(watches) cache.watches[name] = set } - set[value] = struct{}{} + set[watch] = struct{}{} } return func() { cache.mu.Lock() @@ -361,7 +402,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, for _, name := range request.ResourceNames { set, exists := cache.watches[name] if exists { - delete(set, value) + delete(set, watch) } if len(set) == 0 { delete(cache.watches, name) diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 8f8db7fe04..0653054bd9 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -34,6 +34,39 @@ const ( testType = "google.protobuf.StringValue" ) +type testLogger struct { + t *testing.T +} + +func newTestLogger(t *testing.T) testLogger { + return testLogger{t} +} + +func (l testLogger) log(level string, format string, args ...interface{}) { + l.t.Helper() + l.t.Logf("["+level+"] "+format, args...) +} + +func (l testLogger) Debugf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Infof(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Warnf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Errorf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + func testResource(s string) types.Resource { return wrapperspb.String(s) } @@ -162,6 +195,7 @@ func checkVersionMapSet(t *testing.T, c *LinearCache) { } func mustBlock(t *testing.T, w <-chan Response) { + t.Helper() select { case <-w: t.Error("watch must block") @@ -170,6 +204,7 @@ func mustBlock(t *testing.T, w <-chan Response) { } func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) { + t.Helper() select { case <-w: t.Error("watch must block") @@ -178,6 +213,7 @@ func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) { } func hashResource(t *testing.T, resource types.Resource) string { + t.Helper() marshaledResource, err := MarshalResource(resource) if err != nil { t.Fatal(err) @@ -229,18 +265,29 @@ func TestLinearCornerCases(t *testing.T) { } func TestLinearBasic(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) // Create watches before a resource is ready + stream1 := stream.NewStreamState(false, map[string]string{}) w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: ""}, &stream1, w1) + verifyResponse(t, w1, "0", 0) + stream1.GetKnownResources()["a"] = "0" + + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &stream1, w1) mustBlock(t, w1) checkVersionMapNotSet(t, c) + stream := stream.NewStreamState(true, map[string]string{}) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: ""}, &stream, w) + verifyResponse(t, w, "0", 0) + + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &stream, w) mustBlock(t, w) + checkVersionMapNotSet(t, c) + checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) require.NoError(t, c.UpdateResource("a", testResource("a"))) @@ -248,36 +295,49 @@ func TestLinearBasic(t *testing.T) { checkWatchCount(t, c, "b", 0) verifyResponse(t, w1, "1", 1) verifyResponse(t, w, "1", 1) + stream.GetKnownResources()["a"] = "1" // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + stream.SetWildcard(false) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &stream, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + // Version is old, so should return it + stream.SetWildcard(true) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &stream, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + stream.SetWildcard(false) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &stream, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + stream.SetWildcard(true) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &stream, w) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) + state1 := stream.NewStreamState(false, map[string]string{}) + state1.GetKnownResources()["a"] = "0" // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &state1, w1) mustBlock(t, w1) + + state2 := stream.NewStreamState(true, map[string]string{}) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -287,9 +347,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &state1, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -300,9 +360,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &state1, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -314,6 +374,7 @@ func TestLinearSetResources(t *testing.T) { func TestLinearGetResources(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) expectedResources := map[string]types.Resource{ "a": testResource("a"), @@ -332,14 +393,16 @@ func TestLinearGetResources(t *testing.T) { func TestLinearVersionPrefix(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) + c.log = newTestLogger(t) w := make(chan Response, 1) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-0"}, &streamState, w) verifyResponse(t, w, "instance1-1", 1) + streamState.GetKnownResources()["a"] = "instance1-1" c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, &streamState, w) mustBlock(t, w) @@ -347,42 +410,73 @@ func TestLinearVersionPrefix(t *testing.T) { } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + c.log = newTestLogger(t) + + streamState := stream.NewStreamState(false, map[string]string{}) w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: ""}, &streamState, w) + verifyResponse(t, w, "0", 1) + streamState.GetKnownResources()["a"] = "0" + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) - verifyResponse(t, w, "1", 1) - checkWatchCount(t, c, "b", 0) - require.NoError(t, c.DeleteResource("b")) + streamState.SetWildcard(true) c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + mustBlock(t, w) + checkWatchCount(t, c, "b", 1) + + require.NoError(t, c.DeleteResource("b")) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + c.log = newTestLogger(t) + + // Default case, stream starts with no version + state := stream.NewStreamState(false, map[string]string{}) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: ""}, &state, w) + verifyResponse(t, w, "0", 2) + state.GetKnownResources()["a"] = "0" + state.GetKnownResources()["b"] = "0" + + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &state, w) mustBlock(t, w) + + // Wildcard should be able to start with a version + // It will only return if the version is not up-to-date + state1 := stream.NewStreamState(true, map[string]string{}) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &state1, w1) mustBlock(t, w1) + require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource verifyResponse(t, w, "1", 1) + // Receive an update for all resources verifyResponse(t, w1, "1", 2) } -func TestLinearCancel(t *testing.T) { +func TestLinearResourceSubscription(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) + c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + w := make(chan Response, 1) + + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + verifyResponse(t, w, "0", 2) +} + +func TestLinearCancel(t *testing.T) { + streamState := stream.NewStreamState(true, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all @@ -392,8 +486,10 @@ func TestLinearCancel(t *testing.T) { checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) + streamState.GetKnownResources()["a"] = "1" // cancel watch for "a" + streamState.SetWildcard(false) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -406,6 +502,7 @@ func TestLinearCancel(t *testing.T) { w4 := make(chan Response, 1) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + streamState.SetWildcard(true) cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w3) cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w4) mustBlock(t, w) @@ -431,6 +528,7 @@ func TestLinearCancel(t *testing.T) { func TestLinearConcurrentSetWatch(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) n := 50 for i := 0; i < 2*n; i++ { func(i int) { @@ -460,6 +558,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) state1 := stream.NewStreamState(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1) @@ -481,6 +580,7 @@ func TestLinearDeltaWildcard(t *testing.T) { func TestLinearDeltaExistingResources(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -507,6 +607,7 @@ func TestLinearDeltaExistingResources(t *testing.T) { func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -539,6 +640,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { func TestLinearDeltaResourceUpdate(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -577,6 +679,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { func TestLinearDeltaResourceDelete(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -610,6 +713,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) state := stream.NewStreamState(false, nil) state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) @@ -727,6 +831,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { func TestLinearMixedWatches(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} err := c.UpdateResource("a", a) assert.NoError(t, err) @@ -738,6 +843,11 @@ func TestLinearMixedWatches(t *testing.T) { sotwState := stream.NewStreamState(false, nil) w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) + verifyResponse(t, w, c.getVersion(), 2) + sotwState.GetKnownResources()["a"] = c.getVersion() + sotwState.GetKnownResources()["b"] = c.getVersion() + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -748,8 +858,10 @@ func TestLinearMixedWatches(t *testing.T) { hashA := hashResource(t, a) err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) assert.NoError(t, err) - // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation + // For non-wildcard, we only return the state of the resource that has been updated + // This is non-applicable for cds/lds that will always send wildcard requests verifyResponse(t, w, c.getVersion(), 1) + sotwState.GetKnownResources()["a"] = c.getVersion() checkVersionMapNotSet(t, c) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index bb2c5a313a..57a022d51e 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -433,6 +433,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response { filtered := make([]types.ResourceWithTTL, 0, len(resources)) + resourceNames := make([]string, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version @@ -442,20 +443,23 @@ func createResponse(ctx context.Context, request *Request, resources map[string] for name, resource := range resources { if set[name] { filtered = append(filtered, resource) + resourceNames = append(resourceNames, name) } } } else { - for _, resource := range resources { + for name, resource := range resources { filtered = append(filtered, resource) + resourceNames = append(resourceNames, name) } } return &RawResponse{ - Request: request, - Version: version, - Resources: filtered, - Heartbeat: heartbeat, - Ctx: ctx, + Request: request, + Version: version, + Resources: filtered, + ResourceNames: resourceNames, + Heartbeat: heartbeat, + Ctx: ctx, } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index fcde7d89b2..8a30ce420a 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]string -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -82,7 +73,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq var streamNonce int64 streamStates := map[string]stream.StreamState{} - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} // a collection of stack allocated watches per request type watches := newWatches() @@ -112,20 +102,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - version, err := resp.GetVersion() - if err != nil { - return "", err - } - - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]string), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = version - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse - if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) } @@ -190,13 +166,25 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq typeURL := req.GetTypeUrl() state := streamStates[typeURL] - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - state.SetResourceVersions(lastResponse.resources) + // Check if we're pending an ACK + if responder, ok := watches.responders[typeURL]; ok { + if responder.nonce != "" && responder.nonce == nonce { + // The nonce is matching, this is an ACK from the client + state.CommitPendingResources() } } + // Remove resources no longer subscribed from the stream state + // This ensures we will send a resource if it is unsubscribed then subscribed again + // without a cache version change + knownResources := state.GetKnownResources() + unsubscribedResources := getUnsubscribedResources(req.ResourceNames, knownResources) + for _, resourceName := range unsubscribedResources { + delete(knownResources, resourceName) + } + // Remove from pending resources to ensure we won't lose this state when commiting + state.RemovePendingResources(unsubscribedResources) + responder := make(chan cache.Response, 1) if w, ok := watches.responders[typeURL]; ok { // We've found a pre-existing watch, lets check and update if needed. @@ -235,6 +223,23 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq return err } + // Track the resources returned in the response + // Those are staged pending the client ACK + // The protocol clearly states that if we send another response prior to an ACK + // the previous one is to be considered as discarded + version, err := res.GetVersion() + if err != nil { + return err + } + + state := streamStates[res.GetRequest().TypeUrl] + resources := make(map[string]string, len(res.GetResourceNames())) + for _, name := range res.GetResourceNames() { + resources[name] = version + } + state.SetPendingResources(resources) + streamStates[res.GetRequest().TypeUrl] = state + watches.responders[res.GetRequest().TypeUrl].nonce = nonce } } @@ -263,3 +268,16 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.process(stream, reqCh, typeURL) } + +func getUnsubscribedResources(newResources []string, knownResources map[string]string) (removedResources []string) { + newResourcesMap := make(map[string]struct{}, len(newResources)) + for _, resourceName := range newResources { + newResourcesMap[resourceName] = struct{}{} + } + for resourceName := range knownResources { + if _, ok := newResourcesMap[resourceName]; !ok { + removedResources = append(removedResources, resourceName) + } + } + return +} diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 2147dc46e0..6e2e10e511 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -33,6 +33,10 @@ type StreamState struct { // nolint:golint,revive // ResourceVersions contains a hash of the resource as the value and the resource name as the key. // This field stores the last state sent to the client. resourceVersions map[string]string + + // Provides the list of resources (and their version) that have been sent to the client + // but not ACKed yet + pendingResources map[string]string } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -49,7 +53,28 @@ func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string] s.subscribedResourceNames = subscribedResourceNames } +func (s *StreamState) SetPendingResources(resources map[string]string) { + s.pendingResources = resources +} + +func (s *StreamState) RemovePendingResources(resources []string) { + for _, resource := range resources { + delete(s.pendingResources, resource) + } +} + +func (s *StreamState) CommitPendingResources() { + clientVersions := s.GetKnownResources() + for name, version := range s.pendingResources { + clientVersions[name] = version + delete(s.pendingResources, name) + } +} + func (s *StreamState) GetKnownResources() map[string]string { + if s.resourceVersions == nil { + s.resourceVersions = make(map[string]string) + } return s.resourceVersions } @@ -73,9 +98,5 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, } - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - return state } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 8668e46296..2e2f0b0d00 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -50,7 +50,9 @@ type mockConfigWatcher struct { func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { - out <- config.responses[req.TypeUrl][0] + resp := config.responses[req.TypeUrl][0].(*cache.RawResponse) + resp.Request = req + out <- resp config.responses[req.TypeUrl] = config.responses[req.TypeUrl][1:] } else { config.watches++ @@ -177,73 +179,83 @@ func makeResponses() map[string][]cache.Response { return map[string][]cache.Response{ rsrc.EndpointType: { &cache.RawResponse{ - Version: "1", - Resources: []types.ResourceWithTTL{{Resource: endpoint}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, + Version: "1", + Resources: []types.ResourceWithTTL{{Resource: endpoint}}, + ResourceNames: []string{clusterName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, }, }, rsrc.ClusterType: { &cache.RawResponse{ - Version: "2", - Resources: []types.ResourceWithTTL{{Resource: cluster}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, + Version: "2", + Resources: []types.ResourceWithTTL{{Resource: cluster}}, + ResourceNames: []string{clusterName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, rsrc.RouteType: { &cache.RawResponse{ - Version: "3", - Resources: []types.ResourceWithTTL{{Resource: route}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, + Version: "3", + Resources: []types.ResourceWithTTL{{Resource: route}}, + ResourceNames: []string{routeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, rsrc.ScopedRouteType: { &cache.RawResponse{ - Version: "4", - Resources: []types.ResourceWithTTL{{Resource: scopedRoute}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, + Version: "4", + Resources: []types.ResourceWithTTL{{Resource: scopedRoute}}, + ResourceNames: []string{routeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, }, }, rsrc.VirtualHostType: { &cache.RawResponse{ - Version: "5", - Resources: []types.ResourceWithTTL{{Resource: virtualHost}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, + Version: "5", + Resources: []types.ResourceWithTTL{{Resource: virtualHost}}, + ResourceNames: []string{virtualHostName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, }, }, rsrc.ListenerType: { &cache.RawResponse{ - Version: "6", - Resources: []types.ResourceWithTTL{{Resource: httpListener}, {Resource: httpScopedListener}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, + Version: "6", + Resources: []types.ResourceWithTTL{{Resource: httpListener}, {Resource: httpScopedListener}}, + ResourceNames: []string{listenerName, scopedListenerName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, rsrc.SecretType: { &cache.RawResponse{ - Version: "7", - Resources: []types.ResourceWithTTL{{Resource: secret}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, + Version: "7", + Resources: []types.ResourceWithTTL{{Resource: secret}}, + ResourceNames: []string{secretName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, }, }, rsrc.RuntimeType: { &cache.RawResponse{ - Version: "8", - Resources: []types.ResourceWithTTL{{Resource: runtime}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, + Version: "8", + Resources: []types.ResourceWithTTL{{Resource: runtime}}, + ResourceNames: []string{runtimeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, }, }, rsrc.ExtensionConfigType: { &cache.RawResponse{ - Version: "9", - Resources: []types.ResourceWithTTL{{Resource: extensionConfig}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, + Version: "9", + Resources: []types.ResourceWithTTL{{Resource: extensionConfig}}, + ResourceNames: []string{extensionConfigName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, }, }, // Pass-through type (xDS does not exist for this type) opaqueType: { &cache.RawResponse{ - Version: "10", - Resources: []types.ResourceWithTTL{{Resource: opaque}}, - Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, + Version: "10", + Resources: []types.ResourceWithTTL{{Resource: opaque}}, + ResourceNames: []string{"opaque"}, + Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, }, }, } @@ -681,3 +693,139 @@ func TestCallbackError(t *testing.T) { }) } } + +type LinearCacheMock struct { + // name -> version + assert func(req *discovery.DiscoveryRequest, state cache.ClientState) + resources []types.ResourceWithTTL + resourceNames []string + version string +} + +func (mock *LinearCacheMock) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { + if mock.assert != nil { + mock.assert(req, state) + } + if mock.version != "" { + out <- &cache.RawResponse{ + Request: req, + Version: mock.version, + Resources: mock.resources, + ResourceNames: mock.resourceNames, + } + } + return func() {} +} + +func (mock *LinearCacheMock) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.ClientState, out chan cache.DeltaResponse) func() { + return nil +} +func (mock *LinearCacheMock) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { + return nil, errors.New("unimplemented") +} + +func TestSubscriptionsThroughLinearCache(t *testing.T) { + resp := makeMockStream(t) + linearCache := LinearCacheMock{} + defer close(resp.recv) + + go func() { + s := server.NewServer(context.Background(), &linearCache, server.CallbackFuncs{}) + assert.NoError(t, s.StreamAggregatedResources(resp)) + }() + + linearCache.version = "1" + linearCache.resources = []types.ResourceWithTTL{ + {Resource: endpoint}, + } + linearCache.resourceNames = []string{clusterName} + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + assert.Empty(t, state.GetKnownResources()) + } + + var nonce string + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + + select { + case epResponse := <-resp.sent: + assert.Len(t, epResponse.Resources, 1) + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "no response received") + } + + linearCache.version = "" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{}, req.ResourceNames) + // This should also be empty + assert.Empty(t, state.GetKnownResources()) + } + + // No longer listen to this resource + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{}, + } + + select { + case epResponse := <-resp.sent: + assert.Fail(t, "unexpected response") + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + // go on + } + + // Cache version did not change + linearCache.version = "1" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + // This should also be empty + assert.Empty(t, state.GetKnownResources()) + } + + //Subscribe to it again + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + + select { + case epResponse := <-resp.sent: + assert.Len(t, epResponse.Resources, 1) + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "no response received") + } + + // Cache version did not change + linearCache.version = "" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + // This should also be empty + assert.Equal(t, map[string]string{clusterName: "1"}, state.GetKnownResources()) + } + + // Don't change anything, simply ack the current one + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + select { + case <-resp.sent: + assert.Fail(t, "unexpected response") + case <-time.After(100 * time.Millisecond): + // go on + } +} From 42d626f4adcea1e7f6c317618954e32a6fb16843 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Thu, 25 Aug 2022 09:56:06 -0400 Subject: [PATCH 4/6] Synchronize test helper to pass race detection Signed-off-by: Valerian Roche --- pkg/server/v3/server_test.go | 55 +++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 2e2f0b0d00..ff5ab893a2 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -694,15 +695,29 @@ func TestCallbackError(t *testing.T) { } } +type Assert func(req *discovery.DiscoveryRequest, state cache.ClientState) type LinearCacheMock struct { // name -> version assert func(req *discovery.DiscoveryRequest, state cache.ClientState) resources []types.ResourceWithTTL resourceNames []string version string + + // This mutex is not really useful as the test is organized to not conflict + // But it is needed to make sure race detector accepts it + mu sync.Mutex +} + +func (m *LinearCacheMock) setExpectation(assert Assert, version string) { + m.mu.Lock() + defer m.mu.Unlock() + m.assert = assert + m.version = version } func (mock *LinearCacheMock) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { + mock.mu.Lock() + defer mock.mu.Unlock() if mock.assert != nil { mock.assert(req, state) } @@ -726,23 +741,20 @@ func (mock *LinearCacheMock) Fetch(ctx context.Context, req *discovery.Discovery func TestSubscriptionsThroughLinearCache(t *testing.T) { resp := makeMockStream(t) - linearCache := LinearCacheMock{} + linearCache := LinearCacheMock{ + resources: []types.ResourceWithTTL{{Resource: endpoint}}, + resourceNames: []string{clusterName}, + } defer close(resp.recv) - + s := server.NewServer(context.Background(), &linearCache, server.CallbackFuncs{}) go func() { - s := server.NewServer(context.Background(), &linearCache, server.CallbackFuncs{}) assert.NoError(t, s.StreamAggregatedResources(resp)) }() - linearCache.version = "1" - linearCache.resources = []types.ResourceWithTTL{ - {Resource: endpoint}, - } - linearCache.resourceNames = []string{clusterName} - linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { assert.Equal(t, []string{clusterName}, req.ResourceNames) assert.Empty(t, state.GetKnownResources()) - } + }, "1") var nonce string resp.recv <- &discovery.DiscoveryRequest{ @@ -757,15 +769,14 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { assert.Len(t, epResponse.Resources, 1) nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): - assert.Fail(t, "no response received") + require.Fail(t, "no response received") } - linearCache.version = "" - linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { assert.Equal(t, []string{}, req.ResourceNames) // This should also be empty assert.Empty(t, state.GetKnownResources()) - } + }, "") // No longer listen to this resource resp.recv <- &discovery.DiscoveryRequest{ @@ -777,19 +788,18 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { select { case epResponse := <-resp.sent: - assert.Fail(t, "unexpected response") + require.Fail(t, "unexpected response") nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): // go on } // Cache version did not change - linearCache.version = "1" - linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { assert.Equal(t, []string{clusterName}, req.ResourceNames) // This should also be empty assert.Empty(t, state.GetKnownResources()) - } + }, "1") //Subscribe to it again resp.recv <- &discovery.DiscoveryRequest{ @@ -804,16 +814,15 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { assert.Len(t, epResponse.Resources, 1) nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): - assert.Fail(t, "no response received") + require.Fail(t, "no response received") } // Cache version did not change - linearCache.version = "" - linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { assert.Equal(t, []string{clusterName}, req.ResourceNames) // This should also be empty assert.Equal(t, map[string]string{clusterName: "1"}, state.GetKnownResources()) - } + }, "") // Don't change anything, simply ack the current one resp.recv <- &discovery.DiscoveryRequest{ @@ -824,7 +833,7 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { } select { case <-resp.sent: - assert.Fail(t, "unexpected response") + require.Fail(t, "unexpected response") case <-time.After(100 * time.Millisecond): // go on } From fe06bc1ea3e5ac5688c5edfebf857436f61e3e8d Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Thu, 25 Aug 2022 10:06:33 -0400 Subject: [PATCH 5/6] Fix linting errors Signed-off-by: Valerian Roche --- pkg/cache/v3/linear.go | 2 +- pkg/cache/v3/linear_test.go | 6 +++--- pkg/server/v3/server_test.go | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 2a334e5472..742ca8523c 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -135,7 +135,7 @@ func (cache *LinearCache) respond(req *Request, value chan Response, staleResour } } value <- &RawResponse{ - Request: &Request{TypeUrl: cache.typeURL}, + Request: req, Resources: resources, ResourceNames: resourceNames, Version: cache.getVersion(), diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 0653054bd9..8a8cebee42 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -49,7 +49,7 @@ func (l testLogger) log(level string, format string, args ...interface{}) { func (l testLogger) Debugf(format string, args ...interface{}) { l.t.Helper() - l.log("INFO", format, args...) + l.log("DEBUG", format, args...) } func (l testLogger) Infof(format string, args ...interface{}) { @@ -59,12 +59,12 @@ func (l testLogger) Infof(format string, args ...interface{}) { func (l testLogger) Warnf(format string, args ...interface{}) { l.t.Helper() - l.log("INFO", format, args...) + l.log("WARN", format, args...) } func (l testLogger) Errorf(format string, args ...interface{}) { l.t.Helper() - l.log("INFO", format, args...) + l.log("ERROR", format, args...) } func testResource(s string) types.Resource { diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index ff5ab893a2..7aca416154 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -708,11 +708,11 @@ type LinearCacheMock struct { mu sync.Mutex } -func (m *LinearCacheMock) setExpectation(assert Assert, version string) { - m.mu.Lock() - defer m.mu.Unlock() - m.assert = assert - m.version = version +func (mock *LinearCacheMock) setExpectation(assert Assert, version string) { + mock.mu.Lock() + defer mock.mu.Unlock() + mock.assert = assert + mock.version = version } func (mock *LinearCacheMock) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { From 408400c2ae728d6cfa0afe44a55cf3517b320528 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Thu, 25 Aug 2022 15:54:56 -0400 Subject: [PATCH 6/6] Ensure we don't create a race between cache and server in sotw. Move nonce and version within streamState to properly track it with the pending resources Signed-off-by: Valerian Roche --- pkg/server/sotw/v3/server.go | 59 ++++++++++++++++++---------------- pkg/server/sotw/v3/watches.go | 1 - pkg/server/stream/v3/stream.go | 23 ++++++++++--- pkg/server/v3/server_test.go | 25 +++++++------- 4 files changed, 62 insertions(+), 46 deletions(-) diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 8a30ce420a..27501f4461 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -165,11 +165,30 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } typeURL := req.GetTypeUrl() - state := streamStates[typeURL] - // Check if we're pending an ACK - if responder, ok := watches.responders[typeURL]; ok { - if responder.nonce != "" && responder.nonce == nonce { - // The nonce is matching, this is an ACK from the client + // State cannot be modified until any potential watch is closed + state, ok := streamStates[typeURL] + if !ok { + // We don't have a current state for this type, create one + state = stream.NewStreamState(len(req.ResourceNames) == 0, nil) + } else if nonce != state.LastResponseNonce() { + // The request does not match the last response we sent. + // The protocol is a bit unclear in this case, but currently we discard such request and wait + // for the client to acknowledge the previous response. + // This is unclear how this handles cases where a response would be missed as any subsequent request will be discarded. + + // We can continue here as the watch list hasn't changed so we don't need to recompute the watches select + continue + } + + responder := make(chan cache.Response, 1) + if w, ok := watches.responders[typeURL]; ok { + // If we had an open watch, close it to make sure we don't end up sending a cache response while we update the state of the request + w.close() + + // Check if the new request ACKs the previous response + // This is defined as nonce and versions are matching, as well as no error present + if state.LastResponseNonce() == nonce && state.LastResponseVersion() == req.VersionInfo && req.ErrorDetail == nil { + // The nonce and versions are matching, this is an ACK from the client state.CommitPendingResources() } } @@ -185,26 +204,10 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // Remove from pending resources to ensure we won't lose this state when commiting state.RemovePendingResources(unsubscribedResources) - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, &state, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, &state, responder), - response: responder, - }) - } + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, &state, responder), + response: responder, + }) streamStates[typeURL] = state @@ -237,10 +240,10 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq for _, name := range res.GetResourceNames() { resources[name] = version } - state.SetPendingResources(resources) + // Pending resources can be modified in the server while a watch is opened + // It will only be visible to caches once those resources are commited + state.SetPendingResources(nonce, version, resources) streamStates[res.GetRequest().TypeUrl] = state - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } } } diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..610f087f1d 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -63,7 +63,6 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery // watch contains the necessary modifiables for receiving resource responses type watch struct { cancel func() - nonce string response chan cache.Response } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 6e2e10e511..19d8a67537 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -37,6 +37,10 @@ type StreamState struct { // nolint:golint,revive // Provides the list of resources (and their version) that have been sent to the client // but not ACKed yet pendingResources map[string]string + + // Provides the nonce and versions of the last response sent on the stream. Used to validate ACK/NACK and avoid races + lastNonce string + lastVersion string } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -53,8 +57,10 @@ func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string] s.subscribedResourceNames = subscribedResourceNames } -func (s *StreamState) SetPendingResources(resources map[string]string) { +func (s *StreamState) SetPendingResources(nonce, version string, resources map[string]string) { s.pendingResources = resources + s.lastNonce = nonce + s.lastVersion = version } func (s *StreamState) RemovePendingResources(resources []string) { @@ -71,10 +77,15 @@ func (s *StreamState) CommitPendingResources() { } } +func (s *StreamState) LastResponseNonce() string { + return s.lastNonce +} + +func (s *StreamState) LastResponseVersion() string { + return s.lastVersion +} + func (s *StreamState) GetKnownResources() map[string]string { - if s.resourceVersions == nil { - s.resourceVersions = make(map[string]string) - } return s.resourceVersions } @@ -98,5 +109,9 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, } + if initialResourceVersions == nil { + state.resourceVersions = make(map[string]string) + } + return state } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 7aca416154..e696a2d463 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -530,7 +530,7 @@ func TestStaleNonce(t *testing.T) { err := s.StreamAggregatedResources(resp) assert.NoError(t, err) // should be two watches called - assert.False(t, !reflect.DeepEqual(map[string]int{typ: 2}, config.counts)) + assert.Equal(t, map[string]int{typ: 2}, config.counts) close(stop) }() select { @@ -756,18 +756,16 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { assert.Empty(t, state.GetKnownResources()) }, "1") - var nonce string resp.recv <- &discovery.DiscoveryRequest{ Node: node, - ResponseNonce: nonce, TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}, } + var epResponse *discovery.DiscoveryResponse select { - case epResponse := <-resp.sent: + case epResponse = <-resp.sent: assert.Len(t, epResponse.Resources, 1) - nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): require.Fail(t, "no response received") } @@ -781,18 +779,19 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { // No longer listen to this resource resp.recv <- &discovery.DiscoveryRequest{ Node: node, - ResponseNonce: nonce, + ResponseNonce: epResponse.Nonce, + VersionInfo: epResponse.VersionInfo, TypeUrl: rsrc.EndpointType, ResourceNames: []string{}, } select { - case epResponse := <-resp.sent: + case <-resp.sent: require.Fail(t, "unexpected response") - nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): // go on } + fmt.Println("now?") // Cache version did not change linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { @@ -804,15 +803,15 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { //Subscribe to it again resp.recv <- &discovery.DiscoveryRequest{ Node: node, - ResponseNonce: nonce, + ResponseNonce: epResponse.Nonce, + VersionInfo: epResponse.VersionInfo, TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}, } select { - case epResponse := <-resp.sent: + case epResponse = <-resp.sent: assert.Len(t, epResponse.Resources, 1) - nonce = epResponse.Nonce case <-time.After(100 * time.Millisecond): require.Fail(t, "no response received") } @@ -820,14 +819,14 @@ func TestSubscriptionsThroughLinearCache(t *testing.T) { // Cache version did not change linearCache.setExpectation(func(req *discovery.DiscoveryRequest, state cache.ClientState) { assert.Equal(t, []string{clusterName}, req.ResourceNames) - // This should also be empty assert.Equal(t, map[string]string{clusterName: "1"}, state.GetKnownResources()) }, "") // Don't change anything, simply ack the current one resp.recv <- &discovery.DiscoveryRequest{ Node: node, - ResponseNonce: nonce, + ResponseNonce: epResponse.Nonce, + VersionInfo: epResponse.VersionInfo, TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}, }