Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): retrieve name from owner not parsing pod name for Deployments/CronJob #5569

Merged
merged 8 commits into from
Jan 9, 2023
5 changes: 4 additions & 1 deletion pkg/plugins/runtime/k8s/controllers/egress_converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controllers

import (
"context"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"

Expand All @@ -9,14 +11,15 @@ import (
)

func (p *PodConverter) EgressFor(
ctx context.Context,
zoneEgress *mesh_proto.ZoneEgress,
pod *kube_core.Pod,
services []*kube_core.Service,
) error {
if len(services) != 1 {
return errors.Errorf("egress should be matched by exactly one service. Matched %d services", len(services))
}
ifaces, err := InboundInterfacesFor(p.Zone, pod, services)
ifaces, err := p.InboundConverter.InboundInterfacesFor(ctx, p.Zone, pod, services)
if err != nil {
return errors.Wrap(err, "could not generate inbound interfaces")
}
Expand Down
31 changes: 15 additions & 16 deletions pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -20,6 +21,10 @@ const (
KubePortTag = "k8s.kuma.io/service-port"
)

type InboundConverter struct {
NameExtractor NameExtractor
}

func inboundForService(zone string, pod *kube_core.Pod, service *kube_core.Service) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != "" && svcPort.Protocol != kube_core.ProtocolTCP {
Expand Down Expand Up @@ -74,7 +79,7 @@ func inboundForService(zone string, pod *kube_core.Pod, service *kube_core.Servi
return
}

func inboundForServiceless(zone string, pod *kube_core.Pod) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
func inboundForServiceless(zone string, pod *kube_core.Pod, name string) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
// The Pod does not have any services associated with it, just get the data from the Pod itself

// We still need that extra listener with a service because it is required in many places of the code (e.g. mTLS)
Expand All @@ -85,7 +90,7 @@ func inboundForServiceless(zone string, pod *kube_core.Pod) (ifaces []*mesh_prot
// will create lots of code changes to account for this other type of dataplne (we already have GW and Ingress),
// including GUI and CLI changes

tags := InboundTagsForPod(zone, pod)
tags := InboundTagsForPod(zone, pod, name)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

for _, container := range pod.Spec.Containers {
Expand Down Expand Up @@ -118,7 +123,7 @@ func inboundForServiceless(zone string, pod *kube_core.Pod) (ifaces []*mesh_prot
return
}

func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
ifaces := []*mesh_proto.Dataplane_Networking_Inbound{}
for _, svc := range services {
svcIfaces := inboundForService(zone, pod, svc)
Expand All @@ -129,8 +134,12 @@ func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core
if len(services) > 0 {
return nil, errors.Errorf("A service that selects pod %s was found, but it doesn't match any container ports.", pod.GetName())
}
name, _, err := i.NameExtractor.Name(ctx, pod)
if err != nil {
return nil, err
}

ifaces = append(ifaces, inboundForServiceless(zone, pod)...)
ifaces = append(ifaces, inboundForServiceless(zone, pod, name)...)
}
return ifaces, nil
}
Expand Down Expand Up @@ -193,7 +202,7 @@ func ProtocolTagFor(svc *kube_core.Service, svcPort *kube_core.ServicePort) stri
return strings.ToLower(protocolValue)
}

func InboundTagsForPod(zone string, pod *kube_core.Pod) map[string]string {
func InboundTagsForPod(zone string, pod *kube_core.Pod, name string) map[string]string {
tags := util_k8s.CopyStringMap(pod.Labels)
for key, value := range tags {
if value == "" {
Expand All @@ -204,7 +213,7 @@ func InboundTagsForPod(zone string, pod *kube_core.Pod) map[string]string {
tags = make(map[string]string)
}
tags[KubeNamespaceTag] = pod.Namespace
tags[mesh_proto.ServiceTag] = fmt.Sprintf("%s_%s_svc", nameFromPod(pod), pod.Namespace)
tags[mesh_proto.ServiceTag] = fmt.Sprintf("%s_%s_svc", name, pod.Namespace)
if zone != "" {
tags[mesh_proto.ZoneTag] = zone
}
Expand All @@ -213,13 +222,3 @@ func InboundTagsForPod(zone string, pod *kube_core.Pod) map[string]string {

return tags
}

func nameFromPod(pod *kube_core.Pod) string {
// the name is in format <name>-<replica set id>-<pod id>
split := strings.Split(pod.Name, "-")
if len(split) > 2 {
split = split[:len(split)-2]
}

return strings.Join(split, "-")
}
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/ingress_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (p *PodConverter) IngressFor(
if len(services) != 1 {
return errors.Errorf("ingress should be matched by exactly one service. Matched %d services", len(services))
}
ifaces, err := InboundInterfacesFor(p.Zone, pod, services)
ifaces, err := p.InboundConverter.InboundInterfacesFor(ctx, p.Zone, pod, services)
if err != nil {
return errors.Wrap(err, "could not generate inbound interfaces")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core
Mesh: model.NoMesh,
}
operationResult, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, egress, func() error {
if err := r.PodConverter.PodToEgress(egress, pod, services); err != nil {
if err := r.PodConverter.PodToEgress(ctx, egress, pod, services); err != nil {
return errors.Wrap(err, "unable to translate a Pod into a Egress")
}
if err := kube_controllerutil.SetControllerReference(pod, egress, r.Scheme); err != nil {
Expand Down
54 changes: 11 additions & 43 deletions pkg/plugins/runtime/k8s/controllers/pod_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"
"regexp"
"strings"

"github.com/pkg/errors"
kube_apps "k8s.io/api/apps/v1"
kube_core "k8s.io/api/core/v1"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -27,8 +27,8 @@ var (
type PodConverter struct {
ServiceGetter kube_client.Reader
NodeGetter kube_client.Reader
ReplicaSetGetter kube_client.Reader
ResourceConverter k8s_common.Converter
InboundConverter InboundConverter
Zone string
KubeOutboundsAsVIPs bool
}
Expand Down Expand Up @@ -60,10 +60,10 @@ func (p *PodConverter) PodToIngress(ctx context.Context, zoneIngress *mesh_k8s.Z
return nil
}

func (p *PodConverter) PodToEgress(zoneEgress *mesh_k8s.ZoneEgress, pod *kube_core.Pod, services []*kube_core.Service) error {
func (p *PodConverter) PodToEgress(ctx context.Context, zoneEgress *mesh_k8s.ZoneEgress, pod *kube_core.Pod, services []*kube_core.Service) error {
zoneEgressProto := &mesh_proto.ZoneEgress{}
// Pass the current dataplane, so we won't override available services in Egress section
if err := p.EgressFor(zoneEgressProto, pod, services); err != nil {
if err := p.EgressFor(ctx, zoneEgressProto, pod, services); err != nil {
return err
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func (p *PodConverter) dataplaneFor(
if exist {
switch gwType {
case "enabled":
gateway, err := GatewayByServiceFor(p.Zone, pod, services)
gateway, err := p.GatewayByServiceFor(ctx, p.Zone, pod, services)
if err != nil {
return nil, err
}
Expand All @@ -143,7 +143,7 @@ func (p *PodConverter) dataplaneFor(
return nil, errors.Errorf("invalid delegated gateway type '%s'", gwType)
}
} else {
ifaces, err := InboundInterfacesFor(p.Zone, pod, services)
ifaces, err := p.InboundConverter.InboundInterfacesFor(ctx, p.Zone, pod, services)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,8 +181,8 @@ func (p *PodConverter) dataplaneFor(
return dataplane, nil
}

func GatewayByServiceFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
interfaces, err := InboundInterfacesFor(clusterName, pod, services)
func (p *PodConverter) GatewayByServiceFor(ctx context.Context, clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
interfaces, err := p.InboundConverter.InboundInterfacesFor(ctx, clusterName, pod, services)
if err != nil {
return nil, err
}
Expand All @@ -192,46 +192,14 @@ func GatewayByServiceFor(clusterName string, pod *kube_core.Pod, services []*kub
}, nil
}

// DeploymentFor returns the name of the deployment that the pod exists within. The second return
// value indicates whether or not the deployment was found when no error occurs, otherwise an
// error is returned as the third return value.
func (p *PodConverter) DeploymentFor(ctx context.Context, namespace string, pod *kube_core.Pod) (string, bool, error) {
owners := pod.GetObjectMeta().GetOwnerReferences()
var rs *kube_apps.ReplicaSet
for _, owner := range owners {
if owner.Kind == "ReplicaSet" {
rs = &kube_apps.ReplicaSet{}
rsKey := kube_client.ObjectKey{Namespace: namespace, Name: owner.Name}
if err := p.ReplicaSetGetter.Get(ctx, rsKey, rs); err != nil {
return "", false, err
}
break
}
}

if rs == nil {
return "", false, nil
}

rsOwners := rs.GetObjectMeta().GetOwnerReferences()
for _, owner := range rsOwners {
if owner.Kind == "Deployment" {
return owner.Name, true, nil
}
}

return "", false, nil
}

func (p *PodConverter) GatewayByDeploymentFor(ctx context.Context, clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
namespace := pod.GetObjectMeta().GetNamespace()
deployment, found, err := p.DeploymentFor(ctx, namespace, pod)
deployment, kind, err := p.InboundConverter.NameExtractor.Name(ctx, pod)
if err != nil {
return nil, err
}
if !found {
// Fall back on old service tags if Pod not part of Deployment
return GatewayByServiceFor(clusterName, pod, services)
if !strings.Contains(kind, "Deployment") {
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
return p.GatewayByServiceFor(ctx, clusterName, pod, services)
}
return &mesh_proto.Dataplane_Networking_Gateway{
Type: mesh_proto.Dataplane_Networking_Gateway_DELEGATED,
Expand Down
Loading