Skip to content

Commit

Permalink
[Linear] Ensure a resource is only serialized/hashed at most once in …
Browse files Browse the repository at this point in the history
…linear cache
  • Loading branch information
valerian-roche committed Dec 17, 2024
1 parent 835ea7e commit 4100f52
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 216 deletions.
213 changes: 137 additions & 76 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ package cache

import (
"context"
"errors"
"fmt"
"sync/atomic"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
)

// Request is an alias for the discovery request type.
Expand Down Expand Up @@ -179,13 +177,13 @@ type RawResponse struct {
Version string

// Resources to be included in the response.
Resources []types.ResourceWithTTL
resources []cachedResource

// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
// including previously returned ones when using non-full state resources.
// It allows the cache to know what the client knows. The server will transparently forward this
// across requests, and the cache is responsible for its interpretation.
ReturnedResources map[string]string
returnedResources map[string]string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
Expand All @@ -197,7 +195,7 @@ type RawResponse struct {
Ctx context.Context

// marshaledResponse holds an atomic reference to the serialized discovery response.
marshaledResponse atomic.Value
marshaledResponse atomic.Pointer[discovery.DiscoveryResponse]
}

// RawDeltaResponse is a pre-serialized xDS response that utilizes the delta discovery request/response objects.
Expand All @@ -209,10 +207,10 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
Resources []types.ResourceWithTTL
resources []cachedResource

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
RemovedResources []string
removedResources []string

// NextVersionMap consists of updated version mappings after this response is applied.
NextVersionMap map[string]string
Expand All @@ -222,7 +220,7 @@ type RawDeltaResponse struct {
Ctx context.Context

// Marshaled Resources to be included in the response.
marshaledResponse atomic.Value
marshaledResponse atomic.Pointer[discovery.DeltaDiscoveryResponse]
}

var (
Expand Down Expand Up @@ -266,44 +264,79 @@ var (
_ DeltaResponse = &DeltaPassthroughResponse{}
)

func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse {
cachedRes := []cachedResource{}
for _, res := range resources {
newRes := newCachedResource(GetResourceName(res.Resource), res.Resource, version)
newRes.ttl = res.TTL
cachedRes = append(cachedRes, *newRes)
}
return &RawResponse{
Request: req,
Version: version,
resources: cachedRes,
}
}

func NewTestRawDeltaResponse(req *discovery.DeltaDiscoveryRequest, version string, resources []types.ResourceWithTTL, removedResources []string, nextVersionMap map[string]string) *RawDeltaResponse {
cachedRes := []cachedResource{}
for _, res := range resources {
name := GetResourceName(res.Resource)
newRes := newCachedResource(name, res.Resource, nextVersionMap[name])
newRes.ttl = res.TTL
cachedRes = append(cachedRes, *newRes)
}
return &RawDeltaResponse{
DeltaRequest: req,
SystemVersionInfo: version,
resources: cachedRes,
removedResources: removedResources,
NextVersionMap: nextVersionMap,
}
}

// GetDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
// This is necessary because the marshaled response does not change across the calls.
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
marshaledResponse := r.marshaledResponse.Load()

if marshaledResponse == nil {
marshaledResources := make([]*anypb.Any, len(r.Resources))

for i, resource := range r.Resources {
maybeTtldResource, resourceType, err := r.maybeCreateTTLResource(resource)
if err != nil {
return nil, err
}
marshaledResource, err := MarshalResource(maybeTtldResource)
if err != nil {
return nil, err
}
marshaledResources[i] = &anypb.Any{
TypeUrl: resourceType,
Value: marshaledResource,
}
}
if marshaledResponse != nil {
return marshaledResponse, nil
}

marshaledResponse = &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
TypeUrl: r.GetRequest().GetTypeUrl(),
marshaledResources := make([]*anypb.Any, len(r.resources))

for i, resource := range r.resources {
marshaledResource, err := r.marshalTTLResource(resource)
if err != nil {
return nil, fmt.Errorf("processing %s: %w", GetResourceName(resource.resource), err)
}
marshaledResources[i] = marshaledResource
}

r.marshaledResponse.Store(marshaledResponse)
marshaledResponse = &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
TypeUrl: r.GetRequest().GetTypeUrl(),
}

return marshaledResponse.(*discovery.DiscoveryResponse), nil
r.marshaledResponse.Store(marshaledResponse)

return marshaledResponse, nil
}

func (r *RawResponse) GetReturnedResources() map[string]string {
return r.ReturnedResources
return r.returnedResources
}

// GetRawResources is used internally within go-control-plane. Its interface and content may change
func (r *RawResponse) GetRawResources() []types.ResourceWithTTL {
resources := make([]types.ResourceWithTTL, 0, len(r.resources))
for _, res := range r.resources {
resources = append(resources, types.ResourceWithTTL{Resource: res.resource, TTL: res.ttl})
}
return resources
}

// GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
Expand All @@ -312,39 +345,49 @@ func (r *RawResponse) GetReturnedResources() map[string]string {
func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
marshaledResponse := r.marshaledResponse.Load()

if marshaledResponse == nil {
marshaledResources := make([]*discovery.Resource, len(r.Resources))

for i, resource := range r.Resources {
name := GetResourceName(resource.Resource)
marshaledResource, err := MarshalResource(resource.Resource)
if err != nil {
return nil, err
}
version := HashResource(marshaledResource)
if version == "" {
return nil, errors.New("failed to create a resource hash")
}
marshaledResources[i] = &discovery.Resource{
Name: name,
Resource: &anypb.Any{
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
Value: marshaledResource,
},
Version: version,
}
}
if marshaledResponse != nil {
return marshaledResponse, nil
}

marshaledResponse = &discovery.DeltaDiscoveryResponse{
Resources: marshaledResources,
RemovedResources: r.RemovedResources,
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
SystemVersionInfo: r.SystemVersionInfo,
marshaledResources := make([]*discovery.Resource, len(r.resources))

for i, resource := range r.resources {
marshaledResource, err := resource.getMarshaledResource()
if err != nil {
return nil, fmt.Errorf("processing %s: %w", resource.name, err)
}
version, err := resource.getStableVersion()
if err != nil {
return nil, fmt.Errorf("processing version of %s: %w", resource.name, err)
}
marshaledResources[i] = &discovery.Resource{
Name: resource.name,
Resource: &anypb.Any{
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
Value: marshaledResource,
},
Version: version,
}
r.marshaledResponse.Store(marshaledResponse)
}

return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil
marshaledResponse = &discovery.DeltaDiscoveryResponse{
Resources: marshaledResources,
RemovedResources: r.removedResources,
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
SystemVersionInfo: r.SystemVersionInfo,
}
r.marshaledResponse.Store(marshaledResponse)

return marshaledResponse, nil
}

// GetRawResources is used internally within go-control-plane. Its interface and content may change
func (r *RawDeltaResponse) GetRawResources() []types.ResourceWithTTL {
resources := make([]types.ResourceWithTTL, 0, len(r.resources))
for _, res := range r.resources {
resources = append(resources, types.ResourceWithTTL{Resource: res.resource, TTL: res.ttl})
}
return resources
}

// GetRequest returns the original Discovery Request.
Expand Down Expand Up @@ -400,26 +443,44 @@ func (r *RawDeltaResponse) GetContext() context.Context {

var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))

func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (types.Resource, string, error) {
if resource.TTL != nil {
wrappedResource := &discovery.Resource{
Name: GetResourceName(resource.Resource),
Ttl: durationpb.New(*resource.TTL),
func (r *RawResponse) marshalTTLResource(resource cachedResource) (*anypb.Any, error) {
if resource.ttl == nil {
marshaled, err := resource.getMarshaledResource()
if err != nil {
return nil, fmt.Errorf("marshaling: %w", err)
}
return &anypb.Any{
TypeUrl: r.GetRequest().GetTypeUrl(),
Value: marshaled,
}, nil
}

if !r.Heartbeat {
rsrc, err := anypb.New(resource.Resource)
if err != nil {
return nil, "", err
}
rsrc.TypeUrl = r.GetRequest().GetTypeUrl()
wrappedResource.Resource = rsrc
wrappedResource := &discovery.Resource{
Name: GetResourceName(resource.resource),
Ttl: durationpb.New(*resource.ttl),
}

if !r.Heartbeat {
marshaled, err := resource.getMarshaledResource()
if err != nil {
return nil, fmt.Errorf("marshaling: %w", err)
}
rsrc := new(anypb.Any)
rsrc.TypeUrl = r.GetRequest().GetTypeUrl()
rsrc.Value = marshaled

wrappedResource.Resource = rsrc
}

return wrappedResource, deltaResourceTypeURL, nil
marshaled, err := MarshalResource(wrappedResource)
if err != nil {
return nil, fmt.Errorf("marshaling discovery resource: %w", err)
}

return resource.Resource, r.GetRequest().GetTypeUrl(), nil
return &anypb.Any{
TypeUrl: deltaResourceTypeURL,
Value: marshaled,
}, nil
}

// GetDiscoveryResponse returns the final passthrough Discovery Response.
Expand Down
17 changes: 8 additions & 9 deletions pkg/cache/v3/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cache_test
package cache

import (
"testing"
Expand All @@ -12,7 +12,6 @@ import (
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)

Expand All @@ -21,11 +20,11 @@ const (
)

func TestResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.ResourceWithTTL{{Resource: &route.RouteConfiguration{Name: resourceName}}}
resp := cache.RawResponse{
routes := []cachedResource{*newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
resp := RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Resources: routes,
resources: routes,
}

discoveryResponse, err := resp.GetDiscoveryResponse()
Expand All @@ -52,7 +51,7 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
Resources: []*anypb.Any{rsrc},
VersionInfo: "v",
}
resp := cache.PassthroughResponse{
resp := PassthroughResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
DiscoveryResponse: dr,
}
Expand All @@ -70,11 +69,11 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
}

func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.ResourceWithTTL{{Resource: &route.RouteConfiguration{Name: resourceName}}}
resp := cache.RawResponse{
routes := []cachedResource{*newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
resp := RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Resources: routes,
resources: routes,
Heartbeat: true,
}

Expand Down
Loading

0 comments on commit 4100f52

Please sign in to comment.