Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): identify gateway service by deployment #4703

Merged
merged 5 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions pkg/plugins/runtime/k8s/controllers/pod_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"

"github.com/pkg/errors"
kube_apps "k8s.io/api/apps/v1"
kube_core "k8s.io/api/core/v1"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -26,6 +27,7 @@ var (
type PodConverter struct {
ServiceGetter kube_client.Reader
NodeGetter kube_client.Reader
ReplicaSetGetter kube_client.Reader
ResourceConverter k8s_common.Converter
Zone string
KubeOutboundsAsVIPs bool
Expand Down Expand Up @@ -122,16 +124,24 @@ func (p *PodConverter) dataplaneFor(

dataplane.Networking.Address = pod.Status.PodIP

enabled, exist, err = annotations.GetEnabled(metadata.KumaGatewayAnnotation)
if err != nil {
return nil, err
}
if exist && enabled {
gateway, err := GatewayFor(p.Zone, pod, services)
if err != nil {
return nil, err
gwType, exist := annotations.GetString(metadata.KumaGatewayAnnotation)
if exist {
switch gwType {
case "enabled":
gateway, err := GatewayByServiceFor(p.Zone, pod, services)
if err != nil {
return nil, err
}
dataplane.Networking.Gateway = gateway
case "provided":
gateway, err := p.GatewayByDeploymentFor(ctx, p.Zone, pod, services)
if err != nil {
return nil, err
}
dataplane.Networking.Gateway = gateway
default:
return nil, errors.Errorf("invalid delegated gateway type '%s'", gwType)
}
dataplane.Networking.Gateway = gateway
} else {
ifaces, err := InboundInterfacesFor(p.Zone, pod, services)
if err != nil {
Expand Down Expand Up @@ -171,7 +181,7 @@ func (p *PodConverter) dataplaneFor(
return dataplane, nil
}

func GatewayFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
func GatewayByServiceFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
interfaces, err := InboundInterfacesFor(clusterName, pod, services)
if err != nil {
return nil, err
Expand All @@ -182,6 +192,52 @@ func GatewayFor(clusterName string, pod *kube_core.Pod, services []*kube_core.Se
}, nil
}

// DeploymentFor returns the name of the deployment that the pod exists within. The second return
// value indicates whether or not the deployment was found when no error occurs, otherwise an
// error is returned as the third return value.
func (p *PodConverter) DeploymentFor(ctx context.Context, namespace string, pod *kube_core.Pod) (string, bool, error) {
owners := pod.GetObjectMeta().GetOwnerReferences()
rs := &kube_apps.ReplicaSet{}
for _, owner := range owners {
if owner.Kind == "ReplicaSet" {
rsKey := kube_client.ObjectKey{Namespace: namespace, Name: owner.Name}
if err := p.ReplicaSetGetter.Get(ctx, rsKey, rs); err != nil {
return "", false, err
}
break
}
}

if rs == nil {
return "", false, nil
}

rsOwners := rs.GetObjectMeta().GetOwnerReferences()
for _, owner := range rsOwners {
if owner.Kind == "Deployment" {
return owner.Name, true, nil
}
}

return "", false, nil
}

func (p *PodConverter) GatewayByDeploymentFor(ctx context.Context, clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
namespace := pod.GetObjectMeta().GetNamespace()
deployment, found, err := p.DeploymentFor(ctx, namespace, pod)
parkanzky marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if !found {
// Fall back on old service tags if Pod not part of Deployment
return GatewayByServiceFor(clusterName, pod, services)
}
return &mesh_proto.Dataplane_Networking_Gateway{
Type: mesh_proto.Dataplane_Networking_Gateway_DELEGATED,
Tags: map[string]string{"kuma.io/service-name": fmt.Sprintf("%s_%s_svc", deployment, namespace)},
}, nil
}

func MetricsFor(pod *kube_core.Pod) (*mesh_proto.MetricsBackend, error) {
path, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaMetricsPrometheusPath)
port, exist, err := metadata.Annotations(pod.Annotations).GetUint32(metadata.KumaMetricsPrometheusPort)
Expand Down
93 changes: 84 additions & 9 deletions pkg/plugins/runtime/k8s/controllers/pod_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
kube_apps "k8s.io/api/apps/v1"
kube_core "k8s.io/api/core/v1"
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_intstr "k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -62,6 +63,18 @@ var _ = Describe("PodToDataplane(..)", func() {
return services, nil
}

ParseReplicaSets := func(values []string) ([]*kube_apps.ReplicaSet, error) {
rsets := make([]*kube_apps.ReplicaSet, len(values))
for i, value := range values {
rset := kube_apps.ReplicaSet{}
if err := yaml.Unmarshal([]byte(value), &rset); err != nil {
return nil, err
}
rsets[i] = &rset
}
return rsets, nil
}

ParseDataplanes := func(values []string) ([]*mesh_k8s.Dataplane, error) {
dataplanes := make([]*mesh_k8s.Dataplane, len(values))
for i, value := range values {
Expand All @@ -75,12 +88,13 @@ var _ = Describe("PodToDataplane(..)", func() {
}

type testCase struct {
pod string
servicesForPod string
otherDataplanes string
otherServices string
node string
dataplane string
pod string
servicesForPod string
otherDataplanes string
otherServices string
otherReplicaSets string
node string
dataplane string
}
DescribeTable("should convert Pod into a Dataplane YAML version",
func(given testCase) {
Expand Down Expand Up @@ -121,6 +135,19 @@ var _ = Describe("PodToDataplane(..)", func() {
serviceGetter = reader
}

// other ReplicaSets
var replicaSetGetter kube_client.Reader
if given.otherReplicaSets != "" {
bytes, err = os.ReadFile(filepath.Join("testdata", given.otherReplicaSets))
Expect(err).ToNot(HaveOccurred())
YAMLs := util_yaml.SplitYAML(string(bytes))
rsets, err := ParseReplicaSets(YAMLs)
Expect(err).ToNot(HaveOccurred())
reader, err := newFakeReplicaSetReader(rsets)
Expect(err).ToNot(HaveOccurred())
replicaSetGetter = reader
}

// other dataplanes
var otherDataplanes []*mesh_k8s.Dataplane
if given.otherDataplanes != "" {
Expand All @@ -133,6 +160,7 @@ var _ = Describe("PodToDataplane(..)", func() {

converter := PodConverter{
ServiceGetter: serviceGetter,
ReplicaSetGetter: replicaSetGetter,
Zone: "zone-1",
ResourceConverter: k8s.NewSimpleConverter(),
}
Expand Down Expand Up @@ -160,7 +188,7 @@ var _ = Describe("PodToDataplane(..)", func() {
otherServices: "02.other-services.yaml",
dataplane: "02.dataplane.yaml",
}),
Entry("03. Pod with gateway annotation and 1 service", testCase{
Entry("03. Pod with gateway annotation and 1 service - legacy", testCase{
pod: "03.pod.yaml",
servicesForPod: "03.services-for-pod.yaml",
dataplane: "03.dataplane.yaml",
Expand Down Expand Up @@ -251,6 +279,23 @@ var _ = Describe("PodToDataplane(..)", func() {
servicesForPod: "19.services-for-pod.yaml",
dataplane: "19.dataplane.yaml",
}),
Entry("20. Pod with gateway annotation and 1 service identified by deployment", testCase{
parkanzky marked this conversation as resolved.
Show resolved Hide resolved
pod: "20.pod.yaml",
servicesForPod: "20.services-for-pod.yaml",
otherReplicaSets: "20.replicasets-for-pod.yaml",
dataplane: "20.dataplane.yaml",
}),
Entry("21. Pod with gateway annotation and 1 service with no replicaset", testCase{
pod: "21.pod.yaml",
servicesForPod: "21.services-for-pod.yaml",
dataplane: "21.dataplane.yaml",
}),
Entry("22. Pod with gateway annotation and 1 service with replicaset but no deployment", testCase{
pod: "22.pod.yaml",
servicesForPod: "22.services-for-pod.yaml",
otherReplicaSets: "22.replicasets-for-pod.yaml",
dataplane: "22.dataplane.yaml",
}),
)

DescribeTable("should convert Ingress Pod into an Ingress Dataplane YAML version",
Expand Down Expand Up @@ -857,9 +902,10 @@ func newFakeServiceReader(services []*kube_core.Service) (fakeServiceReader, err
var _ kube_client.Reader = fakeServiceReader{}

func (r fakeServiceReader) Get(ctx context.Context, key kube_client.ObjectKey, obj kube_client.Object) error {
data, ok := r[fmt.Sprintf("%s/%s", key.Namespace, key.Name)]
fqName := fmt.Sprintf("%s/%s", key.Namespace, key.Name)
data, ok := r[fqName]
if !ok {
return errors.New("not found")
return errors.Errorf("service not found: %s", fqName)
}
return yaml.Unmarshal([]byte(data), obj)
}
Expand All @@ -884,3 +930,32 @@ func (f fakeNodeReader) List(ctx context.Context, list kube_client.ObjectList, o
l.Items = append(l.Items, node)
return nil
}

type fakeReplicaSetReader map[string]string

func newFakeReplicaSetReader(replicaSets []*kube_apps.ReplicaSet) (fakeReplicaSetReader, error) {
replicaSetsMap := map[string]string{}
for _, rs := range replicaSets {
bytes, err := yaml.Marshal(rs)
if err != nil {
return nil, err
}
replicaSetsMap[rs.GetNamespace()+"/"+rs.GetName()] = string(bytes)
}
return replicaSetsMap, nil
}

var _ kube_client.Reader = fakeReplicaSetReader{}

func (r fakeReplicaSetReader) Get(ctx context.Context, key kube_client.ObjectKey, obj kube_client.Object) error {
fqName := fmt.Sprintf("%s/%s", key.Namespace, key.Name)
data, ok := r[fqName]
if !ok {
return errors.Errorf("replicaset not found: %s", fqName)
}
return yaml.Unmarshal([]byte(data), obj)
}

func (f fakeReplicaSetReader) List(ctx context.Context, list kube_client.ObjectList, opts ...kube_client.ListOption) error {
return errors.New("not implemented")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mesh: default
metadata:
creationTimestamp: null
spec:
networking:
address: 192.168.0.1
gateway:
tags:
kuma.io/service-name: deploymentOne_demo_svc
19 changes: 19 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/testdata/20.pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
metadata:
namespace: demo
ownerReferences:
- name: replicaSetOne
kind: ReplicaSet
name: example
labels:
app: example
version: "0.1"
annotations:
kuma.io/gateway: provided
spec:
containers:
- ports:
- containerPort: 7070
- containerPort: 6060
name: metrics
status:
podIP: 192.168.0.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: replicaSetOne
namespace: demo
ownerReferences:
- name: deploymentOne
kind: Deployment
spec:
template:
spec:
containers:
- name: app
image: app-image
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
metadata:
namespace: demo
name: example
spec:
clusterIP: 192.168.0.1
ports:
- # protocol defaults to TCP
port: 80
targetPort: 8080
- protocol: TCP
port: 443
targetPort: 8443
16 changes: 16 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/testdata/21.dataplane.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
mesh: default
metadata:
creationTimestamp: null
spec:
networking:
address: 192.168.0.1
gateway:
tags:
app: example
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
kuma.io/protocol: tcp
kuma.io/service: example_demo_svc_80
kuma.io/zone: zone-1
version: "0.1"
16 changes: 16 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/testdata/21.pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
metadata:
namespace: demo
name: example
labels:
app: example
version: "0.1"
annotations:
kuma.io/gateway: provided
spec:
containers:
- ports:
- containerPort: 7070
- containerPort: 6060
name: metrics
status:
podIP: 192.168.0.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
metadata:
namespace: demo
name: example
spec:
clusterIP: 192.168.0.1
ports:
- # protocol defaults to TCP
port: 80
targetPort: 8080
- protocol: TCP
port: 443
targetPort: 8443
16 changes: 16 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/testdata/22.dataplane.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
mesh: default
metadata:
creationTimestamp: null
spec:
networking:
address: 192.168.0.1
gateway:
tags:
app: example
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
kuma.io/protocol: tcp
kuma.io/service: example_demo_svc_80
kuma.io/zone: zone-1
version: "0.1"
Loading