Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

incremental: support for linear and mux caches #459

Merged
merged 14 commits into from
Aug 16, 2021
Merged
53 changes: 14 additions & 39 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,29 @@ import (
"context"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
func respondDelta(ctx context.Context, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot Snapshot, log log.Logger) (*RawDeltaResponse, error) {
resp, err := createDeltaResponse(ctx, request, state, snapshot)
if err != nil {
if log != nil {
log.Errorf("Error creating delta response: %v", err)
}
return nil, nil
}

// Only send a response if there were changes
// We want to respond immediately for the first wildcard request in a stream, even if the response is empty
// otherwise, envoy won't complete initialization
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) {
if log != nil {
log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t",
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard())
}
select {
case value <- resp:
return resp, nil
case <-ctx.Done():
return resp, context.Canceled
}
}
return nil, nil
// groups together resource-related arguments for the createDeltaResponse function
type resourceContainer struct {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
resourceMap map[string]types.Resource
versionMap map[string]string
systemVersion string
}

// nolint:unparam // result 1 (error) is always nil (unparam)
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, snapshot Snapshot) (*RawDeltaResponse, error) {
resources := snapshot.GetResources((req.TypeUrl))

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
nextVersionMap := make(map[string]string)
filtered := make([]types.Resource, 0, len(resources))
filtered := make([]types.Resource, 0, len(resources.resourceMap))
toRemove := make([]string, 0)

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
for name, r := range resources {
for name, r := range resources.resourceMap {
// Since we've already precomputed the version hashes of the new snapshot,
// we can just set it here to be used for comparison later
version := snapshot.GetVersionMap()[req.TypeUrl][name]
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != nextVersionMap[name]) {
Expand All @@ -75,8 +50,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
default:
// Reply only with the requested resources
for name, prevVersion := range state.GetResourceVersions() {
if r, ok := resources[name]; ok {
nextVersion := snapshot.GetVersionMap()[req.TypeUrl][name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
}
Expand All @@ -87,7 +62,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St

// Compute resources for removal regardless of the request type
for name := range state.GetResourceVersions() {
if _, ok := resources[name]; !ok {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
Expand All @@ -97,7 +72,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
Resources: filtered,
RemovedResources: toRemove,
NextVersionMap: nextVersionMap,
SystemVersionInfo: snapshot.GetVersion(req.TypeUrl),
SystemVersionInfo: resources.systemVersion,
Ctx: ctx,
}, nil
}
}
123 changes: 121 additions & 2 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

Expand All @@ -42,13 +44,25 @@ type LinearCache struct {
watches map[string]watches
// Set of watches for all resources in the collection
watchAll watches
// Set of delta watches. A delta watch always contain the list of subscribed resources
// together with its current version
// version and versionPrefix fields are ignored for delta watches, because we always generate the resource version.
deltaWatches map[int64]DeltaResponseWatch
// Continously incremented counter used to index delta watches.
deltaWatchCount int64
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// versionMap holds the current hash map of all resources in the cache.
// versionMap is only to be used with delta xDS.
versionMap map[string]string
// Continuously incremented version.
version uint64
// Version prefix to be sent to the clients
versionPrefix string
// Versions for each resource by name.
versionVector map[string]uint64
mu sync.RWMutex

log log.Logger

mu sync.RWMutex
}

var _ Cache = &LinearCache{}
Expand All @@ -75,13 +89,21 @@ func WithInitialResources(resources map[string]types.Resource) LinearCacheOption
}
}

func WithLogger(log log.Logger) LinearCacheOption {
return func(cache *LinearCache) {
cache.log = log
}
}

// NewLinearCache creates a new cache. See the comments on the struct definition.
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
out := &LinearCache{
typeURL: typeURL,
resources: make(map[string]types.Resource),
watches: make(map[string]watches),
watchAll: make(watches),
deltaWatches: make(map[int64]DeltaResponseWatch),
versionMap: make(map[string]string),
version: 0,
versionVector: make(map[string]uint64),
}
Expand Down Expand Up @@ -111,7 +133,7 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.versionPrefix + strconv.FormatUint(cache.version, 10),
Version: cache.getVersion(),
}
}

Expand All @@ -131,6 +153,34 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
cache.respond(value, nil)
}
cache.watchAll = make(watches)

cache.updateVersionMap(modified)

for id, watch := range cache.deltaWatches {
res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
if res != nil {
delete(cache.deltaWatches, id)
}
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, state, resourceContainer{
resourceMap: cache.resources,
versionMap: cache.versionMap,
systemVersion: cache.getVersion(),
})

// Only send a response if there were changes
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
if cache.log != nil {
cache.log.Debugf("[linear cache] node: %s, sending delta response with resources: %v removed resources %v wildcard: %t",
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard())
}
value <- resp
return resp
}
return nil
}

// UpdateResource updates a resource in the collection.
Expand Down Expand Up @@ -277,9 +327,71 @@ func (cache *LinearCache) CreateWatch(request *Request, value chan Response) fun
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
cache.mu.Lock()
defer cache.mu.Unlock()

response := cache.respondDelta(request, value, state)

// if respondDelta returns nil this means that there is no change in any resource version
// create a new watch accordingly
if response == nil {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, state.GetResourceVersions(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}

return cache.cancelDeltaWatch(watchID)
}

return nil
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
for name, r := range cache.resources {
// skip recalculating hash for the resoces that weren't modified
if _, ok := modified[name]; !ok {
continue
}
// hash our verison in here and build the version map
marshaledResource, err := MarshalResource(r)
if err != nil {
return err
}
v := HashResource(marshaledResource)
if v == "" {
return errors.New("failed to build resource version")
}

cache.versionMap[GetResourceName(r)] = v
}
for name := range modified {
if r, ok := cache.resources[name]; !ok {
delete(cache.versionMap, GetResourceName(r))
}
}
return nil
}

func (cache *LinearCache) getVersion() string {
return cache.versionPrefix + strconv.FormatUint(cache.version, 10)
}

// cancellation function for cleaning stale watches
func (cache *LinearCache) cancelDeltaWatch(watchID int64) func() {
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.deltaWatches, watchID)
}
}

func (cache *LinearCache) nextDeltaWatchID() int64 {
return atomic.AddInt64(&cache.deltaWatchCount, 1)
}

func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
return nil, errors.New("not implemented")
}
Expand All @@ -290,3 +402,10 @@ func (cache *LinearCache) NumWatches(name string) int {
defer cache.mu.RUnlock()
return len(cache.watches[name]) + len(cache.watchAll)
}

// Number of active delta watches.
func (cache *LinearCache) NumDeltaWatches() int {
cache.mu.Lock()
defer cache.mu.Unlock()
return len(cache.deltaWatches)
}
Loading