Skip to content

Commit

Permalink
Use sync.Once instead of mutex and rename 'stable' version to 'resour…
Browse files Browse the repository at this point in the history
…ce' version
  • Loading branch information
valerian-roche committed Dec 23, 2024
1 parent 56b52f9 commit 9e682c8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
if err != nil {
return nil, fmt.Errorf("processing %s: %w", resource.name, err)
}
version, err := resource.getStableVersion()
version, err := resource.getResourceVersion()
if err != nil {
return nil, fmt.Errorf("processing version of %s: %w", resource.name, err)
}
Expand Down
123 changes: 44 additions & 79 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ type cachedResource struct {

// cacheVersion is the version of the cache at the time of last update, used in sotw.
cacheVersion string

// stableVersion is the version of the resource itself (a hash of its content after deterministic marshaling).
// It is lazy initialized and should be accessed through getStableVersion.
stableVersion string

// marshaledResource contains the marshaled version of the resource.
// It is lazy initialized and should be accessed through getMarshaledResource
marshaledResource []byte

// mu is the mutex used to lazy compute the marshaled resource and stable version.
mu sync.Mutex
}

func newCachedResource(name string, res types.Resource, cacheVersion string) *cachedResource {
Expand All @@ -69,62 +58,38 @@ func newCachedResourceWithTTL(name string, res types.ResourceWithTTL, cacheVersi

// getMarshaledResource lazily marshals the resource and returns the bytes.
func (c *cachedResource) getMarshaledResource() ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()

return c.marshalResourceLocked()
}

func (c *cachedResource) marshalResourceLocked() ([]byte, error) {
if c.marshaledResource != nil {
return c.marshaledResource, nil
}

marshaledResource, err := MarshalResource(c.resource)
if err != nil {
return nil, err
}
c.marshaledResource = marshaledResource
return c.marshaledResource, nil
return sync.OnceValues(func() ([]byte, error) {
return MarshalResource(c.resource)
})()
}

// getStableVersion lazily hashes the resource and returns the stable hash used to track version changes.
func (c *cachedResource) getStableVersion() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()

return c.computeStableVersionLocked()
}

