Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delta: [#558] Support dynamic wildcard subscription in delta-xds #559

Merged
merged 6 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,26 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
for name := range state.GetResourceVersions() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
// Reply only with the requested resources
nextVersionMap = make(map[string]string, len(state.GetResourceVersions()))
for name, prevVersion := range state.GetResourceVersions() {
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
} else {
// We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them.
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
nextVersionMap[name] = ""
// The version check is to make sure we are only sending an update once right after removal.
// If the client keeps the subscription, we skip the add for every subsequent response.
if prevVersion != "" {
toRemove = append(toRemove, name)
}
} else if found {
toRemove = append(toRemove, name)
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, versionMap[typ]), watches[typ])
}, state, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -221,13 +225,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, stream.NewStreamState(false, map[string]string{names[rsrc.EndpointType][0]: ""}), watchCh)
}, state, watchCh)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down
6 changes: 5 additions & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.StreamState.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
if res != nil {
delete(cache.deltaWatches, id)
Expand Down Expand Up @@ -391,7 +395,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, state.GetResourceVersions(), cache.getVersion())
cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}
Expand Down
22 changes: 16 additions & 6 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ func TestLinearDeltaExistingResources(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"b": "", "c": ""}) // watching b and c - not interested in a
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{})

state = stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state = stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
Expand All @@ -511,13 +513,15 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": hashB})
state := stream.NewStreamState(false, map[string]string{"b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -543,14 +547,16 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
// There is currently no delta watch
checkVersionMapNotSet(t, c)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
checkVersionMapSet(t, c)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -577,13 +583,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -600,7 +608,8 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
checkVersionMapNotSet(t, c)
assert.Equal(t, 0, c.NumResources())
Expand Down Expand Up @@ -745,6 +754,7 @@ func TestLinearMixedWatches(t *testing.T) {
checkVersionMapNotSet(t, c)

deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
wd := make(chan DeltaResponse, 1)

// Initial update
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (cache *snapshotCache) ClearSnapshot(node string) {

// nameSet creates a map from a string slice to value true.
func nameSet(names []string) map[string]bool {
set := make(map[string]bool)
set := make(map[string]bool, len(names))
for _, name := range names {
set[name] = true
}
Expand Down Expand Up @@ -498,9 +498,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
watchID := cache.nextDeltaWatchID()

if exists {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t))
} else {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetResourceVersions(), nodeID)
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID)
}

info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
Expand Down
42 changes: 33 additions & 9 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
// so we set the initial resource versions if we have any.
// We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
// If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
// It can still be done by explicitly unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
}

s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions())
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions())
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
Expand Down Expand Up @@ -210,17 +213,38 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro
}

// When we subscribe, we just want to make the cache know we are subscribing to a resource.
// Providing a name with an empty version is enough to make that happen.
func (s *server) subscribe(resources []string, sv map[string]string) {
// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on.
func (s *server) subscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
sv[resource] = ""
if resource == "*" {
streamState.SetWildcard(true)
continue
}
sv[resource] = struct{}{}
}
}

// Unsubscriptions remove resources from the stream state to
// indicate to the cache that we don't care about the resource anymore
func (s *server) unsubscribe(resources []string, sv map[string]string) {
// Unsubscriptions remove resources from the stream's subscribed resource list.
// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources.
func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
if resource == "*" {
streamState.SetWildcard(false)
continue
}
if _, ok := sv[resource]; ok && streamState.IsWildcard() {
// The XDS protocol states that:
// * if a watch is currently wildcard
// * a resource is explicitly unsubscribed by name
// Then the control-plane must return in the response whether the resource is removed (if no longer present for this node)
// or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated
// To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either:
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetResourceVersions()[resource] = ""
}
delete(sv, resource)
}
}
51 changes: 45 additions & 6 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,54 @@ type DeltaStream interface {

// StreamState will keep track of resource state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
// Indicates whether the delta stream currently has a wildcard watch
wildcard bool

// Provides the list of resources explicitly requested by the client
// This list might be non-empty even when set as wildcard
subscribedResourceNames map[string]struct{}

// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
// This field stores the last state sent to the client.
resourceVersions map[string]string

// knownResourceNames contains resource names that a client has received previously
knownResourceNames map[string]map[string]struct{}

// indicates whether the object has beed modified since its creation
// indicates whether the object has been modified since its creation
first bool
}

// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
// If the request is set to wildcard it may be empty
// Currently populated only when using delta-xds
func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
return s.subscribedResourceNames
}

// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to
// It is decorrelated from the wildcard state of the stream
// Currently used only when using delta-xds
func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) {
s.subscribedResourceNames = subscribedResourceNames
}

// WatchesResources returns whether at least one of the resource provided is currently watch by the stream
// It is currently only applicable to delta-xds
// If the request is wildcard, it will always return true
// Otherwise it will compare the provided resources to the list of resources currently subscribed
func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
if s.IsWildcard() {
return true
}
for resourceName := range resourceNames {
if _, ok := s.subscribedResourceNames[resourceName]; ok {
return true
}
}
return false
}

func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
}
Expand All @@ -50,6 +84,10 @@ func (s *StreamState) IsFirst() bool {
return s.first
}

func (s *StreamState) SetWildcard(wildcard bool) {
s.wildcard = wildcard
}

func (s *StreamState) IsWildcard() bool {
return s.wildcard
}
Expand All @@ -73,10 +111,11 @@ func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
// NewStreamState initializes a stream state.
func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
state := StreamState{
wildcard: wildcard,
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
wildcard: wildcard,
subscribedResourceNames: map[string]struct{}{},
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
}

if initialResourceVersions == nil {
Expand Down
Loading