Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

incremental: support for linear and mux caches #459

Merged
merged 14 commits into from
Aug 16, 2021
Merged
12 changes: 10 additions & 2 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type ResourceContainer interface {
GetResources(typeURL string) map[string]types.Resource

GetVersionMap() map[string]map[string]string

GetVersion(typeURL string) string
}
alecholmez marked this conversation as resolved.
Show resolved Hide resolved

// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot Snapshot, log log.Logger) *RawDeltaResponse {
func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot ResourceContainer, log log.Logger) *RawDeltaResponse {
resp, err := createDeltaResponse(request, state, snapshot, log)
if err != nil {
if log != nil {
Expand All @@ -42,7 +50,7 @@ func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.
return nil
}

func createDeltaResponse(req *DeltaRequest, state stream.StreamState, snapshot Snapshot, log log.Logger) (*RawDeltaResponse, error) {
func createDeltaResponse(req *DeltaRequest, state stream.StreamState, snapshot ResourceContainer, log log.Logger) (*RawDeltaResponse, error) {
resources := snapshot.GetResources((req.TypeUrl))

// variables to build our response with
Expand Down
121 changes: 118 additions & 3 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

Expand All @@ -42,13 +44,42 @@ type LinearCache struct {
watches map[string]watches
// Set of watches for all resources in the collection
watchAll watches
// Set of delta watches. A delta watch always contain the list of subscribed resources
// together with its current version
// version and versionPrefix fields are ignored for delta watches, because we always generate the resource version
deltaWatches map[int64]DeltaResponseWatch
// Continously incremented counter used to index delta watches
deltaWatchCount int64
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// versionMap holds the current hash map of all resources in the cache.
// versionMap is only to be used with delta xDS.
versionMap map[string]string
// Continously incremented version
version uint64
// Version prefix to be sent to the clients
versionPrefix string
// Versions for each resource by name.
versionVector map[string]uint64
mu sync.RWMutex

log log.Logger

mu sync.RWMutex
}

// This struct exist only because we want to reuse respondDelta function
type cacheWrapper struct {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
cache *LinearCache
}

func (w *cacheWrapper) GetResources(typeURL string) map[string]types.Resource {
return w.cache.resources
}

func (w *cacheWrapper) GetVersionMap() map[string]map[string]string {
return map[string]map[string]string{w.cache.typeURL: w.cache.versionMap}
}

func (w *cacheWrapper) GetVersion(typeURL string) string {
return w.cache.getVersion()
}

var _ Cache = &LinearCache{}
Expand All @@ -75,13 +106,21 @@ func WithInitialResources(resources map[string]types.Resource) LinearCacheOption
}
}

func WithLogger(log log.Logger) LinearCacheOption {
return func(cache *LinearCache) {
cache.log = log
}
}

// NewLinearCache creates a new cache. See the comments on the struct definition.
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
out := &LinearCache{
typeURL: typeURL,
resources: make(map[string]types.Resource),
watches: make(map[string]watches),
watchAll: make(watches),
deltaWatches: make(map[int64]DeltaResponseWatch),
versionMap: make(map[string]string),
version: 0,
versionVector: make(map[string]uint64),
}
Expand Down Expand Up @@ -111,7 +150,7 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.versionPrefix + strconv.FormatUint(cache.version, 10),
Version: cache.getVersion(),
}
}

Expand All @@ -131,6 +170,14 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
cache.respond(value, nil)
}
cache.watchAll = make(watches)

cache.updateVersionMap(modified)

for id, watch := range cache.deltaWatches {
if respondDelta(watch.Request, watch.Response, watch.StreamState, &cacheWrapper{cache}, cache.log) != nil {
delete(cache.deltaWatches, id)
}
}
}

// UpdateResource updates a resource in the collection.
Expand Down Expand Up @@ -277,7 +324,68 @@ func (cache *LinearCache) CreateWatch(request *Request, value chan Response) fun
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
cache.mu.Lock()
defer cache.mu.Unlock()
value := make(chan DeltaResponse, 1)

// if respondDelta returns nil this means that there is no change in any resource version from the previous snapshot
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// create a new watch accordingly
if respondDelta(request, value, state, &cacheWrapper{cache}, cache.log) == nil {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
cache.typeURL, state.ResourceVersions, cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}

return value, cache.cancelDeltaWatch(watchID)
}

return value, nil
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
for name, r := range cache.resources {
//skip recalculating hash for the resoces that weren't modified
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := modified[name]; !ok {
continue
}
// hash our verison in here and build the version map
marshaledResource, err := MarshalResource(r)
if err != nil {
return err
}
v := HashResource(marshaledResource)
if v == "" {
return errors.New("failed to build resource version")
}

cache.versionMap[GetResourceName(r)] = v
}
for name := range modified {
if r, ok := cache.resources[name]; !ok {
delete(cache.versionMap, GetResourceName(r))
}
}
return nil
}

func (cache *LinearCache) getVersion() string {
return cache.versionPrefix + strconv.FormatUint(cache.version, 10)
}

// cancellation function for cleaning stale watches
func (cache *LinearCache) cancelDeltaWatch(watchID int64) func() {
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.deltaWatches, watchID)
}
}

func (cache *LinearCache) nextDeltaWatchID() int64 {
return atomic.AddInt64(&cache.deltaWatchCount, 1)
}

func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
Expand All @@ -290,3 +398,10 @@ func (cache *LinearCache) NumWatches(name string) int {
defer cache.mu.RUnlock()
return len(cache.watches[name]) + len(cache.watchAll)
}

// Number of active delta watches.
func (cache *LinearCache) NumDeltaWatches() int {
cache.mu.Lock()
defer cache.mu.Unlock()
return len(cache.deltaWatches)
}
Loading