Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add key lock for more resources #2781

Merged
merged 1 commit into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Controller struct {
addOrUpdateVpcQueue workqueue.RateLimitingInterface
delVpcQueue workqueue.RateLimitingInterface
updateVpcStatusQueue workqueue.RateLimitingInterface
vpcKeyMutex keymutex.KeyMutex

vpcNatGatewayLister kubeovnlister.VpcNatGatewayLister
vpcNatGatewaySynced cache.InformerSynced
Expand Down Expand Up @@ -101,7 +102,7 @@ type Controller struct {
deleteSubnetQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface
syncVirtualPortsQueue workqueue.RateLimitingInterface
subnetStatusKeyMutex keymutex.KeyMutex
subnetKeyMutex keymutex.KeyMutex

ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced
Expand Down Expand Up @@ -174,35 +175,39 @@ type Controller struct {
updateOvnDnatRuleQueue workqueue.RateLimitingInterface
delOvnDnatRuleQueue workqueue.RateLimitingInterface

vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced

providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced

vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced
addVlanQueue workqueue.RateLimitingInterface
delVlanQueue workqueue.RateLimitingInterface
updateVlanQueue workqueue.RateLimitingInterface
vlanKeyMutex keymutex.KeyMutex

namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
addNamespaceQueue workqueue.RateLimitingInterface
nsKeyMutex keymutex.KeyMutex

nodesLister v1.NodeLister
nodesSynced cache.InformerSynced
addNodeQueue workqueue.RateLimitingInterface
updateNodeQueue workqueue.RateLimitingInterface
deleteNodeQueue workqueue.RateLimitingInterface
nodeKeyMutex keymutex.KeyMutex

servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
addServiceQueue workqueue.RateLimitingInterface
deleteServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface
svcKeyMutex keymutex.KeyMutex

endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
updateEndpointQueue workqueue.RateLimitingInterface
epKeyMutex keymutex.KeyMutex

npsLister netv1.NetworkPolicyLister
npsSynced cache.InformerSynced
Expand Down Expand Up @@ -304,6 +309,7 @@ func Run(ctx context.Context, config *Configuration) {
addOrUpdateVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdateVpc"),
delVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVpc"),
updateVpcStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVpcStatus"),
vpcKeyMutex: keymutex.NewHashed(numKeyLocks),

vpcNatGatewayLister: vpcNatGatewayInformer.Lister(),
vpcNatGatewaySynced: vpcNatGatewayInformer.Informer().HasSynced,
Expand All @@ -323,7 +329,7 @@ func Run(ctx context.Context, config *Configuration) {
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"),
subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks),
subnetKeyMutex: keymutex.NewHashed(numKeyLocks),

ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,
Expand Down Expand Up @@ -376,6 +382,7 @@ func Run(ctx context.Context, config *Configuration) {
addVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVlan"),
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),
vlanKeyMutex: keymutex.NewHashed(numKeyLocks),

providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,
Expand All @@ -393,22 +400,26 @@ func Run(ctx context.Context, config *Configuration) {
namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"),
nsKeyMutex: keymutex.NewHashed(numKeyLocks),

nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"),
updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"),
deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"),
nodeKeyMutex: keymutex.NewHashed(numKeyLocks),

servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
addServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddService"),
deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),
svcKeyMutex: keymutex.NewHashed(numKeyLocks),

endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"),
epKeyMutex: keymutex.NewHashed(numKeyLocks),

qosPoliciesLister: qosPolicyInformer.Lister(),
qosPolicySynced: qosPolicyInformer.Informer().HasSynced,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
klog.Infof("update endpoint %s/%s", namespace, name)

c.epKeyMutex.LockKey(key)
defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
klog.Infof("update add/update endpoint %s/%s", namespace, name)

ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (c *Controller) processNextAddNamespaceWorkItem() bool {
}

func (c *Controller) handleAddNamespace(key string) error {
c.nsKeyMutex.LockKey(key)
defer func() { _ = c.nsKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update namespace %s", key)

cachedNs, err := c.namespacesLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func nodeUnderlayAddressSetName(node string, af int) string {
}

func (c *Controller) handleAddNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()

cachedNode, err := c.nodesLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -454,6 +457,10 @@ func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) err
}

func (c *Controller) handleDeleteNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete node %s", key)

portName := fmt.Sprintf("node-%s", key)
klog.Infof("delete logical switch port %s", portName)
if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil {
Expand Down Expand Up @@ -579,6 +586,10 @@ func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.Provider
}

func (c *Controller) handleUpdateNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
klog.Infof("handle update node %s", key)

node, err := c.nodesLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down
20 changes: 19 additions & 1 deletion pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool {
}

func (c *Controller) handleDeleteService(service *vpcService) error {
key, err := cache.MetaNamespaceKeyFunc(service.Svc)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get meta namespace key of %#v: %v", service.Svc, err))
return nil
}

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete service %s", key)

svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
Expand Down Expand Up @@ -297,7 +307,11 @@ func (c *Controller) handleUpdateService(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
klog.Infof("update svc %s/%s", namespace, name)

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle update service %s", key)

svc, err := c.servicesLister.Services(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -428,6 +442,10 @@ func (c *Controller) handleAddService(key string) error {
return nil
}

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle add service %s", key)

svc, err := c.servicesLister.Services(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e
}

func (c *Controller) handleAddOrUpdateSubnet(key string) error {
c.subnetStatusKeyMutex.LockKey(key)
defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }()
c.subnetKeyMutex.LockKey(key)
defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }()

cachedSubnet, err := c.subnetsLister.Get(key)
if err != nil {
Expand Down Expand Up @@ -746,8 +746,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

func (c *Controller) handleUpdateSubnetStatus(key string) error {
c.subnetStatusKeyMutex.LockKey(key)
defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }()
c.subnetKeyMutex.LockKey(key)
defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }()

cachedSubnet, err := c.subnetsLister.Get(key)
subnet := cachedSubnet.DeepCopy()
Expand Down Expand Up @@ -816,6 +816,9 @@ func (c *Controller) handleDeleteLogicalSwitch(key string) (err error) {
}

func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error {
c.subnetKeyMutex.LockKey(subnet.Name)
defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }()

c.updateVpcStatusQueue.Add(subnet.Spec.Vpc)
klog.Infof("delete u2o interconnection policy route for subnet %s", subnet.Name)
if err := c.deletePolicyRouteForU2OInterconn(subnet); err != nil {
Expand Down
13 changes: 12 additions & 1 deletion pkg/controller/vlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,15 @@ func (c *Controller) processNextDelVlanWorkItem() bool {
}

func (c *Controller) handleAddVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle add vlan %s", key)

cachedVlan, err := c.vlansLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}

return err
}

Expand Down Expand Up @@ -229,6 +232,10 @@ func (c *Controller) handleAddVlan(key string) error {
}

func (c *Controller) handleUpdateVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle update vlan %s", key)

vlan, err := c.vlansLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -263,6 +270,10 @@ func (c *Controller) handleUpdateVlan(key string) error {
}

func (c *Controller) handleDelVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete vlan %s", key)

subnet, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (c *Controller) runDelVpcWorker() {
}

func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error {
c.vpcKeyMutex.LockKey(vpc.Name)
defer func() { _ = c.vpcKeyMutex.UnlockKey(vpc.Name) }()
klog.Infof("handle delete vpc %s", vpc.Name)

if err := c.deleteVpcLb(vpc); err != nil {
return err
}
Expand All @@ -119,6 +123,10 @@ func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error {
}

func (c *Controller) handleUpdateVpcStatus(key string) error {
c.vpcKeyMutex.LockKey(key)
defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
klog.Infof("handle status update for vpc %s", key)

cachedVpc, err := c.vpcsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -223,6 +231,10 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) {
}

func (c *Controller) handleAddOrUpdateVpc(key string) error {
c.vpcKeyMutex.LockKey(key)
defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update vpc %s", key)

// get latest vpc info
cachedVpc, err := c.vpcsLister.Get(key)
if err != nil {
Expand Down