Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(k8s): reconcile serviceMaps when using mesh namespace annotation #3815

Merged
merged 1 commit into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.0.2 // indirect
Expand Down Expand Up @@ -184,7 +183,6 @@ require (
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/component-base v0.23.4 // indirect
Expand Down
20 changes: 12 additions & 8 deletions pkg/plugins/runtime/k8s/controllers/configmap_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
kube_types "k8s.io/apimachinery/pkg/types"
kube_record "k8s.io/client-go/tools/record"
Expand All @@ -23,7 +24,7 @@ import (
"github.com/kumahq/kuma/pkg/dns/vips"
k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
k8s_util "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)

// ConfigMapReconciler reconciles a ConfigMap object
Expand Down Expand Up @@ -70,7 +71,7 @@ func (r *ConfigMapReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
Complete(r)
}

func ServiceToConfigMapsMapper(client kube_client.Reader, l logr.Logger, ns string) kube_handler.MapFunc {
func ServiceToConfigMapsMapper(client kube_client.Reader, l logr.Logger, systemNamespace string) kube_handler.MapFunc {
l = l.WithName("service-to-configmap-mapper")
return func(obj kube_client.Object) []kube_reconile.Request {
cause, ok := obj.(*kube_core.Service)
Expand All @@ -83,21 +84,24 @@ func ServiceToConfigMapsMapper(client kube_client.Reader, l logr.Logger, ns stri
svcName := fmt.Sprintf("%s/%s", cause.Namespace, cause.Name)
// List Pods in the same namespace
pods := &kube_core.PodList{}
if err := client.List(ctx, pods, kube_client.InNamespace(obj.GetNamespace())); err != nil {
if err := client.List(ctx, pods, kube_client.InNamespace(obj.GetNamespace()), kube_client.MatchingLabelsSelector{Selector: labels.SelectorFromSet(cause.Spec.Selector)}); err != nil {
l.WithValues("service", svcName).Error(err, "failed to fetch Dataplanes in namespace")
lahabana marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
ns := kube_core.Namespace{}
if err := client.Get(ctx, kube_types.NamespacedName{Name: cause.Namespace}, &ns); err != nil {
l.WithValues("service", svcName).Error(err, "failed to fetch Namespace")
return nil
}

meshSet := map[string]bool{}
meshSet := map[string]struct{}{}
for _, pod := range pods.Items {
if mesh, exist := metadata.Annotations(pod.Annotations).GetString(metadata.KumaMeshAnnotation); exist {
meshSet[mesh] = true
}
meshSet[k8s_util.MeshOf(&pod, &ns)] = struct{}{}
}
var req []kube_reconile.Request
for mesh := range meshSet {
req = append(req, kube_reconile.Request{
NamespacedName: kube_types.NamespacedName{Namespace: ns, Name: vips.ConfigKey(mesh)},
NamespacedName: kube_types.NamespacedName{Namespace: systemNamespace, Name: vips.ConfigKey(mesh)},
})
}

Expand Down
145 changes: 143 additions & 2 deletions pkg/plugins/runtime/k8s/controllers/configmap_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package controllers
package controllers_test

import (
"context"
"fmt"
"sort"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
kube_core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/log"
"github.com/kumahq/kuma/pkg/plugins/resources/k8s"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/controllers"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
)

var _ = Describe("DataplaneToMeshMapper", func() {
It("should map ingress to list of meshes", func() {
l := log.NewLogger(log.InfoLevel)
mapper := DataplaneToMeshMapper(l, "ns", k8s.NewSimpleConverter())
mapper := controllers.DataplaneToMeshMapper(l, "ns", k8s.NewSimpleConverter())
requests := mapper(&mesh_k8s.Dataplane{
Mesh: "mesh-1",
Spec: mesh_k8s.ToSpec(&mesh_proto.Dataplane{
Expand All @@ -36,3 +46,134 @@ var _ = Describe("DataplaneToMeshMapper", func() {
Expect(requestsStr).To(ConsistOf("kuma-mesh-1-dns-vips"))
})
})

var _ = Describe("ServiceToConfigMapMapper", func() {
ctx := context.Background()
var nsName string
defaultNs := kube_core.Namespace{
ObjectMeta: metav1.ObjectMeta{},
}
AfterEach(func() {
ns := kube_core.Namespace{}
key := kube_client.ObjectKey{Name: nsName}
Expect(k8sClient.Get(ctx, key, &ns)).To(Succeed())
Expect(k8sClient.Delete(ctx, &ns)).To(Succeed())
})
BeforeEach(func() {
nsName = fmt.Sprintf("srv-to-configmap-mapper-%d", time.Now().UnixMilli())
})
serviceFn := func(selector map[string]string) kube_core.Service {
return kube_core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "svc",
},
Spec: kube_core.ServiceSpec{
Selector: selector,
Ports: []kube_core.ServicePort{
{Port: 80},
},
},
}
}
podFn := func(name string, labels map[string]string, annotations map[string]string) kube_core.Pod {
return kube_core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
Annotations: annotations,
},
Spec: kube_core.PodSpec{
Containers: []kube_core.Container{
{Name: "foo", Image: "busybox"},
},
},
}
}
DescribeTable("should map services to list of config maps",
func(givenService kube_core.Service, givenNamespace kube_core.Namespace, givenPods []kube_core.Pod, expectedMeshes []string) {
l := log.NewLogger(log.InfoLevel)
givenNamespace.Name = nsName
Expect(k8sClient.Create(ctx, &givenNamespace)).To(Succeed())
givenService.Namespace = givenNamespace.Name
Expect(k8sClient.Create(ctx, &givenService)).To(Succeed())
for _, pod := range givenPods {
pod.Namespace = givenNamespace.Name
Expect(k8sClient.Create(ctx, &pod)).To(Succeed())
}
mapper := controllers.ServiceToConfigMapsMapper(k8sClient, l, "ns")
requests := mapper(&givenService)
requestsStr := []string{}
for _, r := range requests {
requestsStr = append(requestsStr, r.Name)
}
sort.Strings(requestsStr)
Expect(requestsStr).To(Equal(expectedMeshes))
},
Entry("no pod match",
serviceFn(map[string]string{"app": "app1"}),
defaultNs,
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app2"}, nil),
podFn("pod2", map[string]string{"app": "app2"}, map[string]string{metadata.KumaMeshAnnotation: "mesh2"}),
},
[]string{},
),
Entry("namespace not annotated selects only matching pods",
serviceFn(map[string]string{"app": "app1"}),
defaultNs,
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app1"}, nil),
podFn("pod2", map[string]string{"app": "app2"}, map[string]string{metadata.KumaMeshAnnotation: "mesh2"}),
},
[]string{"kuma-default-dns-vips"},
),
Entry("namespace not annotated pod annotated selects only matching pods",
serviceFn(map[string]string{"app": "app1"}),
defaultNs,
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app1"}, map[string]string{metadata.KumaMeshAnnotation: "mesh1"}),
podFn("pod2", map[string]string{"app": "app2"}, map[string]string{metadata.KumaMeshAnnotation: "mesh2"}),
},
[]string{"kuma-mesh1-dns-vips"},
),
Entry("namespace not annotated pod annotated matches on multiple meshes",
serviceFn(map[string]string{"app": "app1"}),
defaultNs,
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app1"}, map[string]string{metadata.KumaMeshAnnotation: "mesh1"}),
podFn("pod2", map[string]string{"app": "app1"}, map[string]string{metadata.KumaMeshAnnotation: "mesh2"}),
},
[]string{"kuma-mesh1-dns-vips", "kuma-mesh2-dns-vips"},
),
Entry("namespace annotated pod not annotated",
serviceFn(map[string]string{"app": "app1"}),
kube_core.Namespace{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
metadata.KumaMeshAnnotation: "mesh1",
},
},
},
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app1"}, nil),
podFn("pod2", map[string]string{"app": "app1"}, nil),
},
[]string{"kuma-mesh1-dns-vips"},
),
Entry("namespace annotated pod annotated",
serviceFn(map[string]string{"app": "app1"}),
kube_core.Namespace{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
metadata.KumaMeshAnnotation: "mesh1",
},
},
},
[]kube_core.Pod{
podFn("pod1", map[string]string{"app": "app1"}, map[string]string{metadata.KumaMeshAnnotation: "mesh2"}),
podFn("pod2", map[string]string{"app": "app1"}, nil),
},
[]string{"kuma-mesh1-dns-vips", "kuma-mesh2-dns-vips"},
),
)
})
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var testEnv *envtest.Environment
var k8sClientScheme *runtime.Scheme

func TestAPIs(t *testing.T) {
test.RunSpecs(t, "Namespace Controller Suite")
test.RunSpecs(t, "K8s Controller Suite")
}

var _ = BeforeSuite(test.Within(time.Minute, func() {
Expand Down