func (c *cachedResource) computeStableVersionLocked() (string, error) {
if c.stableVersion != "" {
return c.stableVersion, nil
}

marshaledResource, err := c.marshalResourceLocked()
if err != nil {
return "", err
}
c.stableVersion = HashResource(marshaledResource)
return c.stableVersion, nil
// getResourceVersion lazily hashes the resource and returns the stable hash used to track version changes.
func (c *cachedResource) getResourceVersion() (string, error) {
return sync.OnceValues(func() (string, error) {
marshaled, err := c.getMarshaledResource()
if err != nil {
return "", fmt.Errorf("marshaling resource: %w", err)
}
return HashResource(marshaled), nil
})()
}

// getVersion returns the requested version.
func (c *cachedResource) getVersion(useStableVersion bool) (string, error) {
if !useStableVersion {
func (c *cachedResource) getVersion(useResourceVersion bool) (string, error) {
if !useResourceVersion {
return c.cacheVersion, nil
}

return c.getStableVersion()
return c.getResourceVersion()
}

type watch interface {
// isDelta indicates whether the watch is a delta one.
// It should not be used to take functional decisions, but is still currently used pending final changes.
// It can be used to generate statistics.
isDelta() bool
// useStableVersion indicates whether versions returned in the response are built using stable versions instead of cache update versions.
useStableVersion() bool
// useResourceVersion indicates whether versions returned in the response are built using resource versions instead of cache update versions.
useResourceVersion() bool
// sendFullStateResponses requires that all resources matching the request, with no regards to which ones actually updated, must be provided in the response.
// As a consequence, sending a response with no resources has a functional meaning of no matching resources available.
sendFullStateResponses() bool
Expand Down Expand Up @@ -173,11 +138,11 @@ type LinearCache struct {
// cache instances and avoid issues of version reuse.
versionPrefix string

// useStableVersionsInSotw switches to a new version model for sotw watches.
// When activated, versions are stored in subscriptions using stable versions, and the response version
// is an hash of the returned versions to allow watch resumptions when reconnecting to the cache with a
// useResourceVersionsInSotw switches to a new version model for sotw watches.
// When activated, versions are stored in subscriptions using resource versions, and the response version
// is a hash of the returned versions to allow watch resumptions when reconnecting to the cache with a
// new subscription.
useStableVersionsInSotw bool
useResourceVersionsInSotw bool

watchCount int

Expand All @@ -194,7 +159,7 @@ type LinearCacheOption func(*LinearCache)
// WithVersionPrefix sets a version prefix of the form "prefixN" in the version info.
// Version prefix can be used to distinguish replicated instances of the cache, in case
// a client re-connects to another instance.
// Deprecated: use WithSotwStableVersions instead to avoid issues when reconnecting to other instances
// Deprecated: use WithSotwResourceVersions instead to avoid issues when reconnecting to other instances
// while avoiding resending resources if unchanged.
func WithVersionPrefix(prefix string) LinearCacheOption {
return func(cache *LinearCache) {
Expand All @@ -217,15 +182,15 @@ func WithLogger(log log.Logger) LinearCacheOption {
}
}

// WithSotwStableVersions changes the versions returned in sotw to encode the list of resources known
// WithSotwResourceVersions changes the versions returned in sotw to encode the list of resources known
// in the subscription.
// The use of stable versions for sotw also deduplicates updates to clients if the cache updates are
// The use of resource versions for sotw also deduplicates updates to clients if the cache updates are
// not changing the content of the resource.
// When used, the use of WithVersionPrefix is no longer needed to manage reconnection to other instances
// and should not be used.
func WithSotwStableVersions() LinearCacheOption {
func WithSotwResourceVersions() LinearCacheOption {
return func(cache *LinearCache) {
cache.useStableVersionsInSotw = true
cache.useResourceVersionsInSotw = true
}
}

Expand Down Expand Up @@ -253,10 +218,10 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
// computeResourceChange compares the subscription known resources and the cache current state to compute the list of resources
// which have changed and should be notified to the user.
//
// The useStableVersion argument defines what version type to use for resources:
// The useResourceVersion argument defines what version type to use for resources:
// - if set to false versions are based on when resources were updated in the cache.
// - if set to true versions are a stable property of the resource, with no regard to when it was added to the cache.
func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersion bool) (updated, removed []string, err error) {
func (cache *LinearCache) computeResourceChange(sub Subscription, useResourceVersion bool) (updated, removed []string, err error) {
var changedResources []string
var removedResources []string

Expand All @@ -268,7 +233,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersi
// This resource is not yet known by the client (new resource added in the cache or newly subscribed).
changedResources = append(changedResources, resourceName)
} else {
resourceVersion, err := resource.getVersion(useStableVersion)
resourceVersion, err := resource.getVersion(useResourceVersion)
if err != nil {
return nil, nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err)
}
Expand Down Expand Up @@ -304,7 +269,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersi
// This resource is not yet known by the client (new resource added in the cache or newly subscribed).
changedResources = append(changedResources, resourceName)
} else {
resourceVersion, err := res.getVersion(useStableVersion)
resourceVersion, err := res.getVersion(useResourceVersion)
if err != nil {
return nil, nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err)
}
Expand All @@ -329,7 +294,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersi

func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (WatchResponse, error) {
sub := watch.getSubscription()
changedResources, removedResources, err := cache.computeResourceChange(sub, watch.useStableVersion())
changedResources, removedResources, err := cache.computeResourceChange(sub, watch.useResourceVersion())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -388,7 +353,7 @@ func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (W
for _, resourceName := range resourcesToReturn {
cachedResource := cache.resources[resourceName]
resources = append(resources, cachedResource)
version, err := cachedResource.getVersion(watch.useStableVersion())
version, err := cachedResource.getVersion(watch.useResourceVersion())
if err != nil {
return nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err)
}
Expand All @@ -403,8 +368,8 @@ func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (W

// TODO(valerian-roche): remove this leak of delta/sotw behavior here.
responseVersion := cache.getVersion()
if watch.useStableVersion() && !watch.isDelta() {
responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions)
if watch.useResourceVersion() && !watch.isDelta() {
responseVersion = cache.versionPrefix + computeSotwResourceVersion(returnedVersions)
}

return watch.buildResponse(resources, removedResources, returnedVersions, responseVersion), nil
Expand Down Expand Up @@ -516,8 +481,8 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
}

// We assume all resources passed to SetResources are changed.
// In delta and if stable versions are used for sotw, identical resources will not trigger watches.
// In sotw without stable versions used, all those resources will trigger watches, even if identical.
// In delta and if resource versions are used for sotw, identical resources will not trigger watches.
// In sotw without resource versions used, all those resources will trigger watches, even if identical.
for name, resource := range resources {
cache.resources[name] = newCachedResource(name, resource, version)
modified = append(modified, name)
Expand Down Expand Up @@ -568,13 +533,13 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
// We could optimize the reconnection case here if:
// - we take the assumption that clients will not start requesting wildcard while providing a version. We could then ignore requests providing the resources.
// - we use the version as some form of hash of resources known, and we can then consider it as a way to correctly verify whether all resources are unchanged.
// When using the `WithSotwStableVersions` option, this optimization is activated and avoids resending all the dataset on wildcard watch resumption if no change has occurred.
// When using the `WithSotwResourceVersions` option, this optimization is activated and avoids resending all the dataset on wildcard watch resumption if no change has occurred.
watch := ResponseWatch{
Request: request,
Response: value,
subscription: sub,
enableStableVersion: cache.useStableVersionsInSotw,
fullStateResponses: ResourceRequiresFullStateInSotw(cache.typeURL),
Request: request,
Response: value,
subscription: sub,
enableResourceVersion: cache.useResourceVersionsInSotw,
fullStateResponses: ResourceRequiresFullStateInSotw(cache.typeURL),
}

cache.mu.Lock()
Expand All @@ -590,12 +555,12 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
// - is the first
// - is wildcard
// - provides a non-empty version, matching the version prefix
// and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response.
// and the cache uses resource versions, if the generated versions are the same as the previous one, we do not return the response.
// This avoids resending all data if the new subscription is just a resumption of the previous one.
// This optimization is only done on wildcard as we cannot track if a subscription is "new" at this stage and needs to be returned.
// In the context of wildcard it could be incorrect if the subscription is newly wildcard, and we already returned all objects,
// but as of Q1-2024 there are no known usecases of a subscription becoming wildcard (in envoy of xds-grpc).
if cache.useStableVersionsInSotw && sub.IsWildcard() && request.GetResponseNonce() == "" && !replyEvenIfEmpty {
if cache.useResourceVersionsInSotw && sub.IsWildcard() && request.GetResponseNonce() == "" && !replyEvenIfEmpty {
if request.GetVersionInfo() != response.GetResponseVersion() {
// The response has a different returned version map as the request
shouldReply = true
Expand Down
33 changes: 2 additions & 31 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,6 @@ func checkTotalWatchCount(t *testing.T, c *LinearCache, count int) {
}
}

func checkStableVersionsAreNotComputed(t *testing.T, c *LinearCache, resources ...string) {
t.Helper()
for _, res := range resources {
assert.Empty(t, c.resources[res].stableVersion, "stable version not set on resource %s", res)
}
}

func checkStableVersionsAreComputed(t *testing.T, c *LinearCache, resources ...string) {
t.Helper()
for _, res := range resources {
assert.NotEmpty(t, c.resources[res].stableVersion, "stable version not set on resource %s", res)
}
}

func mustBlock(t *testing.T, w <-chan Response) {
t.Helper()
select {
Expand Down Expand Up @@ -775,16 +761,13 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
hashB := hashResource(t, b)
err = c.UpdateResource("b", b)
require.NoError(t, err)
// There is currently no delta watch
checkStableVersionsAreNotComputed(t, c, "a", "b")

req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}}
w := make(chan DeltaResponse, 1)
_, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w)
require.NoError(t, err)
checkTotalWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
checkStableVersionsAreComputed(t, c, "a", "b")

req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}}
w = make(chan DeltaResponse, 1)
Expand All @@ -800,7 +783,6 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
err = c.UpdateResource("a", a)
require.NoError(t, err)
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil)
checkStableVersionsAreComputed(t, c, "a")
}

func TestLinearDeltaResourceDelete(t *testing.T) {
Expand Down Expand Up @@ -857,7 +839,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
require.NoError(t, err)
resp := <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkStableVersionsAreComputed(t, c, "a", "b")
assert.Equal(t, 2, c.NumResources())

sub.SetReturnedResources(resp.GetNextVersionMap())
Expand All @@ -880,7 +861,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
require.NoError(t, err)
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkStableVersionsAreComputed(t, c, "a", "b")
assert.Equal(t, 2, c.NumResources())
sub.SetReturnedResources(resp.GetNextVersionMap())

Expand All @@ -900,9 +880,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
assert.NotContains(t, c.resources, "b", "resource with name b was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
checkStableVersionsAreComputed(t, c, "a")
// d is not watched currently
checkStableVersionsAreNotComputed(t, c, "d")
assert.Equal(t, 2, c.NumResources())
sub.SetReturnedResources(resp.GetNextVersionMap())

Expand All @@ -919,7 +897,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
assert.NotContains(t, c.resources, "d", "resource with name d was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
checkStableVersionsAreComputed(t, c, "b")
assert.Equal(t, 2, c.NumResources())
sub.SetReturnedResources(resp.GetNextVersionMap())

Expand All @@ -937,7 +914,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
require.NoError(t, err)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil)
// d is now watched and should be returned
checkStableVersionsAreComputed(t, c, "b", "d")
assert.Equal(t, 3, c.NumResources())

// Wildcard update/delete
Expand Down Expand Up @@ -975,8 +951,6 @@ func TestLinearMixedWatches(t *testing.T) {
_, err = c.CreateWatch(sotwReq, sotwSub, w)
require.NoError(t, err)
mustBlock(t, w)
// Only sotw watches, should not have triggered stable resource computation
checkStableVersionsAreNotComputed(t, c, "a", "b")
checkTotalWatchCount(t, c, 1)
checkWatchCount(t, c, "a", 1)
checkWatchCount(t, c, "b", 1)
Expand All @@ -990,15 +964,13 @@ func TestLinearMixedWatches(t *testing.T) {
// This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation
resp := verifyResponseResources(t, w, resource.EndpointType, c.getVersion(), "a")
updateFromSotwResponse(resp, &sotwSub, sotwReq)
checkStableVersionsAreNotComputed(t, c, "a", "b")
checkTotalWatchCount(t, c, 0)
checkWatchCount(t, c, "a", 0)
checkWatchCount(t, c, "b", 0)

_, err = c.CreateWatch(sotwReq, sotwSub, w)
require.NoError(t, err)
mustBlock(t, w)
checkStableVersionsAreNotComputed(t, c, "a", "b")
checkTotalWatchCount(t, c, 1)

deltaReq := &DeltaRequest{TypeUrl: resource.EndpointType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}}
Expand All @@ -1009,7 +981,6 @@ func TestLinearMixedWatches(t *testing.T) {
require.NoError(t, err)
mustBlockDelta(t, wd)
checkTotalWatchCount(t, c, 2)
checkStableVersionsAreComputed(t, c, "a", "b")
checkWatchCount(t, c, "a", 2)
checkWatchCount(t, c, "b", 2)

Expand Down Expand Up @@ -1485,7 +1456,7 @@ func TestLinearSotwVersion(t *testing.T) {
"b": &endpoint.ClusterLoadAssignment{ClusterName: "b"},
"c": &endpoint.ClusterLoadAssignment{ClusterName: "c"},
},
), WithSotwStableVersions())
), WithSotwResourceVersions())

buildRequest := func(res []string, version string) *discovery.DiscoveryRequest {
return &discovery.DiscoveryRequest{
Expand Down Expand Up @@ -1542,7 +1513,7 @@ func TestLinearSotwVersion(t *testing.T) {
"b": &endpoint.ClusterLoadAssignment{ClusterName: "b"},
"c": &endpoint.ClusterLoadAssignment{ClusterName: "c"},
},
), WithSotwStableVersions(), WithVersionPrefix("test-prefix-"))
), WithSotwResourceVersions(), WithVersionPrefix("test-prefix-"))

t.Run("watch opened with the same last version missing prefix", func(t *testing.T) {
req := buildRequest([]string{}, lastVersion)
Expand Down
Loading

0 comments on commit 9e682c8

Please sign in to comment.