Skip to content

Commit

Permalink
fix(kuma-cp): zone ingress mixes services with the same name in diffe…
Browse files Browse the repository at this point in the history
…rent meshes (#6364)

---------

Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
Signed-off-by: Ilya Lobkov <lobkovilya@yandex.ru>
  • Loading branch information
lobkovilya authored Apr 3, 2023
1 parent 435b884 commit d54b226
Show file tree
Hide file tree
Showing 22 changed files with 492 additions and 384 deletions.
14 changes: 10 additions & 4 deletions pkg/core/xds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type Proxy struct {
Id ProxyId
APIVersion APIVersion
Dataplane *core_mesh.DataplaneResource
ZoneIngress *core_mesh.ZoneIngressResource
Metadata *DataplaneMetadata
Routing Routing
Policies MatchedPolicies
Expand Down Expand Up @@ -195,10 +194,17 @@ type ZoneEgressProxy struct {
MeshResourcesList []*MeshResources
}

type MeshIngressResources struct {
Mesh *core_mesh.MeshResource
EndpointMap EndpointMap
}

type ZoneIngressProxy struct {
GatewayRoutes *core_mesh.MeshGatewayRouteResourceList
MeshGateways *core_mesh.MeshGatewayResourceList
PolicyResources map[core_model.ResourceType]core_model.ResourceList
ZoneIngressResource *core_mesh.ZoneIngressResource
GatewayRoutes *core_mesh.MeshGatewayRouteResourceList
MeshGateways *core_mesh.MeshGatewayResourceList
PolicyResources map[core_model.ResourceType]core_model.ResourceList
MeshResourceList []*MeshIngressResources
}

type VIPDomains struct {
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/cache/mesh/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/patrickmn/go-cache"

"github.com/kumahq/kuma/pkg/metrics"
Expand Down Expand Up @@ -47,7 +46,7 @@ func NewCache(
}, nil
}

func (c *Cache) GetMeshContext(ctx context.Context, syncLog logr.Logger, mesh string) (xds_context.MeshContext, error) {
func (c *Cache) GetMeshContext(ctx context.Context, mesh string) (xds_context.MeshContext, error) {
// Check our short TTL cache for a context, ignoring whether there have been
// changes since it was generated.
elt, err := c.cache.GetOrRetrieve(ctx, mesh, once.RetrieverFunc(func(ctx context.Context, key string) (interface{}, error) {
Expand Down
31 changes: 15 additions & 16 deletions pkg/xds/cache/mesh/cache_test.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/xds/generator/admin_proxy_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ func (g AdminProxyGenerator) getAddress(proxy *core_xds.Proxy) string {
return proxy.ZoneEgressProxy.ZoneEgressResource.Spec.GetNetworking().GetAddress()
}

return proxy.ZoneIngress.Spec.GetNetworking().GetAddress()
if proxy.ZoneIngressProxy != nil {
return proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking().GetAddress()
}

return ""
}
2 changes: 1 addition & 1 deletion pkg/xds/generator/egress/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ var _ = Describe("EgressGenerator", func() {
loader := fakeLoader{}

for _, meshResources := range meshResourcesMap {
meshResources.EndpointMap = xds_topology.BuildRemoteEndpointMap(
meshResources.EndpointMap = xds_topology.BuildEgressEndpointMap(
context.Background(),
meshResources.Mesh,
zoneName,
Expand Down
58 changes: 33 additions & 25 deletions pkg/xds/generator/ingress_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox

destinationsPerService := i.destinations(proxy.ZoneIngressProxy)

listener, err := i.generateLDS(proxy, proxy.ZoneIngress, destinationsPerService, proxy.APIVersion)
listener, err := i.generateLDS(proxy.ZoneIngressProxy.ZoneIngressResource, destinationsPerService, proxy.APIVersion)
if err != nil {
return nil, err
}
Expand All @@ -46,19 +46,21 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox
Resource: listener,
})

services := i.services(proxy)
for _, mr := range proxy.ZoneIngressProxy.MeshResourceList {
services := i.services(mr)

cdsResources, err := i.generateCDS(services, destinationsPerService, proxy.APIVersion)
if err != nil {
return nil, err
}
resources.Add(cdsResources...)
cdsResources, err := i.generateCDS(services, destinationsPerService, proxy.APIVersion, mr)
if err != nil {
return nil, err
}
resources.Add(cdsResources...)

edsResources, err := i.generateEDS(proxy, services, proxy.APIVersion)
if err != nil {
return nil, err
edsResources, err := i.generateEDS(services, proxy.APIVersion, mr)
if err != nil {
return nil, err
}
resources.Add(edsResources...)
}
resources.Add(edsResources...)

return resources, nil
}
Expand All @@ -70,27 +72,27 @@ func (i IngressGenerator) Generate(ctx xds_context.Context, proxy *core_xds.Prox
// This approach has a limitation: additional tags on outbound in Universal mode won't work across different zones.
// Traffic is NOT decrypted here, therefore we don't need certificates and mTLS settings
func (i IngressGenerator) generateLDS(
proxy *core_xds.Proxy,
ingress *core_mesh.ZoneIngressResource,
destinationsPerService map[string][]tags.Tags,
apiVersion core_xds.APIVersion,
) (envoy_common.NamedResource, error) {
inboundListenerName := envoy_names.GetInboundListenerName(proxy.ZoneIngress.Spec.GetNetworking().GetAddress(), proxy.ZoneIngress.Spec.GetNetworking().GetPort())
inboundListenerName := envoy_names.GetInboundListenerName(ingress.Spec.GetNetworking().GetAddress(), ingress.Spec.GetNetworking().GetPort())
inboundListenerBuilder := envoy_listeners.NewListenerBuilder(apiVersion).
Configure(envoy_listeners.InboundListener(inboundListenerName, ingress.Spec.GetNetworking().GetAddress(), ingress.Spec.GetNetworking().GetPort(), core_xds.SocketAddressProtocolTCP)).
Configure(envoy_listeners.TLSInspector())

if len(proxy.ZoneIngress.Spec.AvailableServices) == 0 {
if len(ingress.Spec.AvailableServices) == 0 {
inboundListenerBuilder = inboundListenerBuilder.
Configure(envoy_listeners.FilterChain(envoy_listeners.NewFilterChainBuilder(apiVersion)))
}

sniUsed := map[string]bool{}

for _, inbound := range proxy.ZoneIngress.Spec.GetAvailableServices() {
for _, inbound := range ingress.Spec.GetAvailableServices() {
service := inbound.Tags[mesh_proto.ServiceTag]
destinations := destinationsPerService[service]
destinations = append(destinations, destinationsPerService[mesh_proto.MatchAllTag]...)
clusterName := envoy_names.GetMeshClusterName(inbound.Mesh, service)

for _, destination := range destinations {
meshDestination := destination.
Expand All @@ -105,8 +107,8 @@ func (i IngressGenerator) generateLDS(
envoy_listeners.NewFilterChainBuilder(apiVersion).Configure(
envoy_listeners.MatchTransportProtocol("tls"),
envoy_listeners.MatchServerNames(sni),
envoy_listeners.TcpProxyWithMetadata(service, envoy_common.NewCluster(
envoy_common.WithService(service),
envoy_listeners.TcpProxyWithMetadata(clusterName, envoy_common.NewCluster(
envoy_common.WithName(clusterName),
envoy_common.WithTags(meshDestination.WithoutTags(mesh_proto.ServiceTag)),
)),
),
Expand Down Expand Up @@ -233,9 +235,9 @@ func (_ IngressGenerator) destinations(
return destinations
}

func (_ IngressGenerator) services(proxy *core_xds.Proxy) []string {
func (_ IngressGenerator) services(mr *core_xds.MeshIngressResources) []string {
var services []string
for service := range proxy.Routing.OutboundTargets {
for service := range mr.EndpointMap {
services = append(services, service)
}
sort.Strings(services)
Expand All @@ -246,21 +248,25 @@ func (i IngressGenerator) generateCDS(
services []string,
destinationsPerService map[string][]tags.Tags,
apiVersion core_xds.APIVersion,
mr *core_xds.MeshIngressResources,
) ([]*core_xds.Resource, error) {
var resources []*core_xds.Resource
for _, service := range services {
clusterName := envoy_names.GetMeshClusterName(mr.Mesh.GetMeta().GetName(), service)

tagSlice := tags.TagsSlice(append(destinationsPerService[service], destinationsPerService[mesh_proto.MatchAllTag]...))
tagKeySlice := tagSlice.ToTagKeysSlice().Transform(tags.Without(mesh_proto.ServiceTag), tags.With("mesh"))

edsCluster, err := envoy_clusters.NewClusterBuilder(apiVersion).
Configure(envoy_clusters.EdsCluster(service)).
Configure(envoy_clusters.EdsCluster(clusterName)).
Configure(envoy_clusters.LbSubset(tagKeySlice)).
Configure(envoy_clusters.DefaultTimeout()).
Build()
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: service,
Name: clusterName,
Origin: OriginIngress,
Resource: edsCluster,
})
Expand All @@ -269,19 +275,21 @@ func (i IngressGenerator) generateCDS(
}

func (_ IngressGenerator) generateEDS(
proxy *core_xds.Proxy,
services []string,
apiVersion core_xds.APIVersion,
mr *core_xds.MeshIngressResources,
) ([]*core_xds.Resource, error) {
var resources []*core_xds.Resource
for _, service := range services {
endpoints := proxy.Routing.OutboundTargets[service]
cla, err := envoy_endpoints.CreateClusterLoadAssignment(service, endpoints, apiVersion)
endpoints := mr.EndpointMap[service]

clusterName := envoy_names.GetMeshClusterName(mr.Mesh.GetMeta().GetName(), service)
cla, err := envoy_endpoints.CreateClusterLoadAssignment(clusterName, endpoints, apiVersion)
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: service,
Name: clusterName,
Origin: OriginIngress,
Resource: cla,
})
Expand Down
Loading

0 comments on commit d54b226

Please sign in to comment.