Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sotw: [#558] Support dynamic wildcard subscription in sotw-xds #571

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// NextVersionMap maps the resource name to the empty string for resources that were included in the response.
NextVersionMap map[string]string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
ResourceNamesSubscribe: nil,
}, stream.NewStreamState(true, nil), watches[typ])
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
state.SubscribeToResources(names[rsrc.EndpointType][:1])
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
Expand Down
93 changes: 64 additions & 29 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,43 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
resources := make([]types.ResourceWithTTL, 0, len(staleResources))
resourceVersions := make(map[string]string, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
resourceVersions[name] = ""
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
NextVersionMap: resourceVersions,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}

func (cache *LinearCache) respondWildcards(respChannels map[chan Response]struct{}) {
if len(respChannels) == 0 {
return
}

resources := make([]types.ResourceWithTTL, 0, len(cache.resources))
resourceVersions := make(map[string]string, len(cache.resources))
for name, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceVersions[name] = ""
}
for respChannel := range respChannels {
respChannel <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
NextVersionMap: resourceVersions,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}
}

Expand All @@ -150,9 +166,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
for value, stale := range notifyList {
cache.respond(value, stale)
}
for value := range cache.watchAll {
cache.respond(value, nil)
}

cache.respondWildcards(cache.watchAll)
cache.watchAll = make(watches)

// Building the version map has a very high cost when using SetResources to do full updates.
Expand Down Expand Up @@ -303,11 +318,12 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
value <- nil
return nil
}

// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
staleResources := []string{} // empty means all
var staleResources map[string]struct{}

// strip version prefix if it is present
var lastVersion uint64
Expand All @@ -322,33 +338,49 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
defer cache.mu.Unlock()

if err != nil {
// The request does not include a version or the version could not be parsed.
// Send all resources matching the request with no regards to the version.
stale = true
staleResources = request.ResourceNames
} else if len(request.ResourceNames) == 0 {
if !streamState.IsWildcard() {
staleResources = streamState.GetSubscribedResourceNames()
}
} else if streamState.IsWildcard() {
stale = lastVersion != cache.version
} else {
for _, name := range request.ResourceNames {
staleResources = map[string]struct{}{}
for name := range streamState.GetSubscribedResourceNames() {
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
stale = true
staleResources = append(staleResources, name)
staleResources[name] = struct{}{}
}
}
}

if stale {
cache.respond(value, staleResources)
if streamState.IsWildcard() {
cache.respondWildcards(map[chan Response]struct{}{value: {}})
} else {
resourcesToSend := make([]string, 0, len(staleResources))
for name := range staleResources {
resourcesToSend = append(resourcesToSend, name)
}
cache.respond(value, resourcesToSend)
}
return nil
}

// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
if streamState.IsWildcard() {
cache.watchAll[value] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
}
}
for _, name := range request.ResourceNames {

for name := range streamState.GetSubscribedResourceNames() {
set, exists := cache.watches[name]
if !exists {
set = make(watches)
Expand All @@ -359,7 +391,10 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.ResourceNames {
// This creates a dependency on the streamstate not being altered between the call to CreateWatch
// and the call to the cancel method.
// It is currently enforced in the sotw server logic.
for name := range streamState.GetSubscribedResourceNames() {
set, exists := cache.watches[name]
if exists {
delete(set, value)
Expand Down
Loading