diff --git a/dist/images/cleanup.sh b/dist/images/cleanup.sh index 70ab1b2919e..14183c3e79d 100644 --- a/dist/images/cleanup.sh +++ b/dist/images/cleanup.sh @@ -1,5 +1,6 @@ #!/bin/bash -set -eu +set -eux +export PS4='+ $(date "+%Y-%m-%d %H:%M:%S")\011 ' kubectl delete --ignore-not-found ds kube-ovn-pinger -n kube-system # ensure kube-ovn-pinger has been deleted @@ -109,12 +110,22 @@ kubectl delete --ignore-not-found clusterrole system:vpc-dns kubectl delete --ignore-not-found clusterrolebinding vpc-dns kubectl delete --ignore-not-found sa vpc-dns -n kube-system + # delete CRD -kubectl delete --ignore-not-found crd htbqoses.kubeovn.io security-groups.kubeovn.io ips.kubeovn.io subnets.kubeovn.io \ - vpc-nat-gateways.kubeovn.io vpcs.kubeovn.io vlans.kubeovn.io provider-networks.kubeovn.io \ - iptables-dnat-rules.kubeovn.io iptables-eips.kubeovn.io iptables-fip-rules.kubeovn.io \ +kubectl delete --ignore-not-found crd security-groups.kubeovn.io subnets.kubeovn.io vpcs.kubeovn.io \ + vlans.kubeovn.io provider-networks.kubeovn.io vpc-nat-gateways.kubeovn.io \ + iptables-dnat-rules.kubeovn.io iptables-eips.kubeovn.io iptables-fip-rules.kubeovn.io \ iptables-snat-rules.kubeovn.io vips.kubeovn.io switch-lb-rules.kubeovn.io vpc-dnses.kubeovn.io \ ovn-eips.kubeovn.io ovn-fips.kubeovn.io ovn-snat-rules.kubeovn.io +# in case of ip not delete +set +e +for ip in $(kubectl get ip -o name); do + kubectl patch "$ip" --type='json' -p '[{"op": "replace", "path": "/metadata/finalizers", "value": []}]' + kubectl delete --ignore-not-found "$ip" +done +kubectl delete --ignore-not-found crd ips.kubeovn.io +set -e + # Remove annotations/labels in namespaces and nodes kubectl annotate no --all ovn.kubernetes.io/cidr- diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 6805d4978d7..8229668bccc 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -91,8 +91,11 @@ type Controller struct { syncVirtualPortsQueue workqueue.RateLimitingInterface subnetStatusKeyMutex *keymutex.KeyMutex - ipsLister kubeovnlister.IPLister - ipSynced cache.InformerSynced + ipsLister kubeovnlister.IPLister + ipSynced cache.InformerSynced + addIPQueue workqueue.RateLimitingInterface + updateIPQueue workqueue.RateLimitingInterface + delIPQueue workqueue.RateLimitingInterface virtualIpsLister kubeovnlister.VipLister virtualIpsSynced cache.InformerSynced @@ -288,8 +291,11 @@ func NewController(config *Configuration) *Controller { syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"), subnetStatusKeyMutex: keymutex.New(97), - ipsLister: ipInformer.Lister(), - ipSynced: ipInformer.Informer().HasSynced, + ipsLister: ipInformer.Lister(), + ipSynced: ipInformer.Informer().HasSynced, + addIPQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddIP"), + updateIPQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateIP"), + delIPQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteIP"), virtualIpsLister: virtualIpInformer.Lister(), virtualIpsSynced: virtualIpInformer.Informer().HasSynced, @@ -485,9 +491,9 @@ func NewController(config *Configuration) *Controller { } if _, err = ipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueAddOrDelIP, + AddFunc: controller.enqueueAddIP, UpdateFunc: controller.enqueueUpdateIP, - DeleteFunc: controller.enqueueAddOrDelIP, + DeleteFunc: controller.enqueueDelIP, }); err != nil { util.LogFatalAndExit(err, "failed to add ips event handler") } @@ -763,6 +769,10 @@ func (c *Controller) shutdown() { c.delVpcDnsQueue.ShutDown() } + c.addIPQueue.ShutDown() + c.updateIPQueue.ShutDown() + c.delIPQueue.ShutDown() + c.addVirtualIpQueue.ShutDown() c.updateVirtualIpQueue.ShutDown() c.delVirtualIpQueue.ShutDown() @@ -977,6 +987,10 @@ func (c *Controller) startWorkers(ctx context.Context) { go wait.Until(c.syncVmLiveMigrationPort, 15*time.Second, ctx.Done()) + go wait.Until(c.runAddIPWorker, time.Second, ctx.Done()) + go wait.Until(c.runUpdateIPWorker, time.Second, ctx.Done()) + go wait.Until(c.runDelIPWorker, time.Second, ctx.Done()) + go wait.Until(c.runAddVirtualIpWorker, time.Second, ctx.Done()) go wait.Until(c.runUpdateVirtualIpWorker, time.Second, ctx.Done()) go wait.Until(c.runDelVirtualIpWorker, time.Second, ctx.Done()) diff --git a/pkg/controller/ip.go b/pkg/controller/ip.go index 1c641fbcab6..976ac074745 100644 --- a/pkg/controller/ip.go +++ b/pkg/controller/ip.go @@ -1,38 +1,318 @@ package controller import ( + "context" + "fmt" + "reflect" "strings" - "github.com/kubeovn/kube-ovn/pkg/util" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" ) -func (c *Controller) enqueueAddOrDelIP(obj interface{}) { - if _, ok := obj.(*kubeovnv1.IP); !ok { +func (c *Controller) enqueueAddIP(obj interface{}) { + ipObj, ok := obj.(*kubeovnv1.IP) + if !ok { klog.Errorf("object is not an IP, ignore it") return } - - ipObj := obj.(*kubeovnv1.IP) - klog.V(3).Infof("enqueue update status subnet %s", ipObj.Spec.Subnet) if strings.HasPrefix(ipObj.Name, util.U2OInterconnName[0:19]) { return } + klog.V(3).Infof("enqueue update status subnet %s", ipObj.Spec.Subnet) c.updateSubnetStatusQueue.Add(ipObj.Spec.Subnet) for _, as := range ipObj.Spec.AttachSubnets { - klog.V(3).Infof("enqueue update status subnet %s", as) + klog.V(3).Infof("enqueue update attach status for subnet %s", as) c.updateSubnetStatusQueue.Add(as) } + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + klog.V(3).Infof("enqueue add ip %s", key) + c.addIPQueue.Add(key) } -func (c *Controller) enqueueUpdateIP(old, new interface{}) { +func (c *Controller) enqueueUpdateIP(oldObj, newObj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil { + utilruntime.HandleError(err) + return + } + oldIP := oldObj.(*kubeovnv1.IP) + newIP := newObj.(*kubeovnv1.IP) + if !newIP.DeletionTimestamp.IsZero() { + klog.V(3).Infof("enqueue update ip %s", key) + c.updateIPQueue.Add(key) + return + } + if !reflect.DeepEqual(oldIP.Spec.AttachSubnets, newIP.Spec.AttachSubnets) { + klog.V(3).Infof("enqueue update status subnet %s", newIP.Spec.Subnet) + for _, as := range newIP.Spec.AttachSubnets { + klog.V(3).Infof("enqueue update status for attach subnet %s", as) + c.updateSubnetStatusQueue.Add(as) + } + } +} - ipObj := new.(*kubeovnv1.IP) - klog.V(3).Infof("enqueue update status subnet %s", ipObj.Spec.Subnet) - for _, as := range ipObj.Spec.AttachSubnets { - klog.V(3).Infof("enqueue update status subnet %s", as) +func (c *Controller) enqueueDelIP(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + ipObj := obj.(*kubeovnv1.IP) + if strings.HasPrefix(ipObj.Name, util.U2OInterconnName[0:19]) { + return + } + klog.V(3).Infof("enqueue del ip %s", key) + c.delIPQueue.Add(ipObj) +} + +func (c *Controller) runAddIPWorker() { + for c.processNextAddIPWorkItem() { + } +} + +func (c *Controller) runUpdateIPWorker() { + for c.processNextUpdateIPWorkItem() { + } +} + +func (c *Controller) runDelIPWorker() { + for c.processNextDeleteIPWorkItem() { + } +} + +func (c *Controller) processNextAddIPWorkItem() bool { + obj, shutdown := c.addIPQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.addIPQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.addIPQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleAddIP(key); err != nil { + c.addIPQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.addIPQueue.Forget(obj) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) processNextUpdateIPWorkItem() bool { + obj, shutdown := c.updateIPQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.updateIPQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.updateIPQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleUpdateIP(key); err != nil { + c.updateIPQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.updateIPQueue.Forget(obj) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) processNextDeleteIPWorkItem() bool { + obj, shutdown := c.delIPQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.delIPQueue.Done(obj) + var ip *kubeovnv1.IP + var ok bool + if ip, ok = obj.(*kubeovnv1.IP); !ok { + c.delIPQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected ip in workqueue but got %#v", obj)) + return nil + } + if err := c.handleDelIP(ip); err != nil { + c.delIPQueue.AddRateLimited(obj) + return fmt.Errorf("error syncing '%s': %s, requeuing", ip.Name, err.Error()) + } + c.delIPQueue.Forget(obj) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) handleAddIP(key string) error { + cachedIP, err := c.ipsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + klog.V(3).Infof("handle add ip %s", cachedIP.Name) + if err := c.handleAddIPFinalizer(cachedIP, util.ControllerName); err != nil { + klog.Errorf("failed to handle add ip finalizer %v", err) + return err + } + return nil +} + +func (c *Controller) handleUpdateIP(key string) error { + cachedIP, err := c.ipsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Error(err) + return err + } + if !cachedIP.DeletionTimestamp.IsZero() { + subnet, err := c.subnetsLister.Get(cachedIP.Spec.Subnet) + if err != nil { + klog.Errorf("failed to get subnet %s: %v", cachedIP.Spec.Subnet, err) + return err + } + cleanIPAM := true + if isOvnSubnet(subnet) { + portName := cachedIP.Name + port, err := c.ovnClient.GetLogicalSwitchPort(portName, true) + if err != nil { + klog.Errorf("failed to get logical switch port %s: %v", portName, err) + return err + } + if port != nil && len(port.Addresses) > 0 { + address := port.Addresses[0] + if strings.Contains(address, cachedIP.Spec.MacAddress) { + klog.Infof("delete ip cr lsp %s from switch %s", portName, subnet.Name) + if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(portName); err != nil { + klog.Errorf("failed to delete ip cr lsp %s from switch %s: %v", portName, subnet.Name, err) + return err + } + klog.V(3).Infof("sync sg for deleted port %s", portName) + sgList, err := c.getPortSg(port) + if err != nil { + klog.Errorf("get port sg failed, %v", err) + return err + } + for _, sgName := range sgList { + if sgName != "" { + c.syncSgPortsQueue.Add(sgName) + } + } + } else { + // ip subnet changed in pod handle add or update pod process + klog.Infof("lsp %s ip changed, only delete old ip cr %s", portName, key) + cleanIPAM = false + } + } + } + if cleanIPAM { + klog.V(3).Infof("release ipam for deleted ip %s from subnet %s", cachedIP.Name, cachedIP.Spec.Subnet) + c.ipam.ReleaseAddressByPod(cachedIP.Name, cachedIP.Spec.Subnet) + } + if err = c.handleDelIPFinalizer(cachedIP, util.ControllerName); err != nil { + klog.Errorf("failed to handle del ip finalizer %v", err) + return err + } + } + return nil +} + +func (c *Controller) handleDelIP(ip *kubeovnv1.IP) error { + klog.V(3).Infof("handle delete ip %s", ip.Name) + klog.V(3).Infof("enqueue update status subnet %s", ip.Spec.Subnet) + c.updateSubnetStatusQueue.Add(ip.Spec.Subnet) + for _, as := range ip.Spec.AttachSubnets { + klog.V(3).Infof("enqueue update attach status for subnet %s", as) c.updateSubnetStatusQueue.Add(as) } + return nil +} + +func (c *Controller) handleAddIPFinalizer(cachedIP *kubeovnv1.IP, finalizer string) error { + if cachedIP.DeletionTimestamp.IsZero() { + if util.ContainsString(cachedIP.Finalizers, finalizer) { + return nil + } + } + newIP := cachedIP.DeepCopy() + controllerutil.AddFinalizer(newIP, finalizer) + patch, err := util.GenerateMergePatchPayload(cachedIP, newIP) + if err != nil { + klog.Errorf("failed to generate patch payload for ip %s, %v", cachedIP.Name, err) + return err + } + if _, err := c.config.KubeOvnClient.KubeovnV1().IPs().Patch(context.Background(), cachedIP.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to add finalizer for ip %s, %v", cachedIP.Name, err) + return err + } + return nil +} + +func (c *Controller) handleDelIPFinalizer(cachedIP *kubeovnv1.IP, finalizer string) error { + if len(cachedIP.Finalizers) == 0 { + return nil + } + newIP := cachedIP.DeepCopy() + controllerutil.RemoveFinalizer(newIP, finalizer) + patch, err := util.GenerateMergePatchPayload(cachedIP, newIP) + if err != nil { + klog.Errorf("failed to generate patch payload for ip %s, %v", cachedIP.Name, err) + return err + } + if _, err := c.config.KubeOvnClient.KubeovnV1().IPs().Patch(context.Background(), cachedIP.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to remove finalizer from ip %s, %v", cachedIP.Name, err) + return err + } + return nil } diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 4a7055a8238..ae8676e737d 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -692,18 +692,6 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node return nil } -func (c *Controller) deleteCrdIPs(podName, ns, providerName string) error { - portName := ovs.PodNameToPortName(podName, ns, providerName) - klog.Infof("delete cr ip '%s' for pod %s/%s", portName, ns, podName) - if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil { - if !k8serrors.IsNotFound(err) { - klog.Errorf("failed to delete ip %s, %v", portName, err) - return err - } - } - return nil -} - func (c *Controller) CheckGatewayReady() { if err := c.checkGatewayReady(); err != nil { klog.Errorf("failed to check gateway ready %v", err) diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index fc00a5bc7f6..c0b3d00a881 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -547,34 +547,33 @@ func (c *Controller) getPodKubeovnNets(pod *v1.Pod) ([]*kubeovnNet, error) { return podNets, nil } -func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName string, pod *v1.Pod) error { +func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName string) error { ipName := ovs.PodNameToPortName(vmName, namespace, providerName) ipCr, err := c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{}) if err != nil { - if !k8serrors.IsNotFound(err) { - errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err) - klog.Error(errMsg) - return errMsg - } - // the returned pointer is not nil if the CR does not exist - ipCr = nil - } - if ipCr != nil { - if ipCr.Spec.Subnet != subnetName { - key := fmt.Sprintf("%s/%s", pod.Namespace, vmName) - if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), ipName, metav1.DeleteOptions{}); err != nil { - if !k8serrors.IsNotFound(err) { - klog.Errorf("failed to delete ip %s, %v", ipName, err) - return err - } - } - klog.Infof("gc logical switch port %s", key) - if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(key); err != nil { - klog.Errorf("failed to delete lsp %s, %v", key, err) + if k8serrors.IsNotFound(err) { + return nil + } + err := fmt.Errorf("failed to get ip CR %s: %v", ipName, err) + klog.Error(err) + return err + } + if ipCr.Spec.Subnet != subnetName { + key := fmt.Sprintf("%s/%s", namespace, vmName) + klog.Infof("release ipam for vm %s from old subnet %s", key, ipCr.Spec.Subnet) + c.ipam.ReleaseAddressByPod(key, ipCr.Spec.Subnet) + klog.Infof("gc logical switch port %s", key) + if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(key); err != nil { + klog.Errorf("failed to delete lsp %s, %v", key, err) + return err + } + if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), ipName, metav1.DeleteOptions{}); err != nil { + if !k8serrors.IsNotFound(err) { + klog.Errorf("failed to delete ip %s, %v", ipName, err) return err } - c.ipam.ReleaseAddressByPod(key, subnetName) } + // handleAddOrUpdatePod will create new lsp and new ip cr later } return nil } @@ -637,7 +636,7 @@ func (c *Controller) handleAddPod(key string) error { } if isVmPod && c.config.EnableKeepVmIP { pod.Annotations[fmt.Sprintf(util.VmTemplate, podNet.ProviderName)] = vmName - if err := c.changeVMSubnet(vmName, namespace, podNet.ProviderName, subnet.Name, pod); err != nil { + if err := c.changeVMSubnet(vmName, namespace, podNet.ProviderName, subnet.Name); err != nil { klog.Errorf("change subnet of pod %s/%s to %s failed: %v", namespace, name, subnet.Name, err) return err } @@ -857,6 +856,18 @@ func (c *Controller) handleDeletePod(pod *v1.Pod) error { return nil } +func (c *Controller) deleteCrdIPs(podName, ns, providerName string) error { + portName := ovs.PodNameToPortName(podName, ns, providerName) + klog.Infof("delete cr ip '%s' for pod %s/%s", portName, ns, podName) + if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil { + if !k8serrors.IsNotFound(err) { + klog.Errorf("failed to delete ip %s, %v", portName, err) + return err + } + } + return nil +} + func (c *Controller) handleUpdatePodSecurity(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil {