Skip to content

Commit

Permalink
Rename ClientState to SubscriptionState. Ensure WatchesResources is a…
Browse files Browse the repository at this point in the history
…lso compatible with sotw

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Jun 9, 2023
1 parent 4fe6c1a commit db72c18
Show file tree
Hide file tree
Showing 18 changed files with 413 additions and 271 deletions.
19 changes: 13 additions & 6 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// ClientState provides additional data on the client knowledge for the type matching the request
// SubscriptionState provides additional data on the client knowledge for the type matching the request
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
type ClientState interface {
// GetKnownResources returns the list of resources the clients has ACKed and their associated version.
type SubscriptionState interface {
// GetKnownResources returns a list of resources that the client has ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
Expand All @@ -53,8 +53,15 @@ type ClientState interface {
GetSubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This considers subtilities related to the current migration of wildcard definition within the protocol.
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
IsWildcard() bool

// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
// It is currently only applicable to delta-xds.
// If the request is wildcard, it will always return true,
// otherwise it will compare the provided resources to the list of resources currently subscribed
WatchesResources(resourceNames map[string]struct{}) bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
Expand All @@ -74,7 +81,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, ClientState, chan Response) (cancel func())
CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error)

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
Expand All @@ -86,7 +93,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
Expand Down
39 changes: 23 additions & 16 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, nil)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
state := stream.NewSubscriptionState(true, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
require.NoError(t, err)
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand All @@ -68,17 +69,18 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
state := stream.NewSubscriptionState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResources()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
require.NoError(t, err)
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -111,21 +113,22 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.StreamState)
streams := make(map[string]*stream.SubscriptionState)

// At this stage the cache is empty, so a watch is opened
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, make(map[string]string))
state := stream.NewSubscriptionState(true, make(map[string]string))
streams[typ] = &state
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
require.NoError(t, err)
}

snapshot := fixture.snapshot()
Expand All @@ -141,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[typ]:
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetResourceVersions(nextVersionMap)
streams[typ].SetKnownResources(nextVersionMap)
case <-time.After(time.Second):
require.Fail(t, "failed to receive a snapshot response")
}
Expand All @@ -152,13 +155,14 @@ func TestDeltaRemoveResources(t *testing.T) {
// test the removal of certain resources from a partial snapshot
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResponseNonce: "nonce",
}, *streams[typ], watches[typ])
require.NoError(t, err)
}

assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")
Expand Down Expand Up @@ -202,14 +206,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
state := stream.NewStreamState(false, make(map[string]string))
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
state := stream.NewSubscriptionState(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, state, responses)
require.NoError(t, err)

defer cancel()
}
Expand All @@ -225,21 +230,22 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state := stream.NewSubscriptionState(false, nil)
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
require.NoError(t, err)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()

err := c.SetSnapshot(ctx, key, fixture.snapshot())
err = c.SetSnapshot(ctx, key, fixture.snapshot())
assert.EqualError(t, err, context.Canceled.Error())

// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
Expand Down Expand Up @@ -269,14 +275,15 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, make(map[string]string))
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
state := stream.NewSubscriptionState(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, responses)
require.NoError(t, err)

// Cancel the watch
cancel()
Expand Down
25 changes: 13 additions & 12 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cache
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -163,19 +164,19 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.WatchesResources(modified) {
if !watch.subscriptionState.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.clientState)
res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState)
if res != nil {
delete(cache.deltaWatches, id)
}
}
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse {
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
resourceMap: cache.resources,
versionMap: cache.versionMap,
Expand Down Expand Up @@ -297,10 +298,10 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return resources
}

func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() {
func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) {
if request.TypeUrl != cache.typeURL {
value <- nil
return nil
return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL)
}
// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
Expand Down Expand Up @@ -336,7 +337,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
}
if stale {
cache.respond(value, staleResources)
return nil
return nil, nil
}
// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
Expand All @@ -345,7 +346,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
}
}, nil
}
for _, name := range request.ResourceNames {
set, exists := cache.watches[name]
Expand All @@ -367,10 +368,10 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
delete(cache.watches, name)
}
}
}
}, nil
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
cache.mu.Lock()
defer cache.mu.Unlock()

Expand Down Expand Up @@ -398,12 +399,12 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Cl
cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState}
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}

return cache.cancelDeltaWatch(watchID)
return cache.cancelDeltaWatch(watchID), nil
}

return nil
return nil, nil
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
Expand Down
Loading

0 comments on commit db72c18

Please sign in to comment.