Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): zone ingress mixes services with the same name in different meshes #6364

Merged
merged 12 commits into from
Apr 3, 2023
Merged
14 changes: 10 additions & 4 deletions pkg/core/xds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type Proxy struct {
Id ProxyId
APIVersion APIVersion
Dataplane *core_mesh.DataplaneResource
ZoneIngress *core_mesh.ZoneIngressResource
Metadata *DataplaneMetadata
Routing Routing
Policies MatchedPolicies
Expand Down Expand Up @@ -195,10 +194,17 @@ type ZoneEgressProxy struct {
MeshResourcesList []*MeshResources
}

type MeshIngressResources struct {
Mesh *core_mesh.MeshResource
EndpointMap EndpointMap
}

type ZoneIngressProxy struct {
GatewayRoutes *core_mesh.MeshGatewayRouteResourceList
MeshGateways *core_mesh.MeshGatewayResourceList
PolicyResources map[core_model.ResourceType]core_model.ResourceList
ZoneIngressResource *core_mesh.ZoneIngressResource
GatewayRoutes *core_mesh.MeshGatewayRouteResourceList
MeshGateways *core_mesh.MeshGatewayResourceList
PolicyResources map[core_model.ResourceType]core_model.ResourceList
MeshResourceList []*MeshIngressResources
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
}

type VIPDomains struct {
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/cache/mesh/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/patrickmn/go-cache"

"github.com/kumahq/kuma/pkg/metrics"
Expand Down Expand Up @@ -47,7 +46,7 @@ func NewCache(
}, nil
}

func (c *Cache) GetMeshContext(ctx context.Context, syncLog logr.Logger, mesh string) (xds_context.MeshContext, error) {
func (c *Cache) GetMeshContext(ctx context.Context, mesh string) (xds_context.MeshContext, error) {
// Check our short TTL cache for a context, ignoring whether there have been
// changes since it was generated.
elt, err := c.cache.GetOrRetrieve(ctx, mesh, once.RetrieverFunc(func(ctx context.Context, key string) (interface{}, error) {
Expand Down
31 changes: 15 additions & 16 deletions pkg/xds/cache/mesh/cache_test.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/xds/generator/admin_proxy_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ func (g AdminProxyGenerator) getAddress(proxy *core_xds.Proxy) string {
return proxy.ZoneEgressProxy.ZoneEgressResource.Spec.GetNetworking().GetAddress()
}

return proxy.ZoneIngress.Spec.GetNetworking().GetAddress()
if proxy.ZoneIngressProxy != nil {
return proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking().GetAddress()
}

return ""
}
2 changes: 1 addition & 1 deletion pkg/xds/generator/egress/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ var _ = Describe("EgressGenerator", func() {
loader := fakeLoader{}

for _, meshResources := range meshResourcesMap {
meshResources.EndpointMap = xds_topology.BuildRemoteEndpointMap(
meshResources.EndpointMap = xds_topology.BuildEgressEndpointMap(
context.Background(),
meshResources.Mesh,
zoneName,
Expand Down
58 changes: 33 additions & 25 deletions pkg/xds/generator/ingress_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox

destinationsPerService := i.destinations(proxy.ZoneIngressProxy)

listener, err := i.generateLDS(proxy, proxy.ZoneIngress, destinationsPerService, proxy.APIVersion)
listener, err := i.generateLDS(proxy.ZoneIngressProxy.ZoneIngressResource, destinationsPerService, proxy.APIVersion)
if err != nil {
return nil, err
}
Expand All @@ -46,19 +46,21 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox
Resource: listener,
})

services := i.services(proxy)
for _, mr := range proxy.ZoneIngressProxy.MeshResourceList {
services := i.services(mr)

cdsResources, err := i.generateCDS(services, destinationsPerService, proxy.APIVersion)
if err != nil {
return nil, err
}
resources.Add(cdsResources...)
cdsResources, err := i.generateCDS(services, destinationsPerService, proxy.APIVersion, mr)
if err != nil {
return nil, err
}
resources.Add(cdsResources...)

edsResources, err := i.generateEDS(proxy, services, proxy.APIVersion)
if err != nil {
return nil, err
edsResources, err := i.generateEDS(services, proxy.APIVersion, mr)
if err != nil {
return nil, err
}
resources.Add(edsResources...)
}
resources.Add(edsResources...)

return resources, nil
}
Expand All @@ -70,27 +72,27 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox
// This approach has a limitation: additional tags on outbound in Universal mode won't work across different zones.
// Traffic is NOT decrypted here, therefore we don't need certificates and mTLS settings
func (i IngressGenerator) generateLDS(
proxy *core_xds.Proxy,
ingress *core_mesh.ZoneIngressResource,
destinationsPerService map[string][]tags.Tags,
apiVersion core_xds.APIVersion,
) (envoy_common.NamedResource, error) {
inboundListenerName := envoy_names.GetInboundListenerName(proxy.ZoneIngress.Spec.GetNetworking().GetAddress(), proxy.ZoneIngress.Spec.GetNetworking().GetPort())
inboundListenerName := envoy_names.GetInboundListenerName(ingress.Spec.GetNetworking().GetAddress(), ingress.Spec.GetNetworking().GetPort())
inboundListenerBuilder := envoy_listeners.NewListenerBuilder(apiVersion).
Configure(envoy_listeners.InboundListener(inboundListenerName, ingress.Spec.GetNetworking().GetAddress(), ingress.Spec.GetNetworking().GetPort(), core_xds.SocketAddressProtocolTCP)).
Configure(envoy_listeners.TLSInspector())

if len(proxy.ZoneIngress.Spec.AvailableServices) == 0 {
if len(ingress.Spec.AvailableServices) == 0 {
inboundListenerBuilder = inboundListenerBuilder.
Configure(envoy_listeners.FilterChain(envoy_listeners.NewFilterChainBuilder(apiVersion)))
}

sniUsed := map[string]bool{}

for _, inbound := range proxy.ZoneIngress.Spec.GetAvailableServices() {
for _, inbound := range ingress.Spec.GetAvailableServices() {
service := inbound.Tags[mesh_proto.ServiceTag]
destinations := destinationsPerService[service]
destinations = append(destinations, destinationsPerService[mesh_proto.MatchAllTag]...)
clusterName := envoy_names.GetMeshClusterName(inbound.Mesh, service)

for _, destination := range destinations {
meshDestination := destination.
Expand All @@ -105,8 +107,8 @@ func (i IngressGenerator) generateLDS(
envoy_listeners.NewFilterChainBuilder(apiVersion).Configure(
envoy_listeners.MatchTransportProtocol("tls"),
envoy_listeners.MatchServerNames(sni),
envoy_listeners.TcpProxyWithMetadata(service, envoy_common.NewCluster(
envoy_common.WithService(service),
envoy_listeners.TcpProxyWithMetadata(clusterName, envoy_common.NewCluster(
envoy_common.WithName(clusterName),
envoy_common.WithTags(meshDestination.WithoutTags(mesh_proto.ServiceTag)),
)),
),
Expand Down Expand Up @@ -233,9 +235,9 @@ func (_ IngressGenerator) destinations(
return destinations
}

func (_ IngressGenerator) services(proxy *core_xds.Proxy) []string {
func (_ IngressGenerator) services(mr *core_xds.MeshIngressResources) []string {
var services []string
for service := range proxy.Routing.OutboundTargets {
for service := range mr.EndpointMap {
services = append(services, service)
}
sort.Strings(services)
Expand All @@ -246,21 +248,25 @@ func (i IngressGenerator) generateCDS(
services []string,
destinationsPerService map[string][]tags.Tags,
apiVersion core_xds.APIVersion,
mr *core_xds.MeshIngressResources,
) ([]*core_xds.Resource, error) {
var resources []*core_xds.Resource
for _, service := range services {
clusterName := envoy_names.GetMeshClusterName(mr.Mesh.GetMeta().GetName(), service)

tagSlice := tags.TagsSlice(append(destinationsPerService[service], destinationsPerService[mesh_proto.MatchAllTag]...))
tagKeySlice := tagSlice.ToTagKeysSlice().Transform(tags.Without(mesh_proto.ServiceTag), tags.With("mesh"))

edsCluster, err := envoy_clusters.NewClusterBuilder(apiVersion).
Configure(envoy_clusters.EdsCluster(service)).
Configure(envoy_clusters.EdsCluster(clusterName)).
Configure(envoy_clusters.LbSubset(tagKeySlice)).
Configure(envoy_clusters.DefaultTimeout()).
Build()
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: service,
Name: clusterName,
Origin: OriginIngress,
Resource: edsCluster,
})
Expand All @@ -269,19 +275,21 @@ func (i IngressGenerator) generateCDS(
}

func (_ IngressGenerator) generateEDS(
proxy *core_xds.Proxy,
services []string,
apiVersion core_xds.APIVersion,
mr *core_xds.MeshIngressResources,
) ([]*core_xds.Resource, error) {
var resources []*core_xds.Resource
for _, service := range services {
endpoints := proxy.Routing.OutboundTargets[service]
cla, err := envoy_endpoints.CreateClusterLoadAssignment(service, endpoints, apiVersion)
endpoints := mr.EndpointMap[service]

clusterName := envoy_names.GetMeshClusterName(mr.Mesh.GetMeta().GetName(), service)
cla, err := envoy_endpoints.CreateClusterLoadAssignment(clusterName, endpoints, apiVersion)
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: service,
Name: clusterName,
Origin: OriginIngress,
Resource: cla,
})
Expand Down
Loading