diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 6e5cebcfbd..4697b74770 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -233,54 +233,93 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh info.mu.Lock() defer info.mu.Unlock() - // responder callback for SOTW watches - respond := func(watch ResponseWatch, id int64) error { - version := snapshot.GetVersion(watch.Request.TypeUrl) - if version != watch.Request.VersionInfo { - cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version) - resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) - err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) - if err != nil { - return err - } - // discard the watch - delete(info.watches, id) + // Respond to SOTW watches for the node. + if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil { + return err + } + + // Respond to delta watches for the node. + return cache.respondDeltaWatches(ctx, info, snapshot) + } + + return nil +} + +func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error { + // responder callback for SOTW watches + respond := func(watch ResponseWatch, id int64) error { + version := snapshot.GetVersion(watch.Request.TypeUrl) + if version != watch.Request.VersionInfo { + cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version) + resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) + err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) + if err != nil { + return err } - return nil + // discard the watch + delete(info.watches, id) } + return nil + } - // If ADS is enabled we need to order response watches so we guarantee - // sending them in the correct order. Go's default implementation - // of maps are randomized order when ranged over. - if cache.ads { - info.orderResponseWatches() - for _, key := range info.orderedWatches { - err := respond(info.watches[key.ID], key.ID) - if err != nil { - return err - } + // If ADS is enabled we need to order response watches so we guarantee + // sending them in the correct order. Go's default implementation + // of maps are randomized order when ranged over. + if cache.ads { + info.orderResponseWatches() + for _, key := range info.orderedWatches { + err := respond(info.watches[key.ID], key.ID) + if err != nil { + return err } - } else { - for id, watch := range info.watches { - err := respond(watch, id) - if err != nil { - return err - } + } + } else { + for id, watch := range info.watches { + err := respond(watch, id) + if err != nil { + return err } } + } + + return nil +} + +func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error { + // We only calculate version hashes when using delta. We don't + // want to do this when using SOTW so we can avoid unnecessary + // computational cost if not using delta. + if len(info.deltaWatches) > 0 { + err := snapshot.ConstructVersionMap() + if err != nil { + return err + } + } - // We only calculate version hashes when using delta. We don't - // want to do this when using SOTW so we can avoid unnecessary - // computational cost if not using delta. - if len(info.deltaWatches) > 0 { - err := snapshot.ConstructVersionMap() + // If ADS is enabled we need to order response delta watches so we guarantee + // sending them in the correct order. Go's default implementation + // of maps are randomized order when ranged over. + if cache.ads { + info.orderResponseDeltaWatches() + for _, key := range info.orderedDeltaWatches { + watch := info.deltaWatches[key.ID] + res, err := cache.respondDelta( + ctx, + snapshot, + watch.Request, + watch.Response, + watch.StreamState, + ) if err != nil { return err } + // If we detect a nil response here, that means there has been no state change + // so we don't want to respond or remove any existing resource watches + if res != nil { + delete(info.deltaWatches, key.ID) + } } - - // this won't run if there are no delta watches - // to process. + } else { for id, watch := range info.deltaWatches { res, err := cache.respondDelta( ctx, @@ -299,7 +338,6 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh } } } - return nil } diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index 1b3e8f490b..e50f85beff 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -70,7 +70,8 @@ type statusInfo struct { orderedWatches keys // deltaWatches are indexed channels for the delta response watches and the original requests - deltaWatches map[int64]DeltaResponseWatch + deltaWatches map[int64]DeltaResponseWatch + orderedDeltaWatches keys // the timestamp of the last watch request lastWatchRequestTime time.Time @@ -177,3 +178,22 @@ func (info *statusInfo) orderResponseWatches() { // This is only run when we enable ADS on the cache. sort.Sort(info.orderedWatches) } + +// orderResponseDeltaWatches will track a list of delta watch keys and order them if +// true is passed. +func (info *statusInfo) orderResponseDeltaWatches() { + info.orderedDeltaWatches = make(keys, len(info.deltaWatches)) + + var index int + for id, deltaWatch := range info.deltaWatches { + info.orderedDeltaWatches[index] = key{ + ID: id, + TypeURL: deltaWatch.Request.TypeUrl, + } + index++ + } + + // Sort our list which we can use in the SetSnapshot functions. + // This is only run when we enable ADS on the cache. + sort.Sort(info.orderedDeltaWatches) +}