From fde683eaf8e8574f6d53867478b955310a248594 Mon Sep 17 00:00:00 2001 From: MengxinLiu Date: Tue, 13 Aug 2019 16:33:40 +0800 Subject: [PATCH] feat: add subnet annotation to ns and automatically unbind ns from subnet. --- pkg/controller/controller.go | 13 ++-- pkg/controller/namespace.go | 111 ++++++++++++++++++++++++++++++++++- pkg/controller/subnet.go | 69 +++++++++++++++++++++- 3 files changed, 187 insertions(+), 6 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8517f13eb8c..46c6dc6fd63 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -49,8 +49,9 @@ type Controller struct { deleteSubnetQueue workqueue.RateLimitingInterface updateSubnetQueue workqueue.RateLimitingInterface - namespacesLister v1.NamespaceLister - namespacesSynced cache.InformerSynced + namespacesLister v1.NamespaceLister + namespacesSynced cache.InformerSynced + addNamespaceQueue workqueue.RateLimitingInterface nodesLister v1.NodeLister nodesSynced cache.InformerSynced @@ -121,8 +122,9 @@ func NewController(config *Configuration) *Controller { deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"), updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"), - namespacesLister: namespaceInformer.Lister(), - namespacesSynced: namespaceInformer.Informer().HasSynced, + namespacesLister: namespaceInformer.Lister(), + namespacesSynced: namespaceInformer.Informer().HasSynced, + addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"), nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, @@ -204,6 +206,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { defer c.deletePodQueue.ShutDown() defer c.updatePodQueue.ShutDown() + defer c.addNamespaceQueue.ShutDown() + defer c.addSubnetQueue.ShutDown() defer c.updateSubnetQueue.ShutDown() defer c.deleteSubnetQueue.ShutDown() @@ -254,6 +258,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { time.Sleep(3 * time.Second) go wait.Until(c.runAddIpPoolPodWorker, time.Second, stopCh) + go wait.Until(c.runAddNamespaceWorker, time.Second, stopCh) for i := 0; i < c.config.WorkerNum; i++ { go wait.Until(c.runAddPodWorker, time.Second, stopCh) go wait.Until(c.runDeletePodWorker, time.Second, stopCh) diff --git a/pkg/controller/namespace.go b/pkg/controller/namespace.go index 1d3eb5b1ff6..9c0c1ea9c18 100644 --- a/pkg/controller/namespace.go +++ b/pkg/controller/namespace.go @@ -1,9 +1,18 @@ package controller import ( + "encoding/json" + "fmt" + "reflect" + "github.com/alauda/kube-ovn/pkg/util" v1 "k8s.io/api/core/v1" - "reflect" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" ) func (c *Controller) enqueueAddNamespace(obj interface{}) { @@ -14,6 +23,13 @@ func (c *Controller) enqueueAddNamespace(obj interface{}) { for _, np := range c.namespaceMatchNetworkPolicies(ns) { c.updateNpQueue.AddRateLimited(np) } + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.addNamespaceQueue.AddRateLimited(key) } func (c *Controller) enqueueDeleteNamespace(obj interface{}) { @@ -45,3 +61,96 @@ func (c *Controller) enqueueUpdateNamespace(old, new interface{}) { } } } + +func (c *Controller) runAddNamespaceWorker() { + for c.processNextAddNamespaceWorkItem() { + } +} + +func (c *Controller) processNextAddNamespaceWorkItem() bool { + obj, shutdown := c.addNamespaceQueue.Get() + + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.addNamespaceQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.addNamespaceQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleAddNamespace(key); err != nil { + c.addNamespaceQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.addNamespaceQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +func (c *Controller) handleAddNamespace(key string) error { + namespace, err := c.namespacesLister.Get(key) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch) + if err != nil { + klog.Errorf("failed to get default subnet %v", err) + return err + } + subnets, err := c.subnetsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list subnets %v", err) + return err + } + for _, s := range subnets { + for _, ns := range s.Spec.Namespaces { + if ns == key { + subnet = s + break + } + } + } + + op := "replace" + if namespace.Annotations == nil { + op = "add" + namespace.Annotations = map[string]string{} + } else { + if namespace.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && + namespace.Annotations[util.CidrAnnotation] == subnet.Spec.CIDRBlock { + return nil + } + } + + namespace.Annotations[util.LogicalSwitchAnnotation] = subnet.Name + namespace.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock + patchPayloadTemplate := + `[{ + "op": "%s", + "path": "/metadata/annotations", + "value": %s + }]` + + raw, _ := json.Marshal(namespace.Annotations) + patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw) + _, err = c.config.KubeClient.CoreV1().Namespaces().Patch(key, types.JSONPatchType, []byte(patchPayload)) + if err != nil { + klog.Errorf("patch namespace %s failed %v", key, err) + } + return err +} diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 5e1de8052c9..b8083e39e97 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -250,6 +250,11 @@ func (c *Controller) handleAddSubnet(key string) error { } } + if err := c.reconcileSubnet(subnet); err != nil { + klog.Errorf("failed to reconcile subnet %s, %v", subnet.Name, err) + return err + } + if subnet.Spec.Private { return c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.AllowSubnets) } @@ -275,9 +280,9 @@ func (c *Controller) handleUpdateSubnet(key string) error { return err } if !exist { - c.addSubnetQueue.AddRateLimited(key) return nil } + if err = util.ValidateSubnet(*subnet); err != nil { klog.Error(err) subnet.TypeMeta.Kind = "Subnet" @@ -286,6 +291,11 @@ func (c *Controller) handleUpdateSubnet(key string) error { return err } + if err := c.reconcileSubnet(subnet); err != nil { + klog.Errorf("failed to reconcile subnet %s, %v", subnet.Name, err) + return err + } + if subnet.Spec.Private { return c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.AllowSubnets) } @@ -315,3 +325,60 @@ func (c *Controller) handleDeleteSubnet(key string) error { } return nil } + +func (c *Controller) reconcileSubnet(subnet *kubeovnv1.Subnet) error { + // 1. unbind from previous subnet + subnets, err := c.subnetsLister.List(labels.Everything()) + if err != nil { + return err + } + + namespaceMap := map[string]bool{} + for _, ns := range subnet.Spec.Namespaces { + namespaceMap[ns] = true + } + + for _, sub := range subnets { + if sub.Name == subnet.Name || len(sub.Spec.Namespaces) == 0 { + continue + } + + changed := false + reservedNamespaces := []string{} + for _, ns := range sub.Spec.Namespaces { + if namespaceMap[ns] { + changed = true + } else { + reservedNamespaces = append(reservedNamespaces, ns) + } + } + if changed { + sub.Spec.Namespaces = reservedNamespaces + _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Update(sub) + if err != nil { + klog.Errorf("failed to unbind namespace from subnet %s, %v", sub.Name, err) + return err + } + } + } + + // 2. add annotations to bind namespace + for _, ns := range subnet.Spec.Namespaces { + c.addNamespaceQueue.AddRateLimited(ns) + } + + // 3. update unbind namespace annotation + namespaces, err := c.namespacesLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list namespaces, %v", err) + return err + } + + for _, ns := range namespaces { + if ns.Annotations != nil && ns.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && !namespaceMap[ns.Name] { + c.addNamespaceQueue.AddRateLimited(ns.Name) + } + } + + return nil +}