Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): identify gateway service by deployment #4703

Merged
merged 5 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 75 additions & 10 deletions pkg/plugins/runtime/k8s/controllers/pod_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"

"github.com/pkg/errors"
kube_apps "k8s.io/api/apps/v1"
kube_core "k8s.io/api/core/v1"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -26,6 +27,8 @@ var (
type PodConverter struct {
ServiceGetter kube_client.Reader
NodeGetter kube_client.Reader
ReplicaSetGetter kube_client.Reader
DeploymentGetter kube_client.Reader
ResourceConverter k8s_common.Converter
Zone string
KubeOutboundsAsVIPs bool
Expand Down Expand Up @@ -122,16 +125,23 @@ func (p *PodConverter) dataplaneFor(

dataplane.Networking.Address = pod.Status.PodIP

enabled, exist, err = annotations.GetEnabled(metadata.KumaGatewayAnnotation)
if err != nil {
return nil, err
}
if exist && enabled {
gateway, err := GatewayFor(p.Zone, pod, services)
if err != nil {
return nil, err
gwType, exist := annotations.GetString(metadata.KumaGatewayAnnotation)
if exist {
if gwType == "enabled" {
gateway, err := GatewayByServiceFor(p.Zone, pod, services)
if err != nil {
return nil, err
}
dataplane.Networking.Gateway = gateway
} else if gwType == "provided" {
gateway, err := p.GatewayByDeploymentFor(ctx, p.Zone, pod, services)
if err != nil {
return nil, err
}
dataplane.Networking.Gateway = gateway
} else {
return nil, errors.Errorf("invalid delegated gateway type '%s'", gwType)
}
dataplane.Networking.Gateway = gateway
} else {
ifaces, err := InboundInterfacesFor(p.Zone, pod, services)
if err != nil {
Expand Down Expand Up @@ -171,7 +181,7 @@ func (p *PodConverter) dataplaneFor(
return dataplane, nil
}

func GatewayFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
func GatewayByServiceFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
interfaces, err := InboundInterfacesFor(clusterName, pod, services)
if err != nil {
return nil, err
Expand All @@ -182,6 +192,61 @@ func GatewayFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Se
}, nil
}

// DeploymentFor returns the name of the deployment that the pod exists within. The second return
// value indicates whether or not the deployment was found when no error occurs, otherwise an
// error is returned as the third return value.
func (p *PodConverter) DeploymentFor(ctx context.Context, namespace string, pod *kube_core.Pod) (string, bool, error) {
owners := pod.GetObjectMeta().GetOwnerReferences()
rs := &kube_apps.ReplicaSet{}
for _, owner := range owners {
if owner.Kind == "ReplicaSet" {
rsKey := kube_client.ObjectKey{Namespace: namespace, Name: owner.Name}
if err := p.ReplicaSetGetter.Get(ctx, rsKey, rs); err != nil {
return "", false, err
}
break
}
}

if rs == nil {
return "", false, nil
}

rsOwners := rs.GetObjectMeta().GetOwnerReferences()
dpl := &kube_apps.Deployment{}
for _, owner := range rsOwners {
if owner.Kind == "Deployment" {
dplKey := kube_client.ObjectKey{Namespace: namespace, Name: owner.Name}
if err := p.DeploymentGetter.Get(ctx, dplKey, dpl); err != nil {
parkanzky marked this conversation as resolved.
Show resolved Hide resolved
return "", false, err
}
break
}
}

if dpl != nil {
return dpl.Name, true, nil
}

return "", false, nil
}

func (p *PodConverter) GatewayByDeploymentFor(ctx context.Context, clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
namespace := pod.GetObjectMeta().GetNamespace()
deployment, found, err := p.DeploymentFor(ctx, namespace, pod)
parkanzky marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if !found {
// Fall back on old service tags if Pod not part of Deployment
return GatewayByServiceFor(clusterName, pod, services)
}
return &mesh_proto.Dataplane_Networking_Gateway{
Type: mesh_proto.Dataplane_Networking_Gateway_DELEGATED,
Tags: map[string]string{"kuma.io/service-name": fmt.Sprintf("%s_%s_svc", deployment, namespace)},
}, nil
}

func MetricsFor(pod *kube_core.Pod) (*mesh_proto.MetricsBackend, error) {
path, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaMetricsPrometheusPath)
port, exist, err := metadata.Annotations(pod.Annotations).GetUint32(metadata.KumaMetricsPrometheusPort)
Expand Down
2 changes: 2 additions & 0 deletions pkg/plugins/runtime/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func addPodReconciler(mgr kube_ctrl.Manager, rt core_runtime.Runtime, converter
PodConverter: k8s_controllers.PodConverter{
ServiceGetter: mgr.GetClient(),
NodeGetter: mgr.GetClient(),
ReplicaSetGetter: mgr.GetClient(),
DeploymentGetter: mgr.GetClient(),
Zone: rt.Config().Multizone.Zone.Name,
ResourceConverter: converter,
KubeOutboundsAsVIPs: rt.Config().Experimental.KubeOutboundsAsVIPs,
Expand Down