Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize kube-ovn-controller logic #2771

Merged
merged 1 commit into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/kubeovn/gonetworkmanager/v2 v2.0.0-20230327064018-0b27f88874f7
github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875
github.com/moby/sys/mountinfo v0.6.2
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9
github.com/oilbeater/go-ping v0.0.0-20200413021620-332b7197c5b5
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9 h1:UfW5pM66x0MWE72ySrpd2Ymrn+b62kNHirozKkY3ojE=
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9/go.mod h1:3hf2IoUXDKjCg/EuqSLUB5TY8StGS3haWYJiqzP907c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down
27 changes: 16 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package controller
import (
"context"
"fmt"
"runtime"
"sync"
"time"

"github.com/neverlee/keymutex"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -22,6 +22,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/keymutex"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -63,7 +64,7 @@ type Controller struct {
addOrUpdatePodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodSecurityQueue workqueue.RateLimitingInterface
podKeyMutex *keymutex.KeyMutex
podKeyMutex keymutex.KeyMutex

vpcsLister kubeovnlister.VpcLister
vpcSynced cache.InformerSynced
Expand All @@ -81,7 +82,7 @@ type Controller struct {
updateVpcDnatQueue workqueue.RateLimitingInterface
updateVpcSnatQueue workqueue.RateLimitingInterface
updateVpcSubnetQueue workqueue.RateLimitingInterface
vpcNatGwKeyMutex *keymutex.KeyMutex
vpcNatGwKeyMutex keymutex.KeyMutex

switchLBRuleLister kubeovnlister.SwitchLBRuleLister
switchLBRuleSynced cache.InformerSynced
Expand All @@ -101,7 +102,7 @@ type Controller struct {
deleteRouteQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface
syncVirtualPortsQueue workqueue.RateLimitingInterface
subnetStatusKeyMutex *keymutex.KeyMutex
subnetStatusKeyMutex keymutex.KeyMutex

ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced
Expand Down Expand Up @@ -208,14 +209,14 @@ type Controller struct {
npsSynced cache.InformerSynced
updateNpQueue workqueue.RateLimitingInterface
deleteNpQueue workqueue.RateLimitingInterface
npKeyMutex *keymutex.KeyMutex
npKeyMutex keymutex.KeyMutex

sgsLister kubeovnlister.SecurityGroupLister
sgSynced cache.InformerSynced
addOrUpdateSgQueue workqueue.RateLimitingInterface
delSgQueue workqueue.RateLimitingInterface
syncSgPortsQueue workqueue.RateLimitingInterface
sgKeyMutex *keymutex.KeyMutex
sgKeyMutex keymutex.KeyMutex

qosPoliciesLister kubeovnlister.QoSPolicyLister
qosPolicySynced cache.InformerSynced
Expand Down Expand Up @@ -287,6 +288,10 @@ func Run(ctx context.Context, config *Configuration) {
ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()

numKeyLocks := runtime.NumCPU() * 2
if numKeyLocks < config.WorkerNum*2 {
numKeyLocks = config.WorkerNum * 2
}
controller := &Controller{
config: config,
vpcs: &sync.Map{},
Expand All @@ -311,7 +316,7 @@ func Run(ctx context.Context, config *Configuration) {
updateVpcDnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcDnat"),
updateVpcSnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSnat"),
updateVpcSubnetQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSubnet"),
vpcNatGwKeyMutex: keymutex.New(97),
vpcNatGwKeyMutex: keymutex.NewHashed(numKeyLocks),

subnetsLister: subnetInformer.Lister(),
subnetSynced: subnetInformer.Informer().HasSynced,
Expand All @@ -320,7 +325,7 @@ func Run(ctx context.Context, config *Configuration) {
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"),
subnetStatusKeyMutex: keymutex.New(97),
subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks),

ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,
Expand Down Expand Up @@ -382,7 +387,7 @@ func Run(ctx context.Context, config *Configuration) {
addOrUpdatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdatePod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"),
podKeyMutex: keymutex.New(97),
podKeyMutex: keymutex.NewHashed(numKeyLocks),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
Expand Down Expand Up @@ -413,7 +418,7 @@ func Run(ctx context.Context, config *Configuration) {
configMapsLister: configMapInformer.Lister(),
configMapsSynced: configMapInformer.Informer().HasSynced,

sgKeyMutex: keymutex.New(97),
sgKeyMutex: keymutex.NewHashed(numKeyLocks),
sgsLister: sgInformer.Lister(),
sgSynced: sgInformer.Informer().HasSynced,
addOrUpdateSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSg"),
Expand Down Expand Up @@ -474,7 +479,7 @@ func Run(ctx context.Context, config *Configuration) {
controller.npsSynced = npInformer.Informer().HasSynced
controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp")
controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp")
controller.npKeyMutex = keymutex.New(97)
controller.npKeyMutex = keymutex.NewHashed(128)
}

defer controller.shutdown()
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -247,14 +246,17 @@ func (c *Controller) gcNode() error {

func (c *Controller) gcVip() error {
klog.Infof("start to gc vips")
vips, err := c.config.KubeOvnClient.KubeovnV1().Vips().List(context.Background(), metav1.ListOptions{
LabelSelector: fields.OneTermNotEqualSelector(util.IpReservedLabel, "").String()},
)
selector, err := util.LabelSelectorNotEmpty(util.IpReservedLabel)
if err != nil {
klog.Errorf("failed to generate selector for label %s: %v", util.IpReservedLabel, err)
return err
}
vips, err := c.virtualIpsLister.List(selector)
if err != nil {
klog.Errorf("failed to list VIPs: %v", err)
return err
}
for _, vip := range vips.Items {
for _, vip := range vips {
portName := vip.Labels[util.IpReservedLabel]
portNameSplits := strings.Split(portName, ".")
if len(portNameSplits) >= 2 {
Expand Down Expand Up @@ -681,7 +683,7 @@ func (c *Controller) gcChassis() error {

func (c *Controller) isOVNProvided(providerName string, pod *corev1.Pod) (bool, error) {
ls := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, providerName)]
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), ls, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(ls)
if err != nil {
klog.Errorf("parse annotation logical switch %s error %v", ls, err)
return false, err
Expand Down Expand Up @@ -815,15 +817,13 @@ func (c *Controller) gcVpcDns() error {
}
}

slrs, err := c.config.KubeOvnClient.KubeovnV1().SwitchLBRules().List(context.Background(), metav1.ListOptions{
LabelSelector: sel.String(),
})
slrs, err := c.switchLBRuleLister.List(sel)
if err != nil {
klog.Errorf("failed to list vpc-dns SwitchLBRules, %s", err)
return err
}

for _, slr := range slrs.Items {
for _, slr := range slrs {
canFind := false
for _, vd := range vds {
name := genVpcDnsDpName(vd.Name)
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *Controller) InitDefaultVpc() error {

// initDefaultLogicalSwitch initializes the default logical switch for the OVN network.
func (c *Controller) initDefaultLogicalSwitch() error {
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.DefaultLogicalSwitch, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
if err == nil {
if subnet != nil && util.CheckProtocol(c.config.DefaultCIDR) != util.CheckProtocol(subnet.Spec.CIDRBlock) {
// single-stack upgrade to dual-stack
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {

// initNodeSwitch initializes the node switch that connects hosts and pods.
func (c *Controller) initNodeSwitch() error {
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.NodeSwitch, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
if err == nil {
if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolDual && util.CheckProtocol(subnet.Spec.CIDRBlock) != kubeovnv1.ProtocolDual {
// single-stack upgrade to dual-stack
Expand Down Expand Up @@ -226,13 +226,13 @@ func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error {

// initLoadBalancer initializes the default TCP and UDP cluster load balancers.
func (c *Controller) initLoadBalancer() error {
vpcs, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().List(context.Background(), metav1.ListOptions{})
vpcs, err := c.vpcsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list vpc: %v", err)
return err
}

for _, cachedVpc := range vpcs.Items {
for _, cachedVpc := range vpcs {
vpc := cachedVpc.DeepCopy()
vpcLb := c.GenVpcLoadBalancer(vpc.Name)
if err = c.initLB(vpcLb.TcpLoadBalancer, string(v1.ProtocolTCP), false); err != nil {
Expand Down Expand Up @@ -612,7 +612,7 @@ func (c *Controller) initDefaultVlan() error {

func (c *Controller) initSyncCrdIPs() error {
klog.Info("start to sync ips")
ips, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(context.Background(), metav1.ListOptions{})
ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
Expand All @@ -622,7 +622,7 @@ func (c *Controller) initSyncCrdIPs() error {

ipMap := strset.New(c.getVmLsps()...)

for _, ipCr := range ips.Items {
for _, ipCr := range ips {
ip := ipCr.DeepCopy()
changed := false
if ipMap.Has(ip.Name) && ip.Spec.PodType == "" {
Expand Down Expand Up @@ -669,7 +669,7 @@ func (c *Controller) initSyncCrdSubnets() error {

// only sync subnet spec enableEcmp when subnet.Spec.EnableEcmp is false and c.config.EnableEcmp is true
if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !subnet.Spec.EnableEcmp && subnet.Spec.EnableEcmp != c.config.EnableEcmp {
subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), subnet.Name, metav1.GetOptions{})
subnet, err = c.subnetsLister.Get(subnet.Name)
if err != nil {
klog.Errorf("failed to get subnet %s: %v", subnet.Name, err)
return err
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"unicode"

"github.com/ovn-org/libovsdb/ovsdb"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -16,8 +17,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/ovn-org/libovsdb/ovsdb"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -141,8 +140,8 @@ func (c *Controller) handleUpdateNp(key string) error {
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
c.npKeyMutex.LockKey(key)
defer func() { _ = c.npKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update network policy %s", key)

np, err := c.npsLister.NetworkPolicies(namespace).Get(name)
Expand Down Expand Up @@ -564,8 +563,8 @@ func (c *Controller) handleDeleteNp(key string) error {
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
c.npKeyMutex.LockKey(key)
defer func() { _ = c.npKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete network policy %s", key)

npName := name
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node
if existingCR != nil {
ipCr = *existingCR
} else {
ipCr, err = c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{})
ipCr, err = c.ipsLister.Get(ipName)
if err != nil {
if !k8serrors.IsNotFound(err) {
errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err)
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/ovn_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
"strconv"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func (c *Controller) enqueueAddOvnEip(obj interface{}) {
Expand Down Expand Up @@ -711,24 +713,24 @@ func (c *Controller) isOvnEipNotUse(cachedEip *kubeovnv1.OvnEip) (bool, error) {
switch cachedEip.Status.Type {
case util.DnatUsingEip:
// nat change eip not that fast
dnats, err := c.config.KubeOvnClient.KubeovnV1().OvnDnatRules().List(context.Background(), metav1.ListOptions{})
dnats, err := c.ovnDnatRulesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get ovn dnat list, %v", err)
return false, err
}
for _, item := range dnats.Items {
for _, item := range dnats {
if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name {
return false, nil
}
}
case util.SnatUsingEip:
// nat change eip not that fast
snats, err := c.config.KubeOvnClient.KubeovnV1().OvnSnatRules().List(context.Background(), metav1.ListOptions{})
snats, err := c.ovnSnatRulesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get ovn snat, %v", err)
return false, err
}
for _, item := range snats.Items {
for _, item := range snats {
if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name {
return false, nil
}
Expand Down
Loading