Skip to content

Commit

Permalink
Use mutex to make cachedResource lazy accessors thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
valerian-roche committed Dec 17, 2024
1 parent 4100f52 commit 70ffc5b
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 26 deletions.
14 changes: 7 additions & 7 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ type RawResponse struct {
Version string

// Resources to be included in the response.
resources []cachedResource
resources []*cachedResource

// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
// including previously returned ones when using non-full state resources.
Expand Down Expand Up @@ -207,7 +207,7 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
resources []cachedResource
resources []*cachedResource

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
removedResources []string
Expand Down Expand Up @@ -265,11 +265,11 @@ var (
)

func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse {
cachedRes := []cachedResource{}
cachedRes := []*cachedResource{}
for _, res := range resources {
newRes := newCachedResource(GetResourceName(res.Resource), res.Resource, version)
newRes.ttl = res.TTL
cachedRes = append(cachedRes, *newRes)
cachedRes = append(cachedRes, newRes)
}
return &RawResponse{
Request: req,
Expand All @@ -279,12 +279,12 @@ func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resourc
}

func NewTestRawDeltaResponse(req *discovery.DeltaDiscoveryRequest, version string, resources []types.ResourceWithTTL, removedResources []string, nextVersionMap map[string]string) *RawDeltaResponse {
cachedRes := []cachedResource{}
cachedRes := []*cachedResource{}
for _, res := range resources {
name := GetResourceName(res.Resource)
newRes := newCachedResource(name, res.Resource, nextVersionMap[name])
newRes.ttl = res.TTL
cachedRes = append(cachedRes, *newRes)
cachedRes = append(cachedRes, newRes)
}
return &RawDeltaResponse{
DeltaRequest: req,
Expand Down Expand Up @@ -443,7 +443,7 @@ func (r *RawDeltaResponse) GetContext() context.Context {

var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))

func (r *RawResponse) marshalTTLResource(resource cachedResource) (*anypb.Any, error) {
func (r *RawResponse) marshalTTLResource(resource *cachedResource) (*anypb.Any, error) {
if resource.ttl == nil {
marshaled, err := resource.getMarshaledResource()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
)

func TestResponseGetDiscoveryResponse(t *testing.T) {
routes := []cachedResource{*newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
routes := []*cachedResource{newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
resp := RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
}

func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) {
routes := []cachedResource{*newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
routes := []*cachedResource{newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
resp := RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type resourceContainer struct {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []cachedResource
var filtered []*cachedResource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]cachedResource, 0, len(resources.resourceMap))
filtered = make([]*cachedResource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
Expand All @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
nextVersionMap[name] = version
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, *newCachedResource(name, r, version))
filtered = append(filtered, newCachedResource(name, r, version))
}
}

Expand All @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, *newCachedResource(name, r, nextVersion))
filtered = append(filtered, newCachedResource(name, r, nextVersion))
}
nextVersionMap[name] = nextVersion
} else if found {
Expand Down
28 changes: 21 additions & 7 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type cachedResource struct {
// marshaledResource contains the marshaled version of the resource.
// It is lazy initialized and should be accessed through getMarshaledResource
marshaledResource []byte

// mu is the mutex used to lazy instantiate the marshaled resource and stable version.
mu sync.Mutex
}

func newCachedResource(name string, res types.Resource, cacheVersion string) *cachedResource {
Expand All @@ -65,8 +68,14 @@ func newCachedResourceWithTTL(name string, res types.ResourceWithTTL, cacheVersi
}

// getMarshaledResource lazily marshals the resource and return the bytes.
// It is not safe to call it concurrently.
func (c *cachedResource) getMarshaledResource() ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()

return c.marshalResourceLocked()
}

func (c *cachedResource) marshalResourceLocked() ([]byte, error) {
if c.marshaledResource != nil {
return c.marshaledResource, nil
}
Expand All @@ -80,13 +89,19 @@ func (c *cachedResource) getMarshaledResource() ([]byte, error) {
}

// getStableVersion lazily hashes the resource and return the stable hash used to track version changes.
// It is not safe to call it concurrently.
func (c *cachedResource) getStableVersion() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()

return c.computeStableVersionLocked()
}

func (c *cachedResource) computeStableVersionLocked() (string, error) {
if c.stableVersion != "" {
return c.stableVersion, nil
}

marshaledResource, err := c.getMarshaledResource()
marshaledResource, err := c.marshalResourceLocked()
if err != nil {
return "", err
}
Expand All @@ -95,7 +110,6 @@ func (c *cachedResource) getStableVersion() (string, error) {
}

// getVersion returns the requested version.
// It is not safe to call it concurrently.
func (c *cachedResource) getVersion(useStableVersion bool) (string, error) {
if !useStableVersion {
return c.cacheVersion, nil
Expand All @@ -118,7 +132,7 @@ type watch interface {
getSubscription() Subscription
// buildResponse computes the actual WatchResponse object to be sent on the watch.
// cachedResource instances are passed by copy as their lazy accessors are not thread-safe.
buildResponse(updatedResources []cachedResource, removedResources []string, returnedVersions map[string]string, version string) WatchResponse
buildResponse(updatedResources []*cachedResource, removedResources []string, returnedVersions map[string]string, version string) WatchResponse
// sendResponse sends the response for the watch.
// It must be called at most once.
sendResponse(resp WatchResponse)
Expand Down Expand Up @@ -370,10 +384,10 @@ func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (W
returnedVersions[resourceName] = version
}

resources := make([]cachedResource, 0, len(resourcesToReturn))
resources := make([]*cachedResource, 0, len(resourcesToReturn))
for _, resourceName := range resourcesToReturn {
cachedResource := cache.resources[resourceName]
resources = append(resources, *cachedResource)
resources = append(resources, cachedResource)
version, err := cachedResource.getVersion(watch.useStableVersion())
if err != nil {
return nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func GetResourceNames(resources []types.ResourceWithTTL) []string {
}

// getCachedResourceNames returns the resource names for a list of valid xDS response types.
func getCachedResourceNames(resources []cachedResource) []string {
func getCachedResourceNames(resources []*cachedResource) []string {
out := make([]string, len(resources))
for i, r := range resources {
out[i] = GetResourceName(r.resource)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (cache *snapshotCache) respond(ctx context.Context, watch ResponseWatch, re
}

func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response {
filtered := make([]cachedResource, 0, len(resources))
filtered := make([]*cachedResource, 0, len(resources))
returnedResources := make(map[string]string, len(resources))

// Reply only with the requested resources. Envoy may ask each resource
Expand All @@ -512,13 +512,13 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
set := nameSet(request.GetResourceNames())
for name, resource := range resources {
if set[name] {
filtered = append(filtered, *newCachedResourceWithTTL(name, resource, version))
filtered = append(filtered, newCachedResourceWithTTL(name, resource, version))
returnedResources[name] = version
}
}
} else {
for name, resource := range resources {
filtered = append(filtered, *newCachedResourceWithTTL(name, resource, version))
filtered = append(filtered, newCachedResourceWithTTL(name, resource, version))
returnedResources[name] = version
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (w ResponseWatch) isDelta() bool {
return false
}

func (w ResponseWatch) buildResponse(updatedResources []cachedResource, _ []string, returnedVersions map[string]string, version string) WatchResponse {
func (w ResponseWatch) buildResponse(updatedResources []*cachedResource, _ []string, returnedVersions map[string]string, version string) WatchResponse {
return &RawResponse{
Request: w.Request,
resources: updatedResources,
Expand Down Expand Up @@ -189,7 +189,7 @@ func (w DeltaResponseWatch) getSubscription() Subscription {
return w.subscription
}

func (w DeltaResponseWatch) buildResponse(updatedResources []cachedResource, removedResources []string, returnedVersions map[string]string, version string) WatchResponse {
func (w DeltaResponseWatch) buildResponse(updatedResources []*cachedResource, removedResources []string, returnedVersions map[string]string, version string) WatchResponse {
return &RawDeltaResponse{
DeltaRequest: w.Request,
resources: updatedResources,
Expand Down

0 comments on commit 70ffc5b

Please sign in to comment.