Skip to content

Commit

Permalink
feat(MeshGatewayInstance): respect kuma.io/mesh label
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
  • Loading branch information
michaelbeaumont committed Nov 2, 2022
1 parent b9c2064 commit 1eb72fe
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func ServiceToConfigMapsMapper(client kube_client.Reader, l logr.Logger, systemN

meshSet := map[string]struct{}{}
for _, pod := range pods.Items {
meshSet[k8s_util.MeshOf(&pod, &ns)] = struct{}{}
meshSet[k8s_util.MeshOfByAnnotation(&pod, &ns)] = struct{}{}
}
var req []kube_reconile.Request
for mesh := range meshSet {
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/gateway_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex
Namespace: pod.Namespace,
Name: pod.Name,
},
Mesh: k8s_util.MeshOf(pod, ns),
Mesh: k8s_util.MeshOfByAnnotation(pod, ns),
}

tagsAnnotation, ok := pod.Annotations[metadata.KumaTagsAnnotation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (r *GatewayInstanceReconciler) Reconcile(ctx context.Context, req kube_ctrl
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: gatewayInstance.Namespace}, &ns); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace of MeshGatewayInstance")
}
mesh := k8s_util.MeshOf(gatewayInstance, &ns)

mesh := k8s_util.MeshOfByLabelOrAnnotation(r.Log, gatewayInstance, &ns)

orig := gatewayInstance.DeepCopyObject().(kube_client.Object)
svc, err := r.createOrUpdateService(ctx, mesh, gatewayInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request
return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace of MeshGateway")
}

mesh := k8s_util.MeshOf(gateway, &ns)
mesh := k8s_util.MeshOfByAnnotation(gateway, &ns)
gatewaySpec, listenerConditions, err := r.gapiToKumaGateway(ctx, mesh, gateway, config)
if err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "error generating MeshGateway.kuma.io")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req kube_ctrl.Reque
return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace of HTTPRoute")
}

mesh := k8s_util.MeshOf(httpRoute, &ns)
mesh := k8s_util.MeshOfByAnnotation(httpRoute, &ns)

spec, conditions, err := r.gapiToKumaRoutes(ctx, mesh, httpRoute)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (r *PodReconciler) findOtherDataplanes(ctx context.Context, pod *kube_core.
}

// only consider Dataplanes in the same Mesh as Pod
mesh := util_k8s.MeshOf(pod, ns)
mesh := util_k8s.MeshOfByAnnotation(pod, ns)
otherDataplanes := make([]*mesh_k8s.Dataplane, 0)
for i := range allDataplanes.Items {
dataplane := allDataplanes.Items[i]
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/controllers/pod_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (p *PodConverter) PodToDataplane(
services []*kube_core.Service,
others []*mesh_k8s.Dataplane,
) error {
dataplane.Mesh = util_k8s.MeshOf(pod, ns)
dataplane.Mesh = util_k8s.MeshOfByAnnotation(pod, ns)
dataplaneProto, err := p.dataplaneFor(ctx, pod, services, others)
if err != nil {
return err
Expand Down
28 changes: 26 additions & 2 deletions pkg/plugins/runtime/k8s/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_labels "k8s.io/apimachinery/pkg/labels"
kube_intstr "k8s.io/apimachinery/pkg/util/intstr"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/go-logr/logr"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
)
Expand Down Expand Up @@ -135,9 +137,9 @@ func CopyStringMap(in map[string]string) map[string]string {
return out
}

// MeshOf returns the mesh of the given object according to its own annotations
// MeshOfByAnnotation returns the mesh of the given object according to its own annotations
// or those of its namespace.
func MeshOf(obj kube_meta.Object, namespace *kube_core.Namespace) string {
func MeshOfByAnnotation(obj kube_meta.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.KumaMeshAnnotation); exists && mesh != "" {
return mesh
}
Expand All @@ -148,6 +150,28 @@ func MeshOf(obj kube_meta.Object, namespace *kube_core.Namespace) string {
return model.DefaultMesh
}

// MeshOfByLabelOrAnnotation returns the mesh of the given object according to its own
// annotations or labels or those of its namespace. It treats the annotation
// directly on the object as deprecated.
func MeshOfByLabelOrAnnotation(log logr.Logger, obj kube_client.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetLabels()).GetString(metadata.KumaMeshLabel); exists && mesh != "" {
return mesh
}
if mesh, exists := metadata.Annotations(namespace.GetLabels()).GetString(metadata.KumaMeshLabel); exists && mesh != "" {
return mesh
}

if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.KumaMeshAnnotation); exists && mesh != "" {
log.Info("WARNING: The kuma.io/mesh annotation is deprecated for this object kind", "name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
return mesh
}
if mesh, exists := metadata.Annotations(namespace.GetAnnotations()).GetString(metadata.KumaMeshAnnotation); exists && mesh != "" {
return mesh
}

return model.DefaultMesh
}

// ServiceTagFor returns the canonical service name for a Kubernetes service,
// optionally with a specific port.
func ServiceTagFor(svc *kube_core.Service, svcPort *int32) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ var _ = Describe("Util", func() {
}

// then
Expect(util.MeshOf(pod, ns)).To(Equal(given.expected))
Expect(util.MeshOfByAnnotation(pod, ns)).To(Equal(given.expected))
},
Entry("Pod without annotations", testCase{
podAnnotations: nil,
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/k8s/webhooks/injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (i *KumaInjector) InjectKuma(ctx context.Context, pod *kube_core.Pod) error
logger.V(1).Info("skip injecting Kuma")
return nil
}
meshName := k8s_util.MeshOf(pod, ns)
meshName := k8s_util.MeshOfByAnnotation(pod, ns)
logger = logger.WithValues("mesh", meshName)
// Check mesh exists
if err := i.client.Get(ctx, kube_types.NamespacedName{Name: meshName}, &mesh_k8s.Mesh{}); err != nil {
Expand Down

0 comments on commit 1eb72fe

Please sign in to comment.