Skip to content

Commit

Permalink
[Sotw][Linear cache] Ensure watches are properly considering subscrip…
Browse files Browse the repository at this point in the history
…tion changes to not miss new resources (#10)

* Fix linear tests to properly track subscription and adapt to the new version handling. Ensure we always reply with all resources when version is not set or the version prefix does not match our cache

* Add tests for sotw linear watches

* Fix comments and linting

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>

---------

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Feb 5, 2024
1 parent a7a87e8 commit 835ef87
Show file tree
Hide file tree
Showing 9 changed files with 867 additions and 317 deletions.
25 changes: 25 additions & 0 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type Response interface {
// Get the version in the Response.
GetVersion() (string, error)

// GetReturnedResources returns the map of resources and their versions returned in the subscription.
// It may include more resources than directly set in the response to consider the full state of the client.
// The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription.
GetReturnedResources() map[string]string

// Get the context provided during response creation.
GetContext() context.Context
}
Expand Down Expand Up @@ -156,6 +161,12 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
// including previously returned ones when using non-full state resources.
// It allows the cache to know what the client knows. The server will transparently forward this
// across requests, and the cache is responsible for its interpretation.
ReturnedResources map[string]string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down Expand Up @@ -208,6 +219,12 @@ type PassthroughResponse struct {
DiscoveryResponse *discovery.DiscoveryResponse

ctx context.Context

// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
// including previously returned ones when using non-full state resources.
// It allows the cache to know what the client knows. The server will transparently forward this
// across requests, and the cache is responsible for its interpretation.
ReturnedResources map[string]string
}

// DeltaPassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations.
Expand Down Expand Up @@ -265,6 +282,10 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
return marshaledResponse.(*discovery.DiscoveryResponse), nil
}

func (r *RawResponse) GetReturnedResources() map[string]string {
return r.ReturnedResources
}

// GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
// We can do this because the marshaled response does not change across the calls.
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
Expand Down Expand Up @@ -368,6 +389,10 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon
return r.DiscoveryResponse, nil
}

func (r *PassthroughResponse) GetReturnedResources() map[string]string {
return r.ReturnedResources
}

// GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.
func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
return r.DeltaDiscoveryResponse, nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/log"
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
Expand All @@ -30,7 +31,7 @@ func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
}

func TestSnapshotCacheDeltaWatch(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
watches := make(map[string]chan cache.DeltaResponse)
subscriptions := make(map[string]stream.Subscription)

Expand Down Expand Up @@ -118,7 +119,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
}

func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
watches := make(map[string]chan cache.DeltaResponse)
subscriptions := make(map[string]*stream.Subscription)

Expand Down Expand Up @@ -198,7 +199,7 @@ func TestDeltaRemoveResources(t *testing.T) {
}

func TestConcurrentSetDeltaWatch(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
for i := 0; i < 50; i++ {
version := fmt.Sprintf("v%d", i)
func(i int) {
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
type testKey struct{}

func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t))

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
}

func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t))
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Expand Down
Loading

0 comments on commit 835ef87

Please sign in to comment.