From 4a54c5229af63265de747e88cb69a5b09f47e8dd Mon Sep 17 00:00:00 2001 From: zhangzujian Date: Tue, 9 May 2023 14:50:34 +0800 Subject: [PATCH] optimize kube-ovn-controller logic 1. get resources using lister interfaces; 2. replace keymutex with the k8s implementation --- go.mod | 1 - go.sum | 2 - pkg/controller/controller.go | 27 ++++++----- pkg/controller/gc.go | 20 ++++---- pkg/controller/init.go | 14 +++--- pkg/controller/network_policy.go | 11 ++--- pkg/controller/node.go | 2 +- pkg/controller/ovn_eip.go | 14 +++--- pkg/controller/pod.go | 23 ++++----- pkg/controller/qos_policy.go | 16 ++++--- pkg/controller/security_group.go | 15 +++--- pkg/controller/subnet.go | 65 ++++++++++++-------------- pkg/controller/switch_lb_rule.go | 2 +- pkg/controller/vip.go | 9 ++-- pkg/controller/vpc.go | 4 +- pkg/controller/vpc_dns.go | 3 +- pkg/controller/vpc_nat_gateway.go | 77 ++++++++++++++++++------------- pkg/controller/vpc_nat_gw_eip.go | 28 ++++++----- pkg/controller/vpc_nat_gw_nat.go | 46 ++++++++++-------- pkg/daemon/controller.go | 2 +- pkg/util/k8s.go | 14 ++++++ 21 files changed, 213 insertions(+), 182 deletions(-) diff --git a/go.mod b/go.mod index 8130e72105a..609628203b7 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/kubeovn/gonetworkmanager/v2 v2.0.0-20230327064018-0b27f88874f7 github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875 github.com/moby/sys/mountinfo v0.6.2 - github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9 github.com/oilbeater/go-ping v0.0.0-20200413021620-332b7197c5b5 github.com/onsi/ginkgo/v2 v2.9.2 github.com/onsi/gomega v1.27.6 diff --git a/go.sum b/go.sum index 2ae5347c04a..9c59bb9e261 100644 --- a/go.sum +++ b/go.sum @@ -1052,8 +1052,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= -github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9 h1:UfW5pM66x0MWE72ySrpd2Ymrn+b62kNHirozKkY3ojE= -github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9/go.mod h1:3hf2IoUXDKjCg/EuqSLUB5TY8StGS3haWYJiqzP907c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7684ca5b67f..e82bc017679 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,10 +3,10 @@ package controller import ( "context" "fmt" + "runtime" "sync" "time" - "github.com/neverlee/keymutex" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -22,6 +22,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/keymutex" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions" @@ -63,7 +64,7 @@ type Controller struct { addOrUpdatePodQueue workqueue.RateLimitingInterface deletePodQueue workqueue.RateLimitingInterface updatePodSecurityQueue workqueue.RateLimitingInterface - podKeyMutex *keymutex.KeyMutex + podKeyMutex keymutex.KeyMutex vpcsLister kubeovnlister.VpcLister vpcSynced cache.InformerSynced @@ -81,7 +82,7 @@ type Controller struct { updateVpcDnatQueue workqueue.RateLimitingInterface updateVpcSnatQueue workqueue.RateLimitingInterface updateVpcSubnetQueue workqueue.RateLimitingInterface - vpcNatGwKeyMutex *keymutex.KeyMutex + vpcNatGwKeyMutex keymutex.KeyMutex switchLBRuleLister kubeovnlister.SwitchLBRuleLister switchLBRuleSynced cache.InformerSynced @@ -101,7 +102,7 @@ type Controller struct { deleteRouteQueue workqueue.RateLimitingInterface updateSubnetStatusQueue workqueue.RateLimitingInterface syncVirtualPortsQueue workqueue.RateLimitingInterface - subnetStatusKeyMutex *keymutex.KeyMutex + subnetStatusKeyMutex keymutex.KeyMutex ipsLister kubeovnlister.IPLister ipSynced cache.InformerSynced @@ -208,14 +209,14 @@ type Controller struct { npsSynced cache.InformerSynced updateNpQueue workqueue.RateLimitingInterface deleteNpQueue workqueue.RateLimitingInterface - npKeyMutex *keymutex.KeyMutex + npKeyMutex keymutex.KeyMutex sgsLister kubeovnlister.SecurityGroupLister sgSynced cache.InformerSynced addOrUpdateSgQueue workqueue.RateLimitingInterface delSgQueue workqueue.RateLimitingInterface syncSgPortsQueue workqueue.RateLimitingInterface - sgKeyMutex *keymutex.KeyMutex + sgKeyMutex keymutex.KeyMutex qosPoliciesLister kubeovnlister.QoSPolicyLister qosPolicySynced cache.InformerSynced @@ -287,6 +288,10 @@ func Run(ctx context.Context, config *Configuration) { ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules() ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules() + numKeyLocks := runtime.NumCPU() * 2 + if numKeyLocks < config.WorkerNum*2 { + numKeyLocks = config.WorkerNum * 2 + } controller := &Controller{ config: config, vpcs: &sync.Map{}, @@ -311,7 +316,7 @@ func Run(ctx context.Context, config *Configuration) { updateVpcDnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcDnat"), updateVpcSnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSnat"), updateVpcSubnetQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSubnet"), - vpcNatGwKeyMutex: keymutex.New(97), + vpcNatGwKeyMutex: keymutex.NewHashed(numKeyLocks), subnetsLister: subnetInformer.Lister(), subnetSynced: subnetInformer.Informer().HasSynced, @@ -320,7 +325,7 @@ func Run(ctx context.Context, config *Configuration) { deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"), updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"), syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"), - subnetStatusKeyMutex: keymutex.New(97), + subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks), ipsLister: ipInformer.Lister(), ipSynced: ipInformer.Informer().HasSynced, @@ -382,7 +387,7 @@ func Run(ctx context.Context, config *Configuration) { addOrUpdatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdatePod"), deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"), updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"), - podKeyMutex: keymutex.New(97), + podKeyMutex: keymutex.NewHashed(numKeyLocks), namespacesLister: namespaceInformer.Lister(), namespacesSynced: namespaceInformer.Informer().HasSynced, @@ -413,7 +418,7 @@ func Run(ctx context.Context, config *Configuration) { configMapsLister: configMapInformer.Lister(), configMapsSynced: configMapInformer.Informer().HasSynced, - sgKeyMutex: keymutex.New(97), + sgKeyMutex: keymutex.NewHashed(numKeyLocks), sgsLister: sgInformer.Lister(), sgSynced: sgInformer.Informer().HasSynced, addOrUpdateSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSg"), @@ -474,7 +479,7 @@ func Run(ctx context.Context, config *Configuration) { controller.npsSynced = npInformer.Informer().HasSynced controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp") controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp") - controller.npKeyMutex = keymutex.New(97) + controller.npKeyMutex = keymutex.NewHashed(128) } defer controller.shutdown() diff --git a/pkg/controller/gc.go b/pkg/controller/gc.go index 7928b8a82f9..43b2f122de6 100644 --- a/pkg/controller/gc.go +++ b/pkg/controller/gc.go @@ -10,7 +10,6 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" @@ -247,14 +246,17 @@ func (c *Controller) gcNode() error { func (c *Controller) gcVip() error { klog.Infof("start to gc vips") - vips, err := c.config.KubeOvnClient.KubeovnV1().Vips().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermNotEqualSelector(util.IpReservedLabel, "").String()}, - ) + selector, err := util.LabelSelectorNotEmpty(util.IpReservedLabel) + if err != nil { + klog.Errorf("failed to generate selector for label %s: %v", util.IpReservedLabel, err) + return err + } + vips, err := c.virtualIpsLister.List(selector) if err != nil { klog.Errorf("failed to list VIPs: %v", err) return err } - for _, vip := range vips.Items { + for _, vip := range vips { portName := vip.Labels[util.IpReservedLabel] portNameSplits := strings.Split(portName, ".") if len(portNameSplits) >= 2 { @@ -681,7 +683,7 @@ func (c *Controller) gcChassis() error { func (c *Controller) isOVNProvided(providerName string, pod *corev1.Pod) (bool, error) { ls := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, providerName)] - subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), ls, metav1.GetOptions{}) + subnet, err := c.subnetsLister.Get(ls) if err != nil { klog.Errorf("parse annotation logical switch %s error %v", ls, err) return false, err @@ -815,15 +817,13 @@ func (c *Controller) gcVpcDns() error { } } - slrs, err := c.config.KubeOvnClient.KubeovnV1().SwitchLBRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: sel.String(), - }) + slrs, err := c.switchLBRuleLister.List(sel) if err != nil { klog.Errorf("failed to list vpc-dns SwitchLBRules, %s", err) return err } - for _, slr := range slrs.Items { + for _, slr := range slrs { canFind := false for _, vd := range vds { name := genVpcDnsDpName(vd.Name) diff --git a/pkg/controller/init.go b/pkg/controller/init.go index 726d9b4f6e1..4ac6568fdd2 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -97,7 +97,7 @@ func (c *Controller) InitDefaultVpc() error { // InitDefaultLogicalSwitch init the default logical switch for ovn network func (c *Controller) initDefaultLogicalSwitch() error { - subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.DefaultLogicalSwitch, metav1.GetOptions{}) + subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch) if err == nil { if subnet != nil && util.CheckProtocol(c.config.DefaultCIDR) != util.CheckProtocol(subnet.Spec.CIDRBlock) { // single-stack upgrade to dual-stack @@ -152,7 +152,7 @@ func (c *Controller) initDefaultLogicalSwitch() error { // InitNodeSwitch init node switch to connect host and pod func (c *Controller) initNodeSwitch() error { - subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.NodeSwitch, metav1.GetOptions{}) + subnet, err := c.subnetsLister.Get(c.config.NodeSwitch) if err == nil { if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolDual && util.CheckProtocol(subnet.Spec.CIDRBlock) != kubeovnv1.ProtocolDual { // single-stack upgrade to dual-stack @@ -226,13 +226,13 @@ func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error { // InitLoadBalancer init the default tcp and udp cluster loadbalancer func (c *Controller) initLoadBalancer() error { - vpcs, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().List(context.Background(), metav1.ListOptions{}) + vpcs, err := c.vpcsLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to list vpc: %v", err) return err } - for _, cachedVpc := range vpcs.Items { + for _, cachedVpc := range vpcs { vpc := cachedVpc.DeepCopy() vpcLb := c.GenVpcLoadBalancer(vpc.Name) if err = c.initLB(vpcLb.TcpLoadBalancer, string(v1.ProtocolTCP), false); err != nil { @@ -612,7 +612,7 @@ func (c *Controller) initDefaultVlan() error { func (c *Controller) initSyncCrdIPs() error { klog.Info("start to sync ips") - ips, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(context.Background(), metav1.ListOptions{}) + ips, err := c.ipsLister.List(labels.Everything()) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -622,7 +622,7 @@ func (c *Controller) initSyncCrdIPs() error { ipMap := strset.New(c.getVmLsps()...) - for _, ipCr := range ips.Items { + for _, ipCr := range ips { ip := ipCr.DeepCopy() changed := false if ipMap.Has(ip.Name) && ip.Spec.PodType == "" { @@ -669,7 +669,7 @@ func (c *Controller) initSyncCrdSubnets() error { // only sync subnet spec enableEcmp when subnet.Spec.EnableEcmp is false and c.config.EnableEcmp is true if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !subnet.Spec.EnableEcmp && subnet.Spec.EnableEcmp != c.config.EnableEcmp { - subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), subnet.Name, metav1.GetOptions{}) + subnet, err = c.subnetsLister.Get(subnet.Name) if err != nil { klog.Errorf("failed to get subnet %s: %v", subnet.Name, err) return err diff --git a/pkg/controller/network_policy.go b/pkg/controller/network_policy.go index 5e506d5ef17..496fc03c525 100644 --- a/pkg/controller/network_policy.go +++ b/pkg/controller/network_policy.go @@ -7,6 +7,7 @@ import ( "strings" "unicode" + "github.com/ovn-org/libovsdb/ovsdb" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -16,8 +17,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "github.com/ovn-org/libovsdb/ovsdb" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/ovs" "github.com/kubeovn/kube-ovn/pkg/util" @@ -141,8 +140,8 @@ func (c *Controller) handleUpdateNp(key string) error { return nil } - c.npKeyMutex.Lock(key) - defer c.npKeyMutex.Unlock(key) + c.npKeyMutex.LockKey(key) + defer func() { _ = c.npKeyMutex.UnlockKey(key) }() klog.Infof("handle add/update network policy %s", key) np, err := c.npsLister.NetworkPolicies(namespace).Get(name) @@ -564,8 +563,8 @@ func (c *Controller) handleDeleteNp(key string) error { return nil } - c.npKeyMutex.Lock(key) - defer c.npKeyMutex.Unlock(key) + c.npKeyMutex.LockKey(key) + defer func() { _ = c.npKeyMutex.UnlockKey(key) }() klog.Infof("handle delete network policy %s", key) npName := name diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 178a83e0a09..a65068b306d 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -638,7 +638,7 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node if existingCR != nil { ipCr = *existingCR } else { - ipCr, err = c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{}) + ipCr, err = c.ipsLister.Get(ipName) if err != nil { if !k8serrors.IsNotFound(err) { errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err) diff --git a/pkg/controller/ovn_eip.go b/pkg/controller/ovn_eip.go index 73c4fcf5902..e3c664e9c49 100644 --- a/pkg/controller/ovn_eip.go +++ b/pkg/controller/ovn_eip.go @@ -8,15 +8,17 @@ import ( "strconv" "time" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" - "github.com/kubeovn/kube-ovn/pkg/util" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" ) func (c *Controller) enqueueAddOvnEip(obj interface{}) { @@ -711,24 +713,24 @@ func (c *Controller) isOvnEipNotUse(cachedEip *kubeovnv1.OvnEip) (bool, error) { switch cachedEip.Status.Type { case util.DnatUsingEip: // nat change eip not that fast - dnats, err := c.config.KubeOvnClient.KubeovnV1().OvnDnatRules().List(context.Background(), metav1.ListOptions{}) + dnats, err := c.ovnDnatRulesLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to get ovn dnat list, %v", err) return false, err } - for _, item := range dnats.Items { + for _, item := range dnats { if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name { return false, nil } } case util.SnatUsingEip: // nat change eip not that fast - snats, err := c.config.KubeOvnClient.KubeovnV1().OvnSnatRules().List(context.Background(), metav1.ListOptions{}) + snats, err := c.ovnSnatRulesLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to get ovn snat, %v", err) return false, err } - for _, item := range snats.Items { + for _, item := range snats { if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name { return false, nil } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 4fac75a8495..b0172953d4c 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -365,7 +365,6 @@ func (c *Controller) processNextAddOrUpdatePodWorkItem() bool { utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } - klog.Infof("handle sync pod %s", key) if err := c.handleAddOrUpdatePod(key); err != nil { c.addOrUpdatePodQueue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) @@ -400,7 +399,6 @@ func (c *Controller) processNextDeletePodWorkItem() bool { utilruntime.HandleError(fmt.Errorf("expected pod in workqueue but got %#v", obj)) return nil } - klog.Infof("handle delete pod %s/%s", pod.Namespace, pod.Name) if err := c.handleDeletePod(pod); err != nil { c.deletePodQueue.AddRateLimited(obj) return fmt.Errorf("error syncing '%s': %s, requeuing", pod.Name, err.Error()) @@ -475,7 +473,7 @@ func (c *Controller) getPodKubeovnNets(pod *v1.Pod) ([]*kubeovnNet, error) { func (c *Controller) changeVMSubnet(vmName, namespace, providerName, subnetName string, pod *v1.Pod) error { ipName := ovs.PodNameToPortName(vmName, namespace, providerName) - ipCr, err := c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{}) + ipCr, err := c.ipsLister.Get(ipName) if err != nil { if !k8serrors.IsNotFound(err) { errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err) @@ -513,8 +511,9 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) { return nil } - c.podKeyMutex.Lock(key) - defer c.podKeyMutex.Unlock(key) + c.podKeyMutex.LockKey(key) + defer func() { _ = c.podKeyMutex.UnlockKey(key) }() + klog.Infof("handle add/update pod %s", key) cachedPod, err := c.podsLister.Pods(namespace).Get(name) if err != nil { @@ -832,13 +831,11 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN } func (c *Controller) handleDeletePod(pod *v1.Pod) error { - var key string - var err error - podName := c.getNameByPod(pod) - key = fmt.Sprintf("%s/%s", pod.Namespace, podName) - c.podKeyMutex.Lock(key) - defer c.podKeyMutex.Unlock(key) + key := fmt.Sprintf("%s/%s", pod.Namespace, podName) + c.podKeyMutex.LockKey(key) + defer func() { _ = c.podKeyMutex.UnlockKey(key) }() + klog.Infof("handle delete pod %s", key) p, _ := c.podsLister.Pods(pod.Namespace).Get(pod.Name) if p != nil && p.UID != pod.UID { @@ -951,8 +948,8 @@ func (c *Controller) handleUpdatePodSecurity(key string) error { return nil } - c.podKeyMutex.Lock(key) - defer c.podKeyMutex.Unlock(key) + c.podKeyMutex.LockKey(key) + defer func() { _ = c.podKeyMutex.UnlockKey(key) }() pod, err := c.podsLister.Pods(namespace).Get(name) if err != nil { diff --git a/pkg/controller/qos_policy.go b/pkg/controller/qos_policy.go index 3e7fde2d99e..0963a219ad3 100644 --- a/pkg/controller/qos_policy.go +++ b/pkg/controller/qos_policy.go @@ -8,8 +8,6 @@ import ( "sort" "strings" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" - "github.com/kubeovn/kube-ovn/pkg/util" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -18,6 +16,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" ) func (c *Controller) enqueueAddQoSPolicy(obj interface{}) { @@ -182,9 +183,9 @@ func (c *Controller) processNextDeleteQoSPolicyWorkItem() bool { } func (c *Controller) handleAddQoSPolicy(key string) error { - - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add QoS policy %s", key) cachedQoS, err := c.qosPoliciesLister.Get(key) if err != nil { @@ -383,8 +384,9 @@ func (c *Controller) validateQosPolicy(qosPolicy *kubeovnv1.QoSPolicy) error { } func (c *Controller) handleUpdateQoSPolicy(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle update QoS policy %s", key) cachedQos, err := c.qosPoliciesLister.Get(key) if err != nil { diff --git a/pkg/controller/security_group.go b/pkg/controller/security_group.go index f6be78b9b5e..f2e75b5dbb5 100644 --- a/pkg/controller/security_group.go +++ b/pkg/controller/security_group.go @@ -227,8 +227,9 @@ func (c *Controller) updateDenyAllSgPorts() error { } func (c *Controller) handleAddOrUpdateSg(key string) error { - c.sgKeyMutex.Lock(key) - defer c.sgKeyMutex.Unlock(key) + c.sgKeyMutex.LockKey(key) + defer func() { _ = c.sgKeyMutex.UnlockKey(key) }() + klog.Infof("handle add/update security group %s", key) // set 'deny all' for port associated with security group if key == util.DenyAllSecurityGroup { @@ -394,8 +395,9 @@ func (c *Controller) patchSgStatus(sg *kubeovnv1.SecurityGroup) { } func (c *Controller) handleDeleteSg(key string) error { - c.sgKeyMutex.Lock(key) - defer c.sgKeyMutex.Unlock(key) + c.sgKeyMutex.LockKey(key) + defer func() { _ = c.sgKeyMutex.UnlockKey(key) }() + klog.Infof("handle delete security group %s", key) if err := c.ovnClient.DeleteSecurityGroup(key); err != nil { klog.Errorf("delete sg %s: %v", key, err) @@ -406,8 +408,9 @@ func (c *Controller) handleDeleteSg(key string) error { } func (c *Controller) syncSgLogicalPort(key string) error { - c.sgKeyMutex.Lock(key) - defer c.sgKeyMutex.Unlock(key) + c.sgKeyMutex.LockKey(key) + defer func() { _ = c.sgKeyMutex.UnlockKey(key) }() + klog.Infof("sync lsp for security group %s", key) sg, err := c.sgsLister.Get(key) if err != nil { diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 097069fac35..6071c2dc930 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -9,21 +9,21 @@ import ( "strings" "time" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" - "github.com/kubeovn/kube-ovn/pkg/ipam" - "github.com/kubeovn/kube-ovn/pkg/ovs" - "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb" - "github.com/kubeovn/kube-ovn/pkg/util" "github.com/ovn-org/libovsdb/ovsdb" v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/ipam" + "github.com/kubeovn/kube-ovn/pkg/ovs" + "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb" + "github.com/kubeovn/kube-ovn/pkg/util" ) func (c *Controller) enqueueAddSubnet(obj interface{}) { @@ -512,9 +512,8 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e } func (c *Controller) handleAddOrUpdateSubnet(key string) error { - var err error - c.subnetStatusKeyMutex.Lock(key) - defer c.subnetStatusKeyMutex.Unlock(key) + c.subnetStatusKeyMutex.LockKey(key) + defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }() cachedSubnet, err := c.subnetsLister.Get(key) if err != nil { @@ -530,7 +529,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { return err } - subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), key, metav1.GetOptions{}) + subnet, err = c.subnetsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -786,8 +785,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { } func (c *Controller) handleUpdateSubnetStatus(key string) error { - c.subnetStatusKeyMutex.Lock(key) - defer c.subnetStatusKeyMutex.Unlock(key) + c.subnetStatusKeyMutex.LockKey(key) + defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }() cachedSubnet, err := c.subnetsLister.Get(key) subnet := cachedSubnet.DeepCopy() @@ -1139,7 +1138,7 @@ func (c *Controller) reconcileVpcUseBfdStaticRoute(vpcName, subnetName string) e klog.Errorf("failed to get subnet %s, %v", subnetName, err) return err } - vpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), vpcName, metav1.GetOptions{}) + vpc, err := c.vpcsLister.Get(vpcName) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -1148,7 +1147,7 @@ func (c *Controller) reconcileVpcUseBfdStaticRoute(vpcName, subnetName string) e return err } lrpEipName := fmt.Sprintf("%s-%s", vpcName, c.config.ExternalGatewaySwitch) - lrpEip, err := c.config.KubeOvnClient.KubeovnV1().OvnEips().Get(context.Background(), lrpEipName, metav1.GetOptions{}) + lrpEip, err := c.ovnEipsLister.Get(lrpEipName) if err != nil { if !k8serrors.IsNotFound(err) { klog.Error(err) @@ -1227,7 +1226,7 @@ func (c *Controller) reconcileVpcAddNormalStaticRoute(vpcName string) error { } gatewayV4, gatewayV6 := util.SplitStringIP(defualtExternalSubnet.Spec.Gateway) needUpdate := false - vpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), vpcName, metav1.GetOptions{}) + vpc, err := c.vpcsLister.Get(vpcName) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -1309,7 +1308,7 @@ func (c *Controller) reconcileVpcDelNormalStaticRoute(vpcName string) error { } gatewayV4, gatewayV6 := util.SplitStringIP(defualtExternalSubnet.Spec.Gateway) needUpdate := false - vpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), vpcName, metav1.GetOptions{}) + vpc, err := c.vpcsLister.Get(vpcName) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -1867,9 +1866,7 @@ func calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error { return err } // Get the number of pods, not ips. For one pod with two ip(v4 & v6) in dual-stack, num of Items is 1 - podUsedIPs, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(subnet.Name, "").String(), - }) + podUsedIPs, err := c.ipsLister.List(labels.SelectorFromSet(labels.Set{subnet.Name: ""})) if err != nil { return err } @@ -1885,17 +1882,16 @@ func calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error { v4availableIPs := util.AddressCount(v4CIDR) - util.CountIpNums(v4toSubIPs) v6availableIPs := util.AddressCount(v6CIDR) - util.CountIpNums(v6toSubIPs) - usingIPs := float64(len(podUsedIPs.Items)) + usingIPs := float64(len(podUsedIPs)) - vipSelectors := fields.AndSelectors(fields.OneTermEqualSelector(util.SubnetNameLabel, subnet.Name), - fields.OneTermEqualSelector(util.IpReservedLabel, "")).String() - vips, err := c.config.KubeOvnClient.KubeovnV1().Vips().List(context.Background(), metav1.ListOptions{ - LabelSelector: vipSelectors, - }) + vips, err := c.virtualIpsLister.List(labels.SelectorFromSet(labels.Set{ + util.SubnetNameLabel: subnet.Name, + util.IpReservedLabel: "", + })) if err != nil { return err } - usingIPs += float64(len(vips.Items)) + usingIPs += float64(len(vips)) if !isOvnSubnet(subnet) { eips, err := c.iptablesEipsLister.List( @@ -1949,25 +1945,22 @@ func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error { if err != nil { return err } - podUsedIPs, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(subnet.Name, "").String(), - }) + podUsedIPs, err := c.ipsLister.List(labels.SelectorFromSet(labels.Set{subnet.Name: ""})) if err != nil { return err } // gateway always in excludeIPs toSubIPs := util.ExpandExcludeIPs(subnet.Spec.ExcludeIps, subnet.Spec.CIDRBlock) availableIPs := util.AddressCount(cidr) - util.CountIpNums(toSubIPs) - usingIPs := float64(len(podUsedIPs.Items)) - vipSelectors := fields.AndSelectors(fields.OneTermEqualSelector(util.SubnetNameLabel, subnet.Name), - fields.OneTermEqualSelector(util.IpReservedLabel, "")).String() - vips, err := c.config.KubeOvnClient.KubeovnV1().Vips().List(context.Background(), metav1.ListOptions{ - LabelSelector: vipSelectors, - }) + usingIPs := float64(len(podUsedIPs)) + vips, err := c.virtualIpsLister.List(labels.SelectorFromSet(labels.Set{ + util.SubnetNameLabel: subnet.Name, + util.IpReservedLabel: "", + })) if err != nil { return err } - usingIPs += float64(len(vips.Items)) + usingIPs += float64(len(vips)) if !isOvnSubnet(subnet) { eips, err := c.iptablesEipsLister.List( labels.SelectorFromSet(labels.Set{util.SubnetNameLabel: subnet.Name})) diff --git a/pkg/controller/switch_lb_rule.go b/pkg/controller/switch_lb_rule.go index 81652f0c872..7fe5a7892a2 100644 --- a/pkg/controller/switch_lb_rule.go +++ b/pkg/controller/switch_lb_rule.go @@ -137,7 +137,7 @@ func (c *Controller) handleAddOrUpdateSwitchLBRule(key string) error { needToCreate := false name := genSvcName(slr.Name) - oldSvc, err := c.config.KubeClient.CoreV1().Services(slr.Spec.Namespace).Get(context.Background(), name, metav1.GetOptions{}) + oldSvc, err := c.servicesLister.Services(slr.Spec.Namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { needToCreate = true diff --git a/pkg/controller/vip.go b/pkg/controller/vip.go index ed9a958920c..eb9b87a7a1f 100644 --- a/pkg/controller/vip.go +++ b/pkg/controller/vip.go @@ -7,9 +7,6 @@ import ( "net" "strings" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" - "github.com/kubeovn/kube-ovn/pkg/ovs" - "github.com/kubeovn/kube-ovn/pkg/util" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -17,6 +14,10 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/ovs" + "github.com/kubeovn/kube-ovn/pkg/util" ) func (c *Controller) enqueueAddVirtualIp(obj interface{}) { @@ -323,7 +324,7 @@ func (c *Controller) subnetCountIp(subnet *kubeovnv1.Subnet) error { } func (c *Controller) createOrUpdateCrdVip(key, ns, subnet, v4ip, v6ip, mac, pV4ip, pV6ip, pmac string) error { - vipCr, err := c.config.KubeOvnClient.KubeovnV1().Vips().Get(context.Background(), key, metav1.GetOptions{}) + vipCr, err := c.virtualIpsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { if _, err := c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), &kubeovnv1.Vip{ diff --git a/pkg/controller/vpc.go b/pkg/controller/vpc.go index 06c603d8c9d..ef1e212dd26 100644 --- a/pkg/controller/vpc.go +++ b/pkg/controller/vpc.go @@ -224,7 +224,7 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) { func (c *Controller) handleAddOrUpdateVpc(key string) error { // get latest vpc info - cachedVpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), key, metav1.GetOptions{}) + cachedVpc, err := c.vpcsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -856,7 +856,7 @@ func (c *Controller) handleAddVpcExternal(key string) error { } func (c *Controller) handleDeleteVpcStaticRoute(key string) error { - vpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), key, metav1.GetOptions{}) + vpc, err := c.vpcsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { return nil diff --git a/pkg/controller/vpc_dns.go b/pkg/controller/vpc_dns.go index 88f4c9eec63..b556f7f8d14 100644 --- a/pkg/controller/vpc_dns.go +++ b/pkg/controller/vpc_dns.go @@ -302,8 +302,7 @@ func (c *Controller) createOrUpdateVpcDnsDep(vpcDns *kubeovnv1.VpcDns) error { func (c *Controller) createOrUpdateVpcDnsSlr(vpcDns *kubeovnv1.VpcDns) error { needToCreateSlr := false - oldSlr, err := c.config.KubeOvnClient.KubeovnV1().SwitchLBRules().Get(context.Background(), - genVpcDnsDpName(vpcDns.Name), metav1.GetOptions{}) + oldSlr, err := c.switchLBRuleLister.Get(genVpcDnsDpName(vpcDns.Name)) if err != nil { if k8serrors.IsNotFound(err) { needToCreateSlr = true diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index c3a29733b6b..450c24217aa 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ b/pkg/controller/vpc_nat_gateway.go @@ -14,7 +14,6 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -207,8 +206,8 @@ func (c *Controller) processNextWorkItem(processName string, queue workqueue.Rat } func (c *Controller) handleDelVpcNatGw(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() name := genNatGwStsName(key) klog.Infof("delete vpc nat gw %s", name) if err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).Delete(context.Background(), @@ -244,8 +243,10 @@ func isVpcNatGwChanged(gw *kubeovnv1.VpcNatGateway) bool { func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { // create nat gw statefulset - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add/update vpc nat gateway %s", key) + if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } @@ -344,8 +345,11 @@ func (c *Controller) handleInitVpcNatGw(key string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle init vpc nat gateway %s", key) + gw, err := c.vpcNatGatewayLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -427,8 +431,11 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(natGwKey) - defer c.vpcNatGwKeyMutex.Unlock(natGwKey) + + c.vpcNatGwKeyMutex.LockKey(natGwKey) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() + klog.Infof("handle update vpc fip %s", natGwKey) + // refresh exist fips if err := c.initCreateAt(natGwKey); err != nil { err = fmt.Errorf("failed to init nat gw pod '%s' create at, %v", natGwKey, err) @@ -436,17 +443,14 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error { return err } - fips, err := c.config.KubeOvnClient.KubeovnV1().IptablesFIPRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(util.VpcNatGatewayNameLabel, natGwKey).String(), - }) - + fips, err := c.iptablesFipsLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey})) if err != nil { err := fmt.Errorf("failed to get all fips, %v", err) klog.Error(err) return err } - for _, fip := range fips.Items { + for _, fip := range fips { if fip.Status.Redo != NAT_GW_CREATED_AT { klog.V(3).Infof("redo fip %s", fip.Name) if err = c.redoFip(fip.Name, NAT_GW_CREATED_AT, false); err != nil { @@ -462,8 +466,11 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(natGwKey) - defer c.vpcNatGwKeyMutex.Unlock(natGwKey) + + c.vpcNatGwKeyMutex.LockKey(natGwKey) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() + klog.Infof("handle update vpc eip %s", natGwKey) + // refresh exist fips if err := c.initCreateAt(natGwKey); err != nil { err = fmt.Errorf("failed to init nat gw pod '%s' create at, %v", natGwKey, err) @@ -492,23 +499,24 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(natGwKey) - defer c.vpcNatGwKeyMutex.Unlock(natGwKey) + + c.vpcNatGwKeyMutex.LockKey(natGwKey) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() + klog.Infof("handle update vpc snat %s", natGwKey) + // refresh exist snats if err := c.initCreateAt(natGwKey); err != nil { err = fmt.Errorf("failed to init nat gw pod '%s' create at, %v", natGwKey, err) klog.Error(err) return err } - snats, err := c.config.KubeOvnClient.KubeovnV1().IptablesSnatRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(util.VpcNatGatewayNameLabel, natGwKey).String(), - }) + snats, err := c.iptablesSnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey})) if err != nil { err = fmt.Errorf("failed to get all snats, %v", err) klog.Error(err) return err } - for _, snat := range snats.Items { + for _, snat := range snats { if snat.Status.Redo != NAT_GW_CREATED_AT { klog.V(3).Infof("redo snat %s", snat.Name) if err = c.redoSnat(snat.Name, NAT_GW_CREATED_AT, false); err != nil { @@ -525,8 +533,11 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(natGwKey) - defer c.vpcNatGwKeyMutex.Unlock(natGwKey) + + c.vpcNatGwKeyMutex.LockKey(natGwKey) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() + klog.Infof("handle update vpc dnat %s", natGwKey) + // refresh exist dnats if err := c.initCreateAt(natGwKey); err != nil { err = fmt.Errorf("failed to init nat gw pod '%s' create at, %v", natGwKey, err) @@ -534,15 +545,13 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error { return err } - dnats, err := c.config.KubeOvnClient.KubeovnV1().IptablesDnatRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(util.VpcNatGatewayNameLabel, natGwKey).String(), - }) + dnats, err := c.iptablesDnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey})) if err != nil { err = fmt.Errorf("failed to get all dnats, %v", err) klog.Error(err) return err } - for _, dnat := range dnats.Items { + for _, dnat := range dnats { if dnat.Status.Redo != NAT_GW_CREATED_AT { klog.V(3).Infof("redo dnat %s", dnat.Name) if err = c.redoDnat(dnat.Name, NAT_GW_CREATED_AT, false); err != nil { @@ -592,8 +601,11 @@ func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(natGwKey) - defer c.vpcNatGwKeyMutex.Unlock(natGwKey) + + c.vpcNatGwKeyMutex.LockKey(natGwKey) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() + klog.Infof("handle update subnet route for nat gateway %s", natGwKey) + gw, err := c.vpcNatGatewayLister.Get(natGwKey) if err != nil { return err @@ -852,7 +864,7 @@ func (c *Controller) initCreateAt(key string) (err error) { } func (c *Controller) updateCrdNatGwLabels(key string, qos string) error { - gw, err := c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Get(context.Background(), key, metav1.GetOptions{}) + gw, err := c.vpcNatGatewayLister.Get(key) if err != nil { errMsg := fmt.Errorf("failed to get vpc nat gw '%s', %v", key, err) klog.Error(errMsg) @@ -998,8 +1010,7 @@ func (c *Controller) patchNatGwStatus(key string) error { } func (c *Controller) execNatGwQoS(gw *kubeovnv1.VpcNatGateway, qos string, operation string) error { - var err error - qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), qos, metav1.GetOptions{}) + qosPolicy, err := c.qosPoliciesLister.Get(qos) if err != nil { klog.Errorf("get qos policy %s failed: %v", qos, err) return err diff --git a/pkg/controller/vpc_nat_gw_eip.go b/pkg/controller/vpc_nat_gw_eip.go index 873f8580272..755d154314c 100644 --- a/pkg/controller/vpc_nat_gw_eip.go +++ b/pkg/controller/vpc_nat_gw_eip.go @@ -8,7 +8,6 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -205,8 +204,9 @@ func (c *Controller) handleAddIptablesEip(key string) error { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add iptables eip %s", key) cachedEip, err := c.iptablesEipsLister.Get(key) if err != nil { @@ -272,15 +272,13 @@ func (c *Controller) checkEipBindNat(key string, eip *kubeovnv1.IptablesEIP) (bo notUse = true case util.DnatUsingEip: // nat change eip not that fast - dnats, err := c.config.KubeOvnClient.KubeovnV1().IptablesDnatRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(util.VpcNatGatewayNameLabel, key).String(), - }) + dnats, err := c.iptablesDnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: key})) if err != nil { klog.Errorf("failed to get dnats, %v", err) return notUse, natName, err } notUse = true - for _, item := range dnats.Items { + for _, item := range dnats { if item.Annotations[util.VpcEipAnnotation] == key { notUse = false natName = item.Name @@ -289,15 +287,13 @@ func (c *Controller) checkEipBindNat(key string, eip *kubeovnv1.IptablesEIP) (bo } case util.SnatUsingEip: // nat change eip not that fast - snats, err := c.config.KubeOvnClient.KubeovnV1().IptablesSnatRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector(util.VpcNatGatewayNameLabel, key).String(), - }) + snats, err := c.iptablesSnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: key})) if err != nil { klog.Errorf("failed to get snats, %v", err) return notUse, natName, err } notUse = true - for _, item := range snats.Items { + for _, item := range snats { if item.Annotations[util.VpcEipAnnotation] == key { notUse = false natName = item.Name @@ -340,8 +336,10 @@ func (c *Controller) handleResetIptablesEip(key string) error { } func (c *Controller) handleUpdateIptablesEip(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle update iptables eip %s", key) + cachedEip, err := c.iptablesEipsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -525,7 +523,7 @@ func (c *Controller) addOrUpdateEIPBandtithLimitRules(eip *kubeovnv1.IptablesEIP // add tc rule for eip in nat gw pod func (c *Controller) addEipQoS(eip *kubeovnv1.IptablesEIP, v4ip string) error { var err error - qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), eip.Spec.QoSPolicy, metav1.GetOptions{}) + qosPolicy, err := c.qosPoliciesLister.Get(eip.Spec.QoSPolicy) if !qosPolicy.Status.Shared { eips, err := c.iptablesEipsLister.List( labels.SelectorFromSet(labels.Set{util.QoSLabel: qosPolicy.Name})) @@ -563,7 +561,7 @@ func (c *Controller) delEIPBandtithLimitRules(eip *kubeovnv1.IptablesEIP, v4ip s // del tc rule for eip in nat gw pod func (c *Controller) delEipQoS(eip *kubeovnv1.IptablesEIP, v4ip string) error { var err error - qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), eip.Status.QoSPolicy, metav1.GetOptions{}) + qosPolicy, err := c.qosPoliciesLister.Get(eip.Status.QoSPolicy) if err != nil { klog.Errorf("get qos policy %s failed: %v", eip.Status.QoSPolicy, err) return err diff --git a/pkg/controller/vpc_nat_gw_nat.go b/pkg/controller/vpc_nat_gw_nat.go index 47b5a751a68..9c39b54649f 100644 --- a/pkg/controller/vpc_nat_gw_nat.go +++ b/pkg/controller/vpc_nat_gw_nat.go @@ -7,6 +7,7 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" @@ -488,8 +489,10 @@ func (c *Controller) handleAddIptablesFip(key string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add iptables fip %s", key) fip, err := c.iptablesFipsLister.Get(key) if err != nil { @@ -556,8 +559,9 @@ func (c *Controller) handleAddIptablesFip(key string) error { } func (c *Controller) handleUpdateIptablesFip(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle update iptables fip %s", key) cachedFip, err := c.iptablesFipsLister.Get(key) if err != nil { @@ -671,8 +675,10 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add iptables dnat rule %s", key) dnat, err := c.iptablesDnatRulesLister.Get(key) if err != nil { @@ -736,8 +742,9 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error { } func (c *Controller) handleUpdateIptablesDnatRule(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle update iptables fip %s", key) cachedDnat, err := c.iptablesDnatRulesLister.Get(key) if err != nil { @@ -855,8 +862,10 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error { if vpcNatEnabled != "true" { return fmt.Errorf("iptables nat gw not enable") } - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle add iptables snat rule %s", key) snat, err := c.iptablesSnatRulesLister.Get(key) if err != nil { @@ -921,8 +930,9 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error { } func (c *Controller) handleUpdateIptablesSnatRule(key string) error { - c.vpcNatGwKeyMutex.Lock(key) - defer c.vpcNatGwKeyMutex.Unlock(key) + c.vpcNatGwKeyMutex.LockKey(key) + defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }() + klog.Infof("handle update iptables snat rule %s", key) cachedSnat, err := c.iptablesSnatRulesLister.Get(key) if err != nil { @@ -1727,17 +1737,17 @@ func (c *Controller) snatChangeEip(snat *kubeovnv1.IptablesSnatRule, eip *kubeov func (c *Controller) isDnatDuplicated(gwName, eipName, dnatName, externalPort string) (bool, error) { // check if eip:external port already used - dnatLabel := fmt.Sprintf("%s=%s,%s=%s", util.VpcNatGatewayNameLabel, gwName, util.VpcDnatEPortLabel, externalPort) - dnats, err := c.config.KubeOvnClient.KubeovnV1().IptablesDnatRules().List(context.Background(), metav1.ListOptions{ - LabelSelector: dnatLabel, - }) + dnats, err := c.iptablesDnatRulesLister.List(labels.SelectorFromSet(labels.Set{ + util.VpcNatGatewayNameLabel: gwName, + util.VpcDnatEPortLabel: externalPort, + })) if err != nil { if !k8serrors.IsNotFound(err) { return false, err } } - if len(dnats.Items) > 0 { - for _, d := range dnats.Items { + if len(dnats) != 0 { + for _, d := range dnats { if d.Name != dnatName && d.Annotations[util.VpcEipAnnotation] == eipName { err = fmt.Errorf("failed to create dnat %s, duplicate, same eip %s, same external port '%s' is using by dnat %s", dnatName, eipName, externalPort, d.Name) return true, err diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index d9314f89d4a..16876f09548 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -342,7 +342,7 @@ func (c *Controller) recordProviderNetworkErr(providerNetwork string, errMsg str return } } else { - if currentPod, err = c.config.KubeClient.CoreV1().Pods(c.localNamespace).Get(context.Background(), c.localPodName, metav1.GetOptions{}); err != nil { + if currentPod, err = c.podsLister.Pods(c.localNamespace).Get(c.localPodName); err != nil { klog.Errorf("failed to get pod %s, %v", c.localPodName, err) return } diff --git a/pkg/util/k8s.go b/pkg/util/k8s.go index 176980216ea..714adc3b01b 100644 --- a/pkg/util/k8s.go +++ b/pkg/util/k8s.go @@ -8,6 +8,8 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/klog/v2" ) @@ -52,3 +54,15 @@ func ServiceClusterIPs(svc v1.Service) []string { } return ips } + +func LabelSelectorNotEquals(key, value string) (labels.Selector, error) { + requirement, err := labels.NewRequirement(key, selection.NotEquals, []string{value}) + if err != nil { + return nil, err + } + return labels.Everything().Add(*requirement), nil +} + +func LabelSelectorNotEmpty(key string) (labels.Selector, error) { + return LabelSelectorNotEquals(key, "") +}