diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index b53110de2b..de9bde0cc9 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -234,7 +234,6 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState streamState.SetWildcard(false) continue } - delete(sv, resource) if _, ok := sv[resource]; ok && streamState.IsWildcard() { // xds protocol specifically states that if a resource if unsubscribed while a wildcard watch is present, // the control-plane must return a response with either the resource set as removed (if no longer present in the snapshot) @@ -243,5 +242,6 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState // or mark it as removed (done in createDeltaResponse) streamState.GetResourceVersions()[resource] = "" } + delete(sv, resource) } } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 75c97bd809..204599613c 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -17,78 +17,72 @@ import ( rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1 - if len(config.deltaResponses[req.TypeUrl]) > 0 { - res := config.deltaResponses[req.TypeUrl][0] - r, _ := res.GetDeltaDiscoveryResponse() - - resourceMap := map[string]types.Resource{} - versionMap := map[string]string{} - for _, resource := range r.Resources { - resourceMap[resource.Name] = resource - res, _ := cache.MarshalResource(resource) - versionMap[resource.Name] = cache.HashResource(res) + // This is duplicated from pkg/cache/v3/delta.go as private there + resourceMap := config.deltaResources[req.TypeUrl] + versionMap := map[string]string{} + for name, resource := range resourceMap { + marshaledResource, _ := cache.MarshalResource(resource) + versionMap[name] = cache.HashResource(marshaledResource) + } + var nextVersionMap map[string]string + var filtered []types.Resource + var toRemove []string + + // If we are handling a wildcard request, we want to respond with all resources + switch { + case state.IsWildcard(): + if len(state.GetResourceVersions()) == 0 { + filtered = make([]types.Resource, 0, len(resourceMap)) } - - // This code is copied as private from pkg/cache/v3/delta.go:createDeltaResponse - - // variables to build our response with - var nextVersionMap map[string]string - var filtered []types.Resource - var toRemove []string - - // If we are handling a wildcard request, we want to respond with all resources - switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { - filtered = make([]types.Resource, 0, len(resourceMap)) - } - nextVersionMap = make(map[string]string, len(resourceMap)) - for name, r := range resourceMap { - // Since we've already precomputed the version hashes of the new snapshot, - // we can just set it here to be used for comparison later - version := versionMap[name] - nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] - if !found || (prevVersion != version) { - filtered = append(filtered, r) - } + nextVersionMap = make(map[string]string, len(resourceMap)) + for name, r := range resourceMap { + // Since we've already precomputed the version hashes of the new snapshot, + // we can just set it here to be used for comparison later + version := versionMap[name] + nextVersionMap[name] = version + prevVersion, found := state.GetResourceVersions()[name] + if !found || (prevVersion != version) { + filtered = append(filtered, r) } + } - // Compute resources for removal - for name := range state.GetResourceVersions() { - if _, ok := resourceMap[name]; !ok { - toRemove = append(toRemove, name) - } + // Compute resources for removal + for name := range state.GetResourceVersions() { + if _, ok := resourceMap[name]; !ok { + toRemove = append(toRemove, name) } - default: - // Reply only with the requested resources - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) - // state.GetResourceVersions() may include resources no longer subscribed - // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] - if r, ok := resourceMap[name]; ok { - nextVersion := versionMap[name] - if prevVersion != nextVersion { - filtered = append(filtered, r) - } - nextVersionMap[name] = nextVersion - } else if found { - toRemove = append(toRemove, name) + } + default: + nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + // state.GetResourceVersions() may include resources no longer subscribed + // In the current code this gets silently cleaned when updating the version map + for name := range state.GetSubscribedResourceNames() { + prevVersion, found := state.GetResourceVersions()[name] + if r, ok := resourceMap[name]; ok { + nextVersion := versionMap[name] + if prevVersion != nextVersion { + filtered = append(filtered, r) } + nextVersionMap[name] = nextVersion + } else if found { + toRemove = append(toRemove, name) } } + } + if len(filtered)+len(toRemove) > 0 { out <- &cache.RawDeltaResponse{ - DeltaRequest: req, - Resources: filtered, - RemovedResources: toRemove, - NextVersionMap: nextVersionMap, + DeltaRequest: req, + Resources: filtered, + RemovedResources: toRemove, + SystemVersionInfo: "", + NextVersionMap: nextVersionMap, } } else { config.deltaWatches++ @@ -159,78 +153,39 @@ func makeMockDeltaStream(t *testing.T) *mockDeltaStream { } } -func makeDeltaResponses() map[string][]cache.DeltaResponse { - return map[string][]cache.DeltaResponse{ - rsrc.EndpointType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{endpoint}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.EndpointType}, - SystemVersionInfo: "1", - }, +func makeDeltaResources() map[string]map[string]types.Resource { + return map[string]map[string]types.Resource{ + rsrc.EndpointType: map[string]types.Resource{ + endpoint.GetClusterName(): endpoint, }, - rsrc.ClusterType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{cluster}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ClusterType}, - SystemVersionInfo: "2", - }, + rsrc.ClusterType: map[string]types.Resource{ + cluster.Name: cluster, }, - rsrc.RouteType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{route}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.RouteType}, - SystemVersionInfo: "3", - }, + rsrc.RouteType: map[string]types.Resource{ + route.Name: route, }, - rsrc.ScopedRouteType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{scopedRoute}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, - SystemVersionInfo: "4", - }, + rsrc.ScopedRouteType: map[string]types.Resource{ + scopedRoute.Name: scopedRoute, }, - rsrc.VirtualHostType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{virtualHost}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, - SystemVersionInfo: "5", - }, + rsrc.VirtualHostType: map[string]types.Resource{ + virtualHost.Name: virtualHost, }, - rsrc.ListenerType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{httpListener, httpScopedListener}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ListenerType}, - SystemVersionInfo: "6", - }, + rsrc.ListenerType: map[string]types.Resource{ + httpListener.Name: httpListener, + httpScopedListener.Name: httpScopedListener, }, - rsrc.SecretType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "7", - Resources: []types.Resource{secret}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.SecretType}, - }, + rsrc.SecretType: map[string]types.Resource{ + secret.Name: secret, }, - rsrc.RuntimeType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "8", - Resources: []types.Resource{runtime}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.RuntimeType}, - }, + rsrc.RuntimeType: map[string]types.Resource{ + runtime.Name: runtime, }, - rsrc.ExtensionConfigType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "9", - Resources: []types.Resource{extensionConfig}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, - }, + rsrc.ExtensionConfigType: map[string]types.Resource{ + extensionConfig.Name: extensionConfig, }, // Pass-through type (types without explicit handling) - opaqueType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "10", - Resources: []types.Resource{opaque}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: opaqueType}, - }, + opaqueType: map[string]types.Resource{ + "opaque": opaque, }, } } @@ -267,7 +222,7 @@ func TestDeltaResponseHandlersWildcard(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) resp := makeMockDeltaStream(t) @@ -296,17 +251,16 @@ func TestDeltaResponseHandlers(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) resp := makeMockDeltaStream(t) - // This is a wildcard request since we don't specify a list of resource subscriptions - res, err := config.deltaResponses[typ][0].GetDeltaDiscoveryResponse() - if err != nil { - t.Error(err) + resourceNames := []string{} + for resourceName := range config.deltaResources[typ] { + resourceNames = append(resourceNames, resourceName) } // We only subscribe to one resource to see if we get the appropriate number of resources back - resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ, ResourceNamesSubscribe: []string{res.Resources[0].Name}} + resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ, ResourceNamesSubscribe: resourceNames} go func() { err := process(typ, resp, s) @@ -330,7 +284,7 @@ func TestSendDeltaError(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) // make a request with an error @@ -352,7 +306,7 @@ func TestSendDeltaError(t *testing.T) { func TestDeltaAggregatedHandlers(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() resp := makeMockDeltaStream(t) reqs := []*discovery.DeltaDiscoveryRequest{ @@ -432,6 +386,7 @@ func TestDeltaAggregateRequestType(t *testing.T) { if err := s.DeltaAggregatedResources(resp); err == nil { t.Error("DeltaAggregatedResources() => got nil, want an error") } + close(resp.recv) } func TestDeltaCancellations(t *testing.T) { @@ -477,7 +432,7 @@ func TestDeltaCallbackError(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{ DeltaStreamOpenFunc: func(ctx context.Context, i int64, s string) error { @@ -501,3 +456,161 @@ func TestDeltaCallbackError(t *testing.T) { }) } } + +func TestDeltaWildcardSubscriptions(t *testing.T) { + config := makeMockConfigWatcher() + config.deltaResources = map[string]map[string]types.Resource{ + rsrc.EndpointType: map[string]types.Resource{ + "endpoints0": resource.MakeEndpoint("endpoints0", 1234), + "endpoints1": resource.MakeEndpoint("endpoints1", 1234), + "endpoints2": resource.MakeEndpoint("endpoints2", 1234), + "endpoints3": resource.MakeEndpoint("endpoints3", 1234), + }, + } + + validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) { + t.Helper() + select { + case response := <-replies: + assert.Equal(t, rsrc.EndpointType, response.TypeUrl) + if assert.Equal(t, len(expectedResources), len(response.Resources)) { + var names []string + for _, resource := range response.Resources { + names = append(names, resource.Name) + } + assert.ElementsMatch(t, names, expectedResources) + assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources) + } + case <-time.After(1 * time.Second): + t.Fatalf("got no response") + } + } + + updateResources := func(port uint32) { + config.deltaResources[rsrc.EndpointType]["endpoints0"] = resource.MakeEndpoint("endpoints0", port) + config.deltaResources[rsrc.EndpointType]["endpoints1"] = resource.MakeEndpoint("endpoints1", port) + config.deltaResources[rsrc.EndpointType]["endpoints2"] = resource.MakeEndpoint("endpoints2", port) + config.deltaResources[rsrc.EndpointType]["endpoints3"] = resource.MakeEndpoint("endpoints3", port) + } + + t.Run("legacy still working", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + // Generate a change to ensure we receive updates if subscribed + updateResources(2345) + + // In legacy mode, adding a new resource behaves the same as if providing a subscription to wildcard first + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints0"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(1234) + + // We allow unsubscribing with the new method + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0"}, nil) + + }) + + t.Run("* subscribtion/unsubscription support", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints1"}, + } + validateResponse(t, resp.sent, []string{"endpoints1"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints2"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints1", "endpoints2"}, nil) + }) + + t.Run("resource specific subscribtions while using wildcard", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints2", "endpoints4"}, // endpoints4 does not exist + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + // Don't update the resources now, test unsubscribing does send the resource again + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"endpoints2", "endpoints4"}, // endpoints4 does not exist + } + validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"}) + }) + +} diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9f5c161122..02078bc3ad 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -41,7 +41,7 @@ type mockConfigWatcher struct { counts map[string]int deltaCounts map[string]int responses map[string][]cache.Response - deltaResponses map[string][]cache.DeltaResponse + deltaResources map[string]map[string]types.Resource watches int deltaWatches int