Skip to content

Commit

Permalink
fix(xds): don't read metadata in ProxyBuilders (#5414)
Browse files Browse the repository at this point in the history
We were using a `DataplaneMetadataTracker` in all proxy builders.
This was causing issues as the proxy may disconnect while reconciliation is in progress.

We now set the proxy metadata in the DataplaneWatchdog so that it is read only once and nil-checked.
This protects us against the race described above.

Signed-off-by: Charly Molter <charly.molter@konghq.com>
(cherry picked from commit 39ba902)

# Conflicts:
#	pkg/api-server/inspect_endpoints.go
#	pkg/xds/sync/components.go
#	pkg/xds/sync/egress_proxy_builder.go
#	pkg/xds/sync/ingress_proxy_builder.go
  • Loading branch information
lahabana authored and mergify[bot] committed Dec 2, 2022
1 parent d004fbc commit 43289c1
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 64 deletions.
9 changes: 5 additions & 4 deletions pkg/api-server/inspect_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import (
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/envoy"
<<<<<<< HEAD
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
=======
"github.com/kumahq/kuma/pkg/xds/envoy/tags"
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
"github.com/kumahq/kuma/pkg/xds/sync"
)

Expand All @@ -34,10 +38,7 @@ func getMatchedPolicies(
) (
*core_xds.MatchedPolicies, []gateway.GatewayListenerInfo, core_xds.Proxy, error,
) {
proxyBuilder := sync.DefaultDataplaneProxyBuilder(
*cfg,
callbacks.NewDataplaneMetadataTracker(),
envoy.APIV3)
proxyBuilder := sync.DefaultDataplaneProxyBuilder(*cfg, envoy.APIV3)
if proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext); err != nil {
return nil, nil, core_xds.Proxy{}, err
} else {
Expand Down
11 changes: 2 additions & 9 deletions pkg/plugins/runtime/gateway/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,10 @@ func MakeProtoSnapshot(snap cache_v3.ResourceSnapshot) ProtoSnapshot {
}
}

type mockMetadataTracker struct{}

func (m mockMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata {
return nil
}

func MakeGeneratorContext(rt runtime.Runtime, key core_model.ResourceKey) (*xds_context.Context, *core_xds.Proxy) {
b := sync.DataplaneProxyBuilder{
MetadataTracker: mockMetadataTracker{},
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
}

cache, err := cla.NewCache(rt.Config().Store.Cache.ExpirationTime, rt.Metrics())
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/server/v3/snapshot_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/kumahq/kuma/pkg/xds/generator"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"
"github.com/kumahq/kuma/pkg/xds/server"
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
v3 "github.com/kumahq/kuma/pkg/xds/server/v3"
"github.com/kumahq/kuma/pkg/xds/sync"
"github.com/kumahq/kuma/pkg/xds/template"
Expand Down Expand Up @@ -118,7 +117,7 @@ var _ = Describe("GenerateSnapshot", func() {
cfg.DNSServer.ServiceVipPort,
)

proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, callbacks.NewDataplaneMetadataTracker(), envoy_common.APIV3)
proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, envoy_common.APIV3)
})

