From cdca43766023f9028c22ed4ac9ef94659038b68c Mon Sep 17 00:00:00 2001 From: zhangzujian Date: Wed, 10 May 2023 11:39:59 +0800 Subject: [PATCH] add key lock for more resources --- pkg/controller/controller.go | 21 ++++++++++++++++----- pkg/controller/endpoint.go | 5 ++++- pkg/controller/namespace.go | 4 ++++ pkg/controller/node.go | 11 +++++++++++ pkg/controller/service.go | 20 +++++++++++++++++++- pkg/controller/subnet.go | 11 +++++++---- pkg/controller/vlan.go | 13 ++++++++++++- pkg/controller/vpc.go | 12 ++++++++++++ 8 files changed, 85 insertions(+), 12 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e2fd4f7b8b1..43173fac696 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -71,6 +71,7 @@ type Controller struct { addOrUpdateVpcQueue workqueue.RateLimitingInterface delVpcQueue workqueue.RateLimitingInterface updateVpcStatusQueue workqueue.RateLimitingInterface + vpcKeyMutex keymutex.KeyMutex vpcNatGatewayLister kubeovnlister.VpcNatGatewayLister vpcNatGatewaySynced cache.InformerSynced @@ -101,7 +102,7 @@ type Controller struct { deleteSubnetQueue workqueue.RateLimitingInterface updateSubnetStatusQueue workqueue.RateLimitingInterface syncVirtualPortsQueue workqueue.RateLimitingInterface - subnetStatusKeyMutex keymutex.KeyMutex + subnetKeyMutex keymutex.KeyMutex ipsLister kubeovnlister.IPLister ipSynced cache.InformerSynced @@ -174,35 +175,39 @@ type Controller struct { updateOvnDnatRuleQueue workqueue.RateLimitingInterface delOvnDnatRuleQueue workqueue.RateLimitingInterface - vlansLister kubeovnlister.VlanLister - vlanSynced cache.InformerSynced - providerNetworksLister kubeovnlister.ProviderNetworkLister providerNetworkSynced cache.InformerSynced + vlansLister kubeovnlister.VlanLister + vlanSynced cache.InformerSynced addVlanQueue workqueue.RateLimitingInterface delVlanQueue workqueue.RateLimitingInterface updateVlanQueue workqueue.RateLimitingInterface + vlanKeyMutex keymutex.KeyMutex namespacesLister v1.NamespaceLister namespacesSynced cache.InformerSynced addNamespaceQueue workqueue.RateLimitingInterface + nsKeyMutex keymutex.KeyMutex nodesLister v1.NodeLister nodesSynced cache.InformerSynced addNodeQueue workqueue.RateLimitingInterface updateNodeQueue workqueue.RateLimitingInterface deleteNodeQueue workqueue.RateLimitingInterface + nodeKeyMutex keymutex.KeyMutex servicesLister v1.ServiceLister serviceSynced cache.InformerSynced addServiceQueue workqueue.RateLimitingInterface deleteServiceQueue workqueue.RateLimitingInterface updateServiceQueue workqueue.RateLimitingInterface + svcKeyMutex keymutex.KeyMutex endpointsLister v1.EndpointsLister endpointsSynced cache.InformerSynced updateEndpointQueue workqueue.RateLimitingInterface + epKeyMutex keymutex.KeyMutex npsLister netv1.NetworkPolicyLister npsSynced cache.InformerSynced @@ -304,6 +309,7 @@ func Run(ctx context.Context, config *Configuration) { addOrUpdateVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdateVpc"), delVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVpc"), updateVpcStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVpcStatus"), + vpcKeyMutex: keymutex.NewHashed(numKeyLocks), vpcNatGatewayLister: vpcNatGatewayInformer.Lister(), vpcNatGatewaySynced: vpcNatGatewayInformer.Informer().HasSynced, @@ -323,7 +329,7 @@ func Run(ctx context.Context, config *Configuration) { deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"), updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"), syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"), - subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks), + subnetKeyMutex: keymutex.NewHashed(numKeyLocks), ipsLister: ipInformer.Lister(), ipSynced: ipInformer.Informer().HasSynced, @@ -376,6 +382,7 @@ func Run(ctx context.Context, config *Configuration) { addVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVlan"), delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"), updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"), + vlanKeyMutex: keymutex.NewHashed(numKeyLocks), providerNetworksLister: providerNetworkInformer.Lister(), providerNetworkSynced: providerNetworkInformer.Informer().HasSynced, @@ -393,22 +400,26 @@ func Run(ctx context.Context, config *Configuration) { namespacesLister: namespaceInformer.Lister(), namespacesSynced: namespaceInformer.Informer().HasSynced, addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"), + nsKeyMutex: keymutex.NewHashed(numKeyLocks), nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"), updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"), deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"), + nodeKeyMutex: keymutex.NewHashed(numKeyLocks), servicesLister: serviceInformer.Lister(), serviceSynced: serviceInformer.Informer().HasSynced, addServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddService"), deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"), updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"), + svcKeyMutex: keymutex.NewHashed(numKeyLocks), endpointsLister: endpointInformer.Lister(), endpointsSynced: endpointInformer.Informer().HasSynced, updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"), + epKeyMutex: keymutex.NewHashed(numKeyLocks), qosPoliciesLister: qosPolicyInformer.Lister(), qosPolicySynced: qosPolicyInformer.Informer().HasSynced, diff --git a/pkg/controller/endpoint.go b/pkg/controller/endpoint.go index 0c94d7a54c1..ed78d94255f 100644 --- a/pkg/controller/endpoint.go +++ b/pkg/controller/endpoint.go @@ -90,7 +90,10 @@ func (c *Controller) handleUpdateEndpoint(key string) error { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } - klog.Infof("update endpoint %s/%s", namespace, name) + + c.epKeyMutex.LockKey(key) + defer func() { _ = c.epKeyMutex.UnlockKey(key) }() + klog.Infof("update add/update endpoint %s/%s", namespace, name) ep, err := c.endpointsLister.Endpoints(namespace).Get(name) if err != nil { diff --git a/pkg/controller/namespace.go b/pkg/controller/namespace.go index 9183c564101..829933fcc48 100644 --- a/pkg/controller/namespace.go +++ b/pkg/controller/namespace.go @@ -107,6 +107,10 @@ func (c *Controller) processNextAddNamespaceWorkItem() bool { } func (c *Controller) handleAddNamespace(key string) error { + c.nsKeyMutex.LockKey(key) + defer func() { _ = c.nsKeyMutex.UnlockKey(key) }() + klog.Infof("handle add/update namespace %s", key) + cachedNs, err := c.namespacesLister.Get(key) if err != nil { if errors.IsNotFound(err) { diff --git a/pkg/controller/node.go b/pkg/controller/node.go index a65068b306d..e2ac3597f65 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -192,6 +192,9 @@ func nodeUnderlayAddressSetName(node string, af int) string { } func (c *Controller) handleAddNode(key string) error { + c.nodeKeyMutex.LockKey(key) + defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }() + cachedNode, err := c.nodesLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -454,6 +457,10 @@ func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) err } func (c *Controller) handleDeleteNode(key string) error { + c.nodeKeyMutex.LockKey(key) + defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }() + klog.Infof("handle delete node %s", key) + portName := fmt.Sprintf("node-%s", key) klog.Infof("delete logical switch port %s", portName) if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil { @@ -579,6 +586,10 @@ func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.Provider } func (c *Controller) handleUpdateNode(key string) error { + c.nodeKeyMutex.LockKey(key) + defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }() + klog.Infof("handle update node %s", key) + node, err := c.nodesLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { diff --git a/pkg/controller/service.go b/pkg/controller/service.go index ac97e049fe3..81a582847c9 100644 --- a/pkg/controller/service.go +++ b/pkg/controller/service.go @@ -235,6 +235,16 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool { } func (c *Controller) handleDeleteService(service *vpcService) error { + key, err := cache.MetaNamespaceKeyFunc(service.Svc) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get meta namespace key of %#v: %v", service.Svc, err)) + return nil + } + + c.svcKeyMutex.LockKey(key) + defer func() { _ = c.svcKeyMutex.UnlockKey(key) }() + klog.Infof("handle delete service %s", key) + svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything()) if err != nil { klog.Errorf("failed to list svc, %v", err) @@ -297,7 +307,11 @@ func (c *Controller) handleUpdateService(key string) error { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } - klog.Infof("update svc %s/%s", namespace, name) + + c.svcKeyMutex.LockKey(key) + defer func() { _ = c.svcKeyMutex.UnlockKey(key) }() + klog.Infof("handle update service %s", key) + svc, err := c.servicesLister.Services(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { @@ -428,6 +442,10 @@ func (c *Controller) handleAddService(key string) error { return nil } + c.svcKeyMutex.LockKey(key) + defer func() { _ = c.svcKeyMutex.UnlockKey(key) }() + klog.Infof("handle add service %s", key) + svc, err := c.servicesLister.Services(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 92005dfeda4..aece196bf99 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -473,8 +473,8 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e } func (c *Controller) handleAddOrUpdateSubnet(key string) error { - c.subnetStatusKeyMutex.LockKey(key) - defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }() + c.subnetKeyMutex.LockKey(key) + defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }() cachedSubnet, err := c.subnetsLister.Get(key) if err != nil { @@ -746,8 +746,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { } func (c *Controller) handleUpdateSubnetStatus(key string) error { - c.subnetStatusKeyMutex.LockKey(key) - defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }() + c.subnetKeyMutex.LockKey(key) + defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }() cachedSubnet, err := c.subnetsLister.Get(key) subnet := cachedSubnet.DeepCopy() @@ -816,6 +816,9 @@ func (c *Controller) handleDeleteLogicalSwitch(key string) (err error) { } func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error { + c.subnetKeyMutex.LockKey(subnet.Name) + defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }() + c.updateVpcStatusQueue.Add(subnet.Spec.Vpc) klog.Infof("delete u2o interconnection policy route for subnet %s", subnet.Name) if err := c.deletePolicyRouteForU2OInterconn(subnet); err != nil { diff --git a/pkg/controller/vlan.go b/pkg/controller/vlan.go index 6fef240ac64..00d0e7f32c9 100644 --- a/pkg/controller/vlan.go +++ b/pkg/controller/vlan.go @@ -168,12 +168,15 @@ func (c *Controller) processNextDelVlanWorkItem() bool { } func (c *Controller) handleAddVlan(key string) error { + c.vlanKeyMutex.LockKey(key) + defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }() + klog.Infof("handle add vlan %s", key) + cachedVlan, err := c.vlansLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { return nil } - return err } @@ -229,6 +232,10 @@ func (c *Controller) handleAddVlan(key string) error { } func (c *Controller) handleUpdateVlan(key string) error { + c.vlanKeyMutex.LockKey(key) + defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }() + klog.Infof("handle update vlan %s", key) + vlan, err := c.vlansLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -263,6 +270,10 @@ func (c *Controller) handleUpdateVlan(key string) error { } func (c *Controller) handleDelVlan(key string) error { + c.vlanKeyMutex.LockKey(key) + defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }() + klog.Infof("handle delete vlan %s", key) + subnet, err := c.subnetsLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to list subnets: %v", err) diff --git a/pkg/controller/vpc.go b/pkg/controller/vpc.go index 9467aaa800a..3e98cc77452 100644 --- a/pkg/controller/vpc.go +++ b/pkg/controller/vpc.go @@ -95,6 +95,10 @@ func (c *Controller) runDelVpcWorker() { } func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error { + c.vpcKeyMutex.LockKey(vpc.Name) + defer func() { _ = c.vpcKeyMutex.UnlockKey(vpc.Name) }() + klog.Infof("handle delete vpc %s", vpc.Name) + if err := c.deleteVpcLb(vpc); err != nil { return err } @@ -119,6 +123,10 @@ func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error { } func (c *Controller) handleUpdateVpcStatus(key string) error { + c.vpcKeyMutex.LockKey(key) + defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }() + klog.Infof("handle status update for vpc %s", key) + cachedVpc, err := c.vpcsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -223,6 +231,10 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) { } func (c *Controller) handleAddOrUpdateVpc(key string) error { + c.vpcKeyMutex.LockKey(key) + defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }() + klog.Infof("handle add/update vpc %s", key) + // get latest vpc info cachedVpc, err := c.vpcsLister.Get(key) if err != nil {