From ce80713755df9156321b3c66af5ec839def6a654 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Jul 2021 12:05:27 -0600 Subject: [PATCH 1/8] Add delta xDS support to Linear and Mux caches Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 12 +++- pkg/cache/v3/linear.go | 116 +++++++++++++++++++++++++++++++- pkg/cache/v3/linear_test.go | 128 ++++++++++++++++++++++++++++++++++++ pkg/cache/v3/mux.go | 14 +++- pkg/cache/v3/simple.go | 4 +- 5 files changed, 264 insertions(+), 10 deletions(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 6a101143ef..3ee61a1d04 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -20,8 +20,16 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) +type ResourceContainer interface { + GetResources(typeURL string) map[string]types.Resource + + GetVersionMap() map[string]map[string]string + + GetVersion(typeURL string) string +} + // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot Snapshot, log log.Logger) *RawDeltaResponse { +func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot ResourceContainer, log log.Logger) *RawDeltaResponse { resp, err := createDeltaResponse(request, state, snapshot, log) if err != nil { if log != nil { @@ -42,7 +50,7 @@ func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream. return nil } -func createDeltaResponse(req *DeltaRequest, state stream.StreamState, snapshot Snapshot, log log.Logger) (*RawDeltaResponse, error) { +func createDeltaResponse(req *DeltaRequest, state stream.StreamState, snapshot ResourceContainer, log log.Logger) (*RawDeltaResponse, error) { resources := snapshot.GetResources((req.TypeUrl)) // variables to build our response with diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index bd3c9fb366..ba91af7561 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -20,8 +20,10 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -42,13 +44,42 @@ type LinearCache struct { watches map[string]watches // Set of watches for all resources in the collection watchAll watches + // Set of delta watches. A delta watch always contain the list of subscribed resources + // together with its current version + // version and versionPrefix fields are ignored for delta watches, because we always generate the resource version + deltaWatches map[int64]DeltaResponseWatch + // Continously incremented counter used to index delta watches + deltaWatchCount int64 + // versionMap holds the current hash map of all resources in the cache. + // versionMap is only to be used with delta xDS. + versionMap map[string]string // Continously incremented version version uint64 // Version prefix to be sent to the clients versionPrefix string // Versions for each resource by name. versionVector map[string]uint64 - mu sync.RWMutex + + log log.Logger + + mu sync.RWMutex +} + +// This struct exist only because we want to reuse respondDelta function +type cacheWrapper struct { + cache *LinearCache +} + +func (w *cacheWrapper) GetResources(typeURL string) map[string]types.Resource { + return w.cache.resources +} + +func (w *cacheWrapper) GetVersionMap() map[string]map[string]string { + return map[string]map[string]string{w.cache.typeURL: w.cache.versionMap} +} + +func (w *cacheWrapper) GetVersion(typeURL string) string { + return w.cache.getVersion() } var _ Cache = &LinearCache{} @@ -75,6 +106,12 @@ func WithInitialResources(resources map[string]types.Resource) LinearCacheOption } } +func WithLogger(log log.Logger) LinearCacheOption { + return func(cache *LinearCache) { + cache.log = log + } +} + // NewLinearCache creates a new cache. See the comments on the struct definition. func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { out := &LinearCache{ @@ -82,6 +119,8 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { resources: make(map[string]types.Resource), watches: make(map[string]watches), watchAll: make(watches), + deltaWatches: make(map[int64]DeltaResponseWatch), + versionMap: make(map[string]string), version: 0, versionVector: make(map[string]uint64), } @@ -111,7 +150,7 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string) value <- &RawResponse{ Request: &Request{TypeUrl: cache.typeURL}, Resources: resources, - Version: cache.versionPrefix + strconv.FormatUint(cache.version, 10), + Version: cache.getVersion(), } } @@ -131,6 +170,14 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { cache.respond(value, nil) } cache.watchAll = make(watches) + + cache.updateVersionMap(modified) + + for id, watch := range cache.deltaWatches { + if respondDelta(watch.Request, watch.Response, watch.StreamState, &cacheWrapper{cache}, cache.log) != nil { + delete(cache.deltaWatches, id) + } + } } // UpdateResource updates a resource in the collection. @@ -277,7 +324,63 @@ func (cache *LinearCache) CreateWatch(request *Request, value chan Response) fun } func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) { - return nil, nil + cache.mu.Lock() + defer cache.mu.Unlock() + value := make(chan DeltaResponse, 1) + + // if respondDelta returns nil this means that there is no change in any resource version from the previous snapshot + // create a new watch accordingly + if respondDelta(request, value, state, &cacheWrapper{cache}, cache.log) == nil { + watchID := cache.nextDeltaWatchID() + if cache.log != nil { + cache.log.Infof("open delta watch ID:%d for %s Resources:%v, system version %q", watchID, + cache.typeURL, state.ResourceVersions, cache.getVersion()) + } + + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + + return value, cache.cancelDeltaWatch(watchID) + } + + return value, nil +} + +func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { + for name, r := range cache.resources { + //skip recalculating hash for the resoces that weren't modified + if _, ok := modified[name]; !ok { + continue + } + // hash our verison in here and build the version map + marshaledResource, err := MarshalResource(r) + if err != nil { + return err + } + v := HashResource(marshaledResource) + if v == "" { + return errors.New("failed to build resource version") + } + + cache.versionMap[GetResourceName(r)] = v + } + return nil +} + +func (cache *LinearCache) getVersion() string { + return cache.versionPrefix + strconv.FormatUint(cache.version, 10) +} + +// cancellation function for cleaning stale watches +func (cache *LinearCache) cancelDeltaWatch(watchID int64) func() { + return func() { + cache.mu.Lock() + defer cache.mu.Unlock() + delete(cache.deltaWatches, watchID) + } +} + +func (cache *LinearCache) nextDeltaWatchID() int64 { + return atomic.AddInt64(&cache.deltaWatchCount, 1) } func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) { @@ -290,3 +393,10 @@ func (cache *LinearCache) NumWatches(name string) int { defer cache.mu.RUnlock() return len(cache.watches[name]) + len(cache.watchAll) } + +// Number of active delta watches. +func (cache *LinearCache) NumDeltaWatches() int { + cache.mu.Lock() + defer cache.mu.Unlock() + return len(cache.deltaWatches) +} diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 2a33388a92..7a622210af 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -15,13 +15,16 @@ package cache import ( + "errors" "fmt" "reflect" "testing" wrappers "github.com/golang/protobuf/ptypes/wrappers" + endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) const ( @@ -56,6 +59,45 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) { } } +type resourceInfo struct { + name string + version string +} + +func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo) { + t.Helper() + r := <-ch + if r.GetDeltaRequest().TypeUrl != testType { + t.Errorf("unexpected empty request type URL: %q", r.GetDeltaRequest().TypeUrl) + } + out, err := r.GetDeltaDiscoveryResponse() + if err != nil { + t.Fatal(err) + } + if len(out.Resources) != len(resources) { + t.Errorf("unexpected number of responses: got %d, want %d", len(out.Resources), len(resources)) + } + for _, r := range resources { + found := false + for _, r1 := range out.Resources { + if r1.Name == r.name && r1.Version == r.version { + found = true + break + } else if r1.Name == r.name { + t.Errorf("unexpected version for resource %q: got %q, want %q", r.name, r1.Version, r.version) + found = true + break + } + } + if !found { + t.Errorf("resource with name %q not found in response", r.name) + } + } + if out.TypeUrl != testType { + t.Errorf("unexpected type URL: %q", out.TypeUrl) + } +} + func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { t.Helper() if i := c.NumWatches(name); i != count { @@ -63,6 +105,13 @@ func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { } } +func checkDeltaWatchCount(t *testing.T, c *LinearCache, count int) { + t.Helper() + if i := c.NumDeltaWatches(); i != count { + t.Errorf("unexpected number of delta watches: got %d, want %d", i, count) + } +} + func mustBlock(t *testing.T, w <-chan Response) { select { case <-w: @@ -71,6 +120,26 @@ func mustBlock(t *testing.T, w <-chan Response) { } } +func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) { + select { + case <-w: + t.Error("watch must block") + default: + } +} + +func hashResource(t *testing.T, resource types.Resource) string { + marshaledResource, err := MarshalResource(resource) + if err != nil { + t.Fatal(err) + } + v := HashResource(marshaledResource) + if v == "" { + t.Fatal(errors.New("failed to build resource version")) + } + return v +} + func TestLinearInitialResources(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) @@ -318,3 +387,62 @@ func TestLinearConcurrentSetWatch(t *testing.T) { }(i) } } + +func TestLinearDeltaWildcard(t *testing.T) { + c := NewLinearCache(testType) + state := stream.StreamState{Wildcard: true, ResourceVersions: map[string]string{}} + w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + mustBlockDelta(t, w) + checkDeltaWatchCount(t, c, 1) + + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + hash := hashResource(t, a) + c.UpdateResource("a", a) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"a", hash}}) +} + +func TestLinearDeltaExistingResources(t *testing.T) { + c := NewLinearCache(testType) + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + hashA := hashResource(t, a) + c.UpdateResource("a", a) + b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} + hashB := hashResource(t, b) + c.UpdateResource("b", b) + + state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"b": "", "c": ""}} // watching b and c - not interested in a + w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}) + + state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": ""}} + w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}) +} + +func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { + c := NewLinearCache(testType) + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + hashA := hashResource(t, a) + c.UpdateResource("a", a) + b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} + hashB := hashResource(t, b) + c.UpdateResource("b", b) + + state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": hashB}} + w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}) // b is up to date and shouldn't be returned + + state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": hashA, "b": hashB}} + w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + mustBlockDelta(t, w) + checkDeltaWatchCount(t, c, 1) + b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b + hashB = hashResource(t, b) + c.UpdateResource("b", b) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}) +} diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 1942a1465e..58696f440b 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -29,7 +29,8 @@ import ( // making sure there is always a matching cache. type MuxCache struct { // Classification functions. - Classify func(Request) string + Classify func(Request) string + ClassifyDelta func(DeltaRequest) string // Muxed caches. Caches map[string]Cache } @@ -46,8 +47,15 @@ func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() { return cache.CreateWatch(request, value) } -func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) { - return nil, nil +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) { + key := mux.ClassifyDelta(*request) + cache, exists := mux.Caches[key] + if !exists { + value := make(chan DeltaResponse, 0) + close(value) + return value, nil + } + return cache.CreateDeltaWatch(request, state) } func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 111e5f4898..ee0af993d5 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -226,7 +226,7 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { watch.Request, watch.Response, watch.StreamState, - snapshot, + &snapshot, cache.log, ) // If we detect a nil response here, that means there has been no state change @@ -421,7 +421,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update") delayedResponse = true } - delayedResponse = respondDelta(request, value, state, snapshot, cache.log) == nil + delayedResponse = respondDelta(request, value, state, &snapshot, cache.log) == nil } if delayedResponse { From e6b4cafdae63b4ea20f947a2d9441d45f287772b Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Jul 2021 18:44:45 -0600 Subject: [PATCH 2/8] Add more tests Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/linear.go | 5 ++ pkg/cache/v3/linear_test.go | 93 +++++++++++++++++++++++++++++++++---- 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index ba91af7561..450adac834 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -363,6 +363,11 @@ func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { cache.versionMap[GetResourceName(r)] = v } + for name := range modified { + if r, ok := cache.resources[name]; !ok { + delete(cache.versionMap, GetResourceName(r)) + } + } return nil } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 7a622210af..99773bcaec 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -64,7 +64,7 @@ type resourceInfo struct { version string } -func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo) { +func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) { t.Helper() r := <-ch if r.GetDeltaRequest().TypeUrl != testType { @@ -96,6 +96,21 @@ func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []reso if out.TypeUrl != testType { t.Errorf("unexpected type URL: %q", out.TypeUrl) } + if len(out.RemovedResources) != len(deleted) { + t.Errorf("unexpected number of removed resurces: got %d, want %d", len(out.RemovedResources), len(deleted)) + } + for _, r := range deleted { + found := false + for _, rr := range out.RemovedResources { + if r == rr { + found = true + break + } + } + if !found { + t.Errorf("Expected resource %s to be deleted", r) + } + } } func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { @@ -390,16 +405,20 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) - state := stream.StreamState{Wildcard: true, ResourceVersions: map[string]string{}} - w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) - mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + state1 := stream.StreamState{Wildcard: true, ResourceVersions: map[string]string{}} + w1, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1) + mustBlockDelta(t, w1) + state2 := stream.StreamState{Wildcard: true, ResourceVersions: map[string]string{}} + w2, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2) + mustBlockDelta(t, w1) + checkDeltaWatchCount(t, c, 2) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) c.UpdateResource("a", a) checkDeltaWatchCount(t, c, 0) - verifyDeltaResponse(t, w, []resourceInfo{{"a", hash}}) + verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) + verifyDeltaResponse(t, w2, []resourceInfo{{"a", hash}}, nil) } func TestLinearDeltaExistingResources(t *testing.T) { @@ -414,12 +433,12 @@ func TestLinearDeltaExistingResources(t *testing.T) { state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"b": "", "c": ""}} // watching b and c - not interested in a w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) checkDeltaWatchCount(t, c, 0) - verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{"c"}) state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": ""}} w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) checkDeltaWatchCount(t, c, 0) - verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { @@ -434,7 +453,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": hashB}} w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) checkDeltaWatchCount(t, c, 0) - verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}) // b is up to date and shouldn't be returned + verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": hashA, "b": hashB}} w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) @@ -444,5 +463,59 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { hashB = hashResource(t, b) c.UpdateResource("b", b) checkDeltaWatchCount(t, c, 0) - verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, nil) +} + +func TestLinearDeltaResourceUpdate(t *testing.T) { + c := NewLinearCache(testType) + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + hashA := hashResource(t, a) + c.UpdateResource("a", a) + b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} + hashB := hashResource(t, b) + c.UpdateResource("b", b) + + state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": ""}} + w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) + + state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": hashA, "b": hashB}} + w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + mustBlockDelta(t, w) + checkDeltaWatchCount(t, c, 1) + + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + {Priority: 10}, + }} + hashA = hashResource(t, a) + c.UpdateResource("a", a) + verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) +} + +func TestLinearDeltaResourceDelete(t *testing.T) { + c := NewLinearCache(testType) + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + hashA := hashResource(t, a) + c.UpdateResource("a", a) + b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} + hashB := hashResource(t, b) + c.UpdateResource("b", b) + + state := stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": "", "b": ""}} + w, _ := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + checkDeltaWatchCount(t, c, 0) + verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) + + state = stream.StreamState{Wildcard: false, ResourceVersions: map[string]string{"a": hashA, "b": hashB}} + w, _ = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state) + mustBlockDelta(t, w) + checkDeltaWatchCount(t, c, 1) + + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + {Priority: 10}, + }} + hashA = hashResource(t, a) + c.SetResources(map[string]types.Resource{"a": a}) + verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"b"}) } From 0f16bcae40d99830ce119f3329ba070fcf0ecc0c Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 5 Aug 2021 11:58:27 -0600 Subject: [PATCH 3/8] Refactor resourceContainer interface Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 14 +++++++------- pkg/cache/v3/snapshot.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index ed879668d9..586c7335fd 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -22,10 +22,10 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) -type ResourceContainer interface { +type resourceContainer interface { GetResources(typeURL string) map[string]types.Resource - GetVersionMap() map[string]map[string]string + GetVersionMap(typeURL string) map[string]string GetVersion(typeURL string) string } @@ -40,8 +40,8 @@ func (w *cacheWrapper) GetResources(typeURL string) map[string]types.Resource { return w.cache.resources } -func (w *cacheWrapper) GetVersionMap() map[string]map[string]string { - return map[string]map[string]string{w.cache.typeURL: w.cache.versionMap} +func (w *cacheWrapper) GetVersionMap(typeURL string) map[string]string { + return w.cache.versionMap } func (w *cacheWrapper) GetVersion(typeURL string) string { @@ -85,7 +85,7 @@ func respondDeltaLinear(request *DeltaRequest, value chan DeltaResponse, state s return nil } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, snapshot ResourceContainer, log log.Logger) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, snapshot resourceContainer, log log.Logger) *RawDeltaResponse { resources := snapshot.GetResources((req.TypeUrl)) // variables to build our response with @@ -99,7 +99,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St for name, r := range resources { // Since we've already precomputed the version hashes of the new snapshot, // we can just set it here to be used for comparison later - version := snapshot.GetVersionMap()[req.TypeUrl][name] + version := snapshot.GetVersionMap(req.TypeUrl)[name] nextVersionMap[name] = version prevVersion, found := state.GetResourceVersions()[name] if !found || (prevVersion != nextVersionMap[name]) { @@ -110,7 +110,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Reply only with the requested resources for name, prevVersion := range state.GetResourceVersions() { if r, ok := resources[name]; ok { - nextVersion := snapshot.GetVersionMap()[req.TypeUrl][name] + nextVersion := snapshot.GetVersionMap(req.TypeUrl)[name] if prevVersion != nextVersion { filtered = append(filtered, r) } diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index 26a546230c..1ec498827f 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -229,8 +229,8 @@ func (s *Snapshot) GetVersion(typeURL string) string { } // GetVersionMap will return the internal version map of the currently applied snapshot. -func (s *Snapshot) GetVersionMap() map[string]map[string]string { - return s.VersionMap +func (s *Snapshot) GetVersionMap(typeUrl string) map[string]string { + return s.VersionMap[typeUrl] } // ConstructVersionMap will construct a version map based on the current state of a snapshot From 0d721599062b3857cfa77366c674ce0e2aaec4a5 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 12 Aug 2021 13:42:24 -0600 Subject: [PATCH 4/8] refactoring Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 81 ++++++---------------------------------- pkg/cache/v3/linear.go | 24 +++++++++++- pkg/cache/v3/simple.go | 5 +-- pkg/cache/v3/snapshot.go | 30 +++++++++++++++ 4 files changed, 66 insertions(+), 74 deletions(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index a159bec163..9e20db640f 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -22,84 +22,27 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) -type resourceContainer interface { - GetResources(typeURL string) map[string]types.Resource +type resourceContainer struct { + resourceMap map[string]types.Resource - GetVersionMap(typeURL string) map[string]string + versionMap map[string]string - GetVersion(typeURL string) string + systemVersion string } -// This struct exist only because we want to reuse createDeltaResponse function from linear cache code -// so we create a wrapper, which adopts LinearCche to satisfy ResourceContainer interface -type cacheWrapper struct { - cache *LinearCache -} - -func (w *cacheWrapper) GetResources(typeURL string) map[string]types.Resource { - return w.cache.resources -} - -func (w *cacheWrapper) GetVersionMap(typeURL string) map[string]string { - return w.cache.versionMap -} - -func (w *cacheWrapper) GetVersion(typeURL string) string { - return w.cache.getVersion() -} - -// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func respondDeltaSnapshot(ctx context.Context, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot *Snapshot, log log.Logger) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, snapshot, log) - - // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty - // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { - if log != nil { - log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) - } - select { - case value <- resp: - return resp, nil - case <-ctx.Done(): - return resp, context.Canceled - } - } - return nil, nil -} - -func respondDeltaLinear(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, cache *LinearCache, log log.Logger) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, &cacheWrapper{cache}, log) - - // Only send a response if there were changes - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { - if log != nil { - log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) - } - value <- resp - return resp - } - return nil -} - -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, snapshot resourceContainer, log log.Logger) *RawDeltaResponse { - resources := snapshot.GetResources((req.TypeUrl)) - +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer, log log.Logger) *RawDeltaResponse { // variables to build our response with nextVersionMap := make(map[string]string) - filtered := make([]types.Resource, 0, len(resources)) + filtered := make([]types.Resource, 0, len(resources.resourceMap)) toRemove := make([]string, 0) // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - for name, r := range resources { + for name, r := range resources.resourceMap { // Since we've already precomputed the version hashes of the new snapshot, // we can just set it here to be used for comparison later - version := snapshot.GetVersionMap(req.TypeUrl)[name] + version := resources.versionMap[name] nextVersionMap[name] = version prevVersion, found := state.GetResourceVersions()[name] if !found || (prevVersion != nextVersionMap[name]) { @@ -109,8 +52,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St default: // Reply only with the requested resources for name, prevVersion := range state.GetResourceVersions() { - if r, ok := resources[name]; ok { - nextVersion := snapshot.GetVersionMap(req.TypeUrl)[name] + if r, ok := resources.resourceMap[name]; ok { + nextVersion := resources.versionMap[name] if prevVersion != nextVersion { filtered = append(filtered, r) } @@ -121,7 +64,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal regardless of the request type for name := range state.GetResourceVersions() { - if _, ok := resources[name]; !ok { + if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } @@ -131,7 +74,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St Resources: filtered, RemovedResources: toRemove, NextVersionMap: nextVersionMap, - SystemVersionInfo: snapshot.GetVersion(req.TypeUrl), + SystemVersionInfo: resources.systemVersion, Ctx: ctx, } } diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 56c8e26f3d..91ab209beb 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -157,13 +157,33 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { cache.updateVersionMap(modified) for id, watch := range cache.deltaWatches { - res := respondDeltaLinear(watch.Request, watch.Response, watch.StreamState, cache, cache.log) + res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) if res != nil { delete(cache.deltaWatches, id) } } } +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { + resources := &resourceContainer{ + resourceMap: cache.resources, + versionMap: cache.versionMap, + systemVersion: cache.getVersion(), + } + resp := createDeltaResponse(context.Background(), request, state, resources, cache.log) + + // Only send a response if there were changes + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { + if cache.log != nil { + cache.log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + } + value <- resp + return resp + } + return nil +} + // UpdateResource updates a resource in the collection. func (cache *LinearCache) UpdateResource(name string, res types.Resource) error { if res == nil { @@ -311,7 +331,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.mu.Lock() defer cache.mu.Unlock() - response := respondDeltaLinear(request, value, state, cache, cache.log) + response := cache.respondDelta(request, value, state) // if respondDelta returns nil this means that there is no change in any resource version from the previous snapshot // create a new watch accordingly diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index bfaacf030b..e1e960dc17 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -224,12 +224,11 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh // process our delta watches for id, watch := range info.deltaWatches { - res, err := respondDeltaSnapshot( + res, err := snapshot.respondDelta( ctx, watch.Request, watch.Response, watch.StreamState, - &snapshot, cache.log, ) if err != nil { @@ -431,7 +430,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update") } } - response, err := respondDeltaSnapshot(context.Background(), request, value, state, &snapshot, cache.log) + response, err := snapshot.respondDelta(context.Background(), request, value, state, cache.log) if err != nil { if cache.log != nil { cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err) diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index d23bf0ec6e..ee43b2b6cc 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -15,10 +15,13 @@ package cache import ( + "context" "errors" "fmt" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/log" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // Resources is a versioned group of resources. @@ -266,3 +269,30 @@ func (s *Snapshot) ConstructVersionMap() error { return nil } + +// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. +func (s *Snapshot) respondDelta(ctx context.Context, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, log log.Logger) (*RawDeltaResponse, error) { + resources := &resourceContainer{ + resourceMap: s.GetResources(request.TypeUrl), + versionMap: s.GetVersionMap(request.TypeUrl), + systemVersion: s.GetVersion(request.TypeUrl), + } + resp := createDeltaResponse(ctx, request, state, resources, log) + + // Only send a response if there were changes + // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // otherwise, envoy won't complete initialization + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if log != nil { + log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + } + select { + case value <- resp: + return resp, nil + case <-ctx.Done(): + return resp, context.Canceled + } + } + return nil, nil +} From 4d5c8f51076ccef4f5038c1cd3c2abe328c8f511 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 12 Aug 2021 14:44:11 -0600 Subject: [PATCH 5/8] Update pkg/cache/v3/delta.go Co-authored-by: Alec Holmes Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 9e20db640f..3a17801407 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -30,7 +30,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer, log log.Logger) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer) *RawDeltaResponse { // variables to build our response with nextVersionMap := make(map[string]string) filtered := make([]types.Resource, 0, len(resources.resourceMap)) From 66990d55cb7773bf85426620e29892aed27357fc Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Fri, 13 Aug 2021 07:22:49 -0600 Subject: [PATCH 6/8] make format Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 3a17801407..9e20db640f 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -30,7 +30,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer, log log.Logger) *RawDeltaResponse { // variables to build our response with nextVersionMap := make(map[string]string) filtered := make([]types.Resource, 0, len(resources.resourceMap)) From 01b2649ffa10bcf242d551b65f45221cf70f512f Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 16 Aug 2021 07:24:02 -0600 Subject: [PATCH 7/8] address review comments Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/delta.go | 10 ++++------ pkg/cache/v3/linear.go | 9 ++++----- pkg/cache/v3/simple.go | 32 +++++++++++++++++++++++++++++--- pkg/cache/v3/snapshot.go | 30 ------------------------------ 4 files changed, 37 insertions(+), 44 deletions(-) diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 9e20db640f..4dcb7f0861 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,19 +18,17 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) +// groups together resource-related arguments for the createDeltaResponse function type resourceContainer struct { - resourceMap map[string]types.Resource - - versionMap map[string]string - + resourceMap map[string]types.Resource + versionMap map[string]string systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources *resourceContainer, log log.Logger) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { // variables to build our response with nextVersionMap := make(map[string]string) filtered := make([]types.Resource, 0, len(resources.resourceMap)) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 91ab209beb..2b046d31a8 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -165,12 +165,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resources := &resourceContainer{ + resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), - } - resp := createDeltaResponse(context.Background(), request, state, resources, cache.log) + }) // Only send a response if there were changes if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { @@ -333,7 +332,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S response := cache.respondDelta(request, value, state) - // if respondDelta returns nil this means that there is no change in any resource version from the previous snapshot + // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly if response == nil { watchID := cache.nextDeltaWatchID() @@ -352,7 +351,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { for name, r := range cache.resources { - //skip recalculating hash for the resoces that weren't modified + // skip recalculating hash for the resoces that weren't modified if _, ok := modified[name]; !ok { continue } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e1e960dc17..0ae379d64b 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -224,12 +224,12 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh // process our delta watches for id, watch := range info.deltaWatches { - res, err := snapshot.respondDelta( + res, err := cache.respondDelta( ctx, + &snapshot, watch.Request, watch.Response, watch.StreamState, - cache.log, ) if err != nil { return err @@ -430,7 +430,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update") } } - response, err := snapshot.respondDelta(context.Background(), request, value, state, cache.log) + response, err := cache.respondDelta(context.Background(), &snapshot, request, value, state) if err != nil { if cache.log != nil { cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err) @@ -455,6 +455,32 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream return nil } +// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot *Snapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, state, resourceContainer{ + resourceMap: snapshot.GetResources(request.TypeUrl), + versionMap: snapshot.GetVersionMap(request.TypeUrl), + systemVersion: snapshot.GetVersion(request.TypeUrl), + }) + + // Only send a response if there were changes + // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // otherwise, envoy won't complete initialization + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if cache.log != nil { + cache.log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + } + select { + case value <- resp: + return resp, nil + case <-ctx.Done(): + return resp, context.Canceled + } + } + return nil, nil +} + func (cache *snapshotCache) nextDeltaWatchID() int64 { return atomic.AddInt64(&cache.deltaWatchCount, 1) } diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index ee43b2b6cc..d23bf0ec6e 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -15,13 +15,10 @@ package cache import ( - "context" "errors" "fmt" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // Resources is a versioned group of resources. @@ -269,30 +266,3 @@ func (s *Snapshot) ConstructVersionMap() error { return nil } - -// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (s *Snapshot) respondDelta(ctx context.Context, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, log log.Logger) (*RawDeltaResponse, error) { - resources := &resourceContainer{ - resourceMap: s.GetResources(request.TypeUrl), - versionMap: s.GetVersionMap(request.TypeUrl), - systemVersion: s.GetVersion(request.TypeUrl), - } - resp := createDeltaResponse(ctx, request, state, resources, log) - - // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty - // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { - if log != nil { - log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) - } - select { - case value <- resp: - return resp, nil - case <-ctx.Done(): - return resp, context.Canceled - } - } - return nil, nil -} From 2c9fcfbf35de434c5d16b3eefdf4968df86f008b Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 16 Aug 2021 10:48:36 -0600 Subject: [PATCH 8/8] all logging tags Signed-off-by: Sergey Matyukevich --- pkg/cache/v3/linear.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 2b046d31a8..870f683979 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -174,7 +174,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe // Only send a response if there were changes if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { - cache.log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", + cache.log.Debugf("[linear cache] node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) } value <- resp @@ -337,7 +337,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S if response == nil { watchID := cache.nextDeltaWatchID() if cache.log != nil { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v, system version %q", watchID, + cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, cache.typeURL, state.GetResourceVersions(), cache.getVersion()) }