create := func(r core_model.Resource) {
Expand Down
22 changes: 14 additions & 8 deletions pkg/xds/sync/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,32 @@ var (

func DefaultDataplaneProxyBuilder(
config kuma_cp.Config,
<<<<<<< HEAD
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
=======
apiVersion core_xds.APIVersion,
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
) *DataplaneProxyBuilder {
return &DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
}
}

func DefaultIngressProxyBuilder(
rt core_runtime.Runtime,
<<<<<<< HEAD
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
=======
apiVersion core_xds.APIVersion,
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
) *IngressProxyBuilder {
return &IngressProxyBuilder{
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
apiVersion: apiVersion,
meshCache: rt.MeshCache(),
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -47,15 +53,18 @@ func DefaultIngressProxyBuilder(
func DefaultEgressProxyBuilder(
ctx context.Context,
rt core_runtime.Runtime,
<<<<<<< HEAD
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
=======
apiVersion core_xds.APIVersion,
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
) *EgressProxyBuilder {
return &EgressProxyBuilder{
ctx: ctx,
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
meshCache: rt.MeshCache(),
apiVersion: apiVersion,
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -77,20 +86,17 @@ func DefaultDataplaneWatchdogFactory(

dataplaneProxyBuilder := DefaultDataplaneProxyBuilder(
config,
metadataTracker,
apiVersion,
)

ingressProxyBuilder := DefaultIngressProxyBuilder(
rt,
metadataTracker,
apiVersion,
)

egressProxyBuilder := DefaultEgressProxyBuilder(
ctx,
rt,
metadataTracker,
apiVersion,
)

Expand Down
3 changes: 0 additions & 3 deletions pkg/xds/sync/dataplane_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
var syncLog = core.Log.WithName("sync")

type DataplaneProxyBuilder struct {
MetadataTracker DataplaneMetadataTracker

Zone string
APIVersion envoy.APIVersion
}
Expand Down Expand Up @@ -64,7 +62,6 @@ func (p *DataplaneProxyBuilder) Build(ctx context.Context, key core_model.Resour
Id: core_xds.FromResourceKey(key),
APIVersion: p.APIVersion,
Dataplane: dp,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
Policies: *matchedPolicies,
SecretsTracker: secretsTracker,
Expand Down
16 changes: 9 additions & 7 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
}
switch d.dpType {
case mesh_proto.DataplaneProxyType:
return d.syncDataplane(ctx)
return d.syncDataplane(ctx, metadata)
case mesh_proto.IngressProxyType:
return d.syncIngress(ctx)
return d.syncIngress(ctx, metadata)
case mesh_proto.EgressProxyType:
return d.syncEgress(ctx)
return d.syncEgress(ctx, metadata)
default:
// It might be the case that the dp type is not yet inferred because there is no Dataplane definition yet.
return nil
Expand All @@ -91,7 +91,7 @@ func (d *DataplaneWatchdog) Cleanup() error {

// syncDataplane syncs state of the Dataplane.
// It uses Mesh Hash to decide if we need to regenerate configuration or not.
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
meshCtx, err := d.MeshCache.GetMeshContext(ctx, syncLog, d.key.Mesh)
if err != nil {
return err
Expand Down Expand Up @@ -126,6 +126,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
if err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy); err != nil {
return err
}
Expand All @@ -134,7 +135,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
}

// syncIngress syncs the state of the Ingress Dataplane. Notice that it does not use Mesh Hash yet because Ingress supports many Meshes.
func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
Expand All @@ -148,12 +149,13 @@ func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.IngressReconciler.Reconcile(*envoyCtx, proxy)
}

// syncEgress syncs state of Egress Dataplane. Notice that it does not use
// Mesh Hash yet because Egress supports many Meshes.
func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneEgress does not have a mesh!
Expand All @@ -168,7 +170,7 @@ func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS

proxy.Metadata = metadata
return d.EgressReconciler.Reconcile(*envoyCtx, proxy)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/xds/sync/dataplane_watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ var _ = Describe("Dataplane Watchdog", func() {

deps = sync.DataplaneWatchdogDependencies{
DataplaneProxyBuilder: &sync.DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
APIVersion: envoy.APIV3,
Zone: zone,
APIVersion: envoy.APIV3,
Zone: zone,
},
DataplaneReconciler: snapshotReconciler,
EnvoyCpCtx: &xds_context.ControlPlaneContext{
Expand Down
20 changes: 11 additions & 9 deletions pkg/xds/sync/egress_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
xds_topology "github.com/kumahq/kuma/pkg/xds/topology"
Expand All @@ -24,17 +24,20 @@ type EgressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

zone string
<<<<<<< HEAD
apiVersion envoy_common.APIVersion
=======
apiVersion core_xds.APIVersion
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
}

func (p *EgressProxyBuilder) Build(
ctx context.Context,
key core_model.ResourceKey,
) (*xds.Proxy, error) {
) (*core_xds.Proxy, error) {
zoneEgress := core_mesh.NewZoneEgressResource()

if err := p.ReadOnlyResManager.Get(
Expand Down Expand Up @@ -80,7 +83,7 @@ func (p *EgressProxyBuilder) Build(
return zoneIngresses[a].GetMeta().GetName() < zoneIngresses[b].GetMeta().GetName()
})

var meshResourcesList []*xds.MeshResources
var meshResourcesList []*core_xds.MeshResources

for _, mesh := range meshes {
meshName := mesh.GetMeta().GetName()
Expand All @@ -96,7 +99,7 @@ func (p *EgressProxyBuilder) Build(
faultInjections := meshCtx.Resources.FaultInjections().Items
rateLimits := meshCtx.Resources.RateLimits().Items

meshResources := &xds.MeshResources{
meshResources := &core_xds.MeshResources{
Mesh: mesh,
TrafficRoutes: trafficRoutes,
ExternalServices: externalServices,
Expand Down Expand Up @@ -125,15 +128,14 @@ func (p *EgressProxyBuilder) Build(
meshResourcesList = append(meshResourcesList, meshResources)
}

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneEgressProxy: &xds.ZoneEgressProxy{
ZoneEgressProxy: &core_xds.ZoneEgressProxy{
ZoneEgressResource: zoneEgress,
ZoneIngresses: zoneIngresses,
MeshResourcesList: meshResourcesList,
},
Metadata: p.MetadataTracker.Metadata(key),
}

return proxy, nil
Expand Down
22 changes: 12 additions & 10 deletions pkg/xds/sync/ingress_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/ingress"
Expand All @@ -21,14 +21,17 @@ type IngressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

<<<<<<< HEAD
apiVersion envoy.APIVersion
=======
apiVersion core_xds.APIVersion
>>>>>>> 39ba902fa (fix(xds): don't read metadata in ProxyBuilders (#5414))
zone string
}

func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*xds.Proxy, error) {
func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*core_xds.Proxy, error) {
zoneIngress, err := p.getZoneIngress(ctx, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -62,18 +65,17 @@ func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.Resource

routing := p.resolveRouting(zoneIngress, zoneEgressesList, allMeshDataplanes, availableExternalServices, zoneIngressProxy.MeshGateways)

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneIngress: zoneIngress,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
ZoneIngressProxy: zoneIngressProxy,
}
return proxy, nil
}

func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.ZoneIngressProxy, error) {
func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*core_xds.ZoneIngressProxy, error) {
routes := &core_mesh.TrafficRouteResourceList{}
if err := p.ReadOnlyResManager.List(ctx, routes); err != nil {
return nil, err
Expand All @@ -91,7 +93,7 @@ func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.Z
return nil, err
}

return &xds.ZoneIngressProxy{
return &core_xds.ZoneIngressProxy{
TrafficRouteList: routes,
GatewayRoutes: gatewayRoutes,
MeshGateways: gateways,
Expand All @@ -118,13 +120,13 @@ func (p *IngressProxyBuilder) resolveRouting(
dataplanes *core_mesh.DataplaneResourceList,
externalServices *core_mesh.ExternalServiceResourceList,
meshGateways *core_mesh.MeshGatewayResourceList,
) *xds.Routing {
) *core_xds.Routing {
destinations := ingress.BuildDestinationMap(zoneIngress)
endpoints := ingress.BuildEndpointMap(
destinations, dataplanes.Items, externalServices.Items, zoneEgresses.Items, meshGateways.Items,
)

routing := &xds.Routing{
routing := &core_xds.Routing{
OutboundTargets: endpoints,
}
return routing
Expand Down
Loading

0 comments on commit 43289c1

Please sign in to comment.