Skip to content

Commit

Permalink
fix(xds): don't read metadata in ProxyBuilders (backport #5414) (#5416)
Browse files Browse the repository at this point in the history
We were using a `DataplaneMetadataTracker` in all proxy builders.
This was causing issues as the proxy may disconnect while reconciliation is in progress.

We now set the proxy metadata in the `DataplaneWatchdog` so that it is read only once and nil-checked.
This protects us against that race.

Signed-off-by: Charly Molter <charly.molter@konghq.com>
(cherry picked from commit 39ba902)

Signed-off-by: Charly Molter <charly.molter@konghq.com>
  • Loading branch information
mergify[bot] authored Dec 2, 2022
1 parent d004fbc commit 8ac3a2f
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 68 deletions.
6 changes: 1 addition & 5 deletions pkg/api-server/inspect_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
"github.com/kumahq/kuma/pkg/xds/sync"
)

Expand All @@ -34,10 +33,7 @@ func getMatchedPolicies(
) (
*core_xds.MatchedPolicies, []gateway.GatewayListenerInfo, core_xds.Proxy, error,
) {
proxyBuilder := sync.DefaultDataplaneProxyBuilder(
*cfg,
callbacks.NewDataplaneMetadataTracker(),
envoy.APIV3)
proxyBuilder := sync.DefaultDataplaneProxyBuilder(*cfg, envoy.APIV3)
if proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext); err != nil {
return nil, nil, core_xds.Proxy{}, err
} else {
Expand Down
11 changes: 2 additions & 9 deletions pkg/plugins/runtime/gateway/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,10 @@ func MakeProtoSnapshot(snap cache_v3.ResourceSnapshot) ProtoSnapshot {
}
}

type mockMetadataTracker struct{}

func (m mockMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata {
return nil
}

func MakeGeneratorContext(rt runtime.Runtime, key core_model.ResourceKey) (*xds_context.Context, *core_xds.Proxy) {
b := sync.DataplaneProxyBuilder{
MetadataTracker: mockMetadataTracker{},
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
}

cache, err := cla.NewCache(rt.Config().Store.Cache.ExpirationTime, rt.Metrics())
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/server/v3/snapshot_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/kumahq/kuma/pkg/xds/generator"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"
"github.com/kumahq/kuma/pkg/xds/server"
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
v3 "github.com/kumahq/kuma/pkg/xds/server/v3"
"github.com/kumahq/kuma/pkg/xds/sync"
"github.com/kumahq/kuma/pkg/xds/template"
Expand Down Expand Up @@ -118,7 +117,7 @@ var _ = Describe("GenerateSnapshot", func() {
cfg.DNSServer.ServiceVipPort,
)

proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, callbacks.NewDataplaneMetadataTracker(), envoy_common.APIV3)
proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, envoy_common.APIV3)
})

create := func(r core_model.Resource) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/xds/sync/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,22 @@ var (

func DefaultDataplaneProxyBuilder(
config kuma_cp.Config,
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
) *DataplaneProxyBuilder {
return &DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
}
}

func DefaultIngressProxyBuilder(
rt core_runtime.Runtime,
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
) *IngressProxyBuilder {
return &IngressProxyBuilder{
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
apiVersion: apiVersion,
meshCache: rt.MeshCache(),
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -47,15 +43,13 @@ func DefaultIngressProxyBuilder(
func DefaultEgressProxyBuilder(
ctx context.Context,
rt core_runtime.Runtime,
metadataTracker DataplaneMetadataTracker,
apiVersion envoy.APIVersion,
) *EgressProxyBuilder {
return &EgressProxyBuilder{
ctx: ctx,
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
meshCache: rt.MeshCache(),
apiVersion: apiVersion,
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -77,20 +71,17 @@ func DefaultDataplaneWatchdogFactory(

dataplaneProxyBuilder := DefaultDataplaneProxyBuilder(
config,
metadataTracker,
apiVersion,
)

ingressProxyBuilder := DefaultIngressProxyBuilder(
rt,
metadataTracker,
apiVersion,
)

egressProxyBuilder := DefaultEgressProxyBuilder(
ctx,
rt,
metadataTracker,
apiVersion,
)

Expand Down
3 changes: 0 additions & 3 deletions pkg/xds/sync/dataplane_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
var syncLog = core.Log.WithName("sync")

type DataplaneProxyBuilder struct {
MetadataTracker DataplaneMetadataTracker

Zone string
APIVersion envoy.APIVersion
}
Expand Down Expand Up @@ -64,7 +62,6 @@ func (p *DataplaneProxyBuilder) Build(ctx context.Context, key core_model.Resour
Id: core_xds.FromResourceKey(key),
APIVersion: p.APIVersion,
Dataplane: dp,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
Policies: *matchedPolicies,
SecretsTracker: secretsTracker,
Expand Down
16 changes: 9 additions & 7 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
}
switch d.dpType {
case mesh_proto.DataplaneProxyType:
return d.syncDataplane(ctx)
return d.syncDataplane(ctx, metadata)
case mesh_proto.IngressProxyType:
return d.syncIngress(ctx)
return d.syncIngress(ctx, metadata)
case mesh_proto.EgressProxyType:
return d.syncEgress(ctx)
return d.syncEgress(ctx, metadata)
default:
// It might be the case that the dp type is not yet inferred because there is no Dataplane definition yet.
return nil
Expand All @@ -91,7 +91,7 @@ func (d *DataplaneWatchdog) Cleanup() error {

// syncDataplane syncs state of the Dataplane.
// It uses Mesh Hash to decide if we need to regenerate configuration or not.
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
meshCtx, err := d.MeshCache.GetMeshContext(ctx, syncLog, d.key.Mesh)
if err != nil {
return err
Expand Down Expand Up @@ -126,6 +126,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
if err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy); err != nil {
return err
}
Expand All @@ -134,7 +135,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
}

// syncIngress syncs state of Ingress Dataplane. Notice that it does not use Mesh Hash yet because Ingress supports many Meshes.
func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
Expand All @@ -148,12 +149,13 @@ func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.IngressReconciler.Reconcile(*envoyCtx, proxy)
}

// syncEgress syncs state of Egress Dataplane. Notice that it does not use
// Mesh Hash yet because Egress supports many Meshes.
func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneEgress does not have a mesh!
Expand All @@ -168,7 +170,7 @@ func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS

proxy.Metadata = metadata
return d.EgressReconciler.Reconcile(*envoyCtx, proxy)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/xds/sync/dataplane_watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ var _ = Describe("Dataplane Watchdog", func() {

deps = sync.DataplaneWatchdogDependencies{
DataplaneProxyBuilder: &sync.DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
APIVersion: envoy.APIV3,
Zone: zone,
APIVersion: envoy.APIV3,
Zone: zone,
},
DataplaneReconciler: snapshotReconciler,
EnvoyCpCtx: &xds_context.ControlPlaneContext{
Expand Down
16 changes: 7 additions & 9 deletions pkg/xds/sync/egress_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
xds_topology "github.com/kumahq/kuma/pkg/xds/topology"
Expand All @@ -24,7 +24,6 @@ type EgressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

zone string
Expand All @@ -34,7 +33,7 @@ type EgressProxyBuilder struct {
func (p *EgressProxyBuilder) Build(
ctx context.Context,
key core_model.ResourceKey,
) (*xds.Proxy, error) {
) (*core_xds.Proxy, error) {
zoneEgress := core_mesh.NewZoneEgressResource()

if err := p.ReadOnlyResManager.Get(
Expand Down Expand Up @@ -80,7 +79,7 @@ func (p *EgressProxyBuilder) Build(
return zoneIngresses[a].GetMeta().GetName() < zoneIngresses[b].GetMeta().GetName()
})

var meshResourcesList []*xds.MeshResources
var meshResourcesList []*core_xds.MeshResources

for _, mesh := range meshes {
meshName := mesh.GetMeta().GetName()
Expand All @@ -96,7 +95,7 @@ func (p *EgressProxyBuilder) Build(
faultInjections := meshCtx.Resources.FaultInjections().Items
rateLimits := meshCtx.Resources.RateLimits().Items

meshResources := &xds.MeshResources{
meshResources := &core_xds.MeshResources{
Mesh: mesh,
TrafficRoutes: trafficRoutes,
ExternalServices: externalServices,
Expand Down Expand Up @@ -125,15 +124,14 @@ func (p *EgressProxyBuilder) Build(
meshResourcesList = append(meshResourcesList, meshResources)
}

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneEgressProxy: &xds.ZoneEgressProxy{
ZoneEgressProxy: &core_xds.ZoneEgressProxy{
ZoneEgressResource: zoneEgress,
ZoneIngresses: zoneIngresses,
MeshResourcesList: meshResourcesList,
},
Metadata: p.MetadataTracker.Metadata(key),
}

return proxy, nil
Expand Down
18 changes: 8 additions & 10 deletions pkg/xds/sync/ingress_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/ingress"
Expand All @@ -21,14 +21,13 @@ type IngressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

apiVersion envoy.APIVersion
zone string
}

func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*xds.Proxy, error) {
func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*core_xds.Proxy, error) {
zoneIngress, err := p.getZoneIngress(ctx, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -62,18 +61,17 @@ func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.Resource

routing := p.resolveRouting(zoneIngress, zoneEgressesList, allMeshDataplanes, availableExternalServices, zoneIngressProxy.MeshGateways)

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneIngress: zoneIngress,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
ZoneIngressProxy: zoneIngressProxy,
}
return proxy, nil
}

func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.ZoneIngressProxy, error) {
func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*core_xds.ZoneIngressProxy, error) {
routes := &core_mesh.TrafficRouteResourceList{}
if err := p.ReadOnlyResManager.List(ctx, routes); err != nil {
return nil, err
Expand All @@ -91,7 +89,7 @@ func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.Z
return nil, err
}

return &xds.ZoneIngressProxy{
return &core_xds.ZoneIngressProxy{
TrafficRouteList: routes,
GatewayRoutes: gatewayRoutes,
MeshGateways: gateways,
Expand All @@ -118,13 +116,13 @@ func (p *IngressProxyBuilder) resolveRouting(
dataplanes *core_mesh.DataplaneResourceList,
externalServices *core_mesh.ExternalServiceResourceList,
meshGateways *core_mesh.MeshGatewayResourceList,
) *xds.Routing {
) *core_xds.Routing {
destinations := ingress.BuildDestinationMap(zoneIngress)
endpoints := ingress.BuildEndpointMap(
destinations, dataplanes.Items, externalServices.Items, zoneEgresses.Items, meshGateways.Items,
)

routing := &xds.Routing{
routing := &core_xds.Routing{
OutboundTargets: endpoints,
}
return routing
Expand Down
9 changes: 0 additions & 9 deletions pkg/xds/sync/proxy_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ import (
"github.com/kumahq/kuma/pkg/xds/sync"
)

type mockMetadataTracker struct{}

func (m mockMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata {
return nil
}

func initializeStore(ctx context.Context, resourceManager core_manager.ResourceManager, fileWithResourcesName string) {
resourcePath := filepath.Join(
"testdata", "input", fileWithResourcesName,
Expand Down Expand Up @@ -75,7 +69,6 @@ func initializeStore(ctx context.Context, resourceManager core_manager.ResourceM
}

var _ = Describe("Proxy Builder", func() {
tracker := mockMetadataTracker{}
localZone := "zone-1"

ctx := context.Background()
Expand Down Expand Up @@ -112,7 +105,6 @@ var _ = Describe("Proxy Builder", func() {
egressProxyBuilder := sync.DefaultEgressProxyBuilder(
ctx,
rt,
tracker,
envoy_common.APIV3,
)

Expand Down Expand Up @@ -195,7 +187,6 @@ var _ = Describe("Proxy Builder", func() {
Describe("Build() zone ingress", func() {
ingressProxyBuilder := sync.DefaultIngressProxyBuilder(
rt,
tracker,
envoy_common.APIV3,
)

Expand Down

0 comments on commit 8ac3a2f

Please sign in to comment.