From d3d01762c1640d4e6469e88ee5d9c6a16b07fb45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=A5=96=E5=BB=BA?= Date: Mon, 8 May 2023 13:29:14 +0800 Subject: [PATCH] informer: wait for cache sync before adding event handlers (#2768) --- cmd/controller/controller.go | 3 +- cmd/daemon/cniserver.go | 5 +- pkg/controller/controller.go | 273 ++++++++++++++++++----------------- pkg/daemon/controller.go | 16 +- 4 files changed, 153 insertions(+), 144 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 15e7fa3bfec..4e5be8c3b5b 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -116,8 +116,7 @@ func CmdMain() { RetryPeriod: 6 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - ctl := controller.NewController(config) - ctl.Run(ctx) + controller.Run(ctx, config) }, OnStoppedLeading: func() { select { diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index 0716f44b278..fa59d9ea5cb 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -73,13 +73,10 @@ func CmdMain() { kubeovninformer.WithTweakListOptions(func(listOption *v1.ListOptions) { listOption.AllowWatchBookmarks = true })) - ctl, err := daemon.NewController(config, podInformerFactory, nodeInformerFactory, kubeovnInformerFactory) + ctl, err := daemon.NewController(config, stopCh, podInformerFactory, nodeInformerFactory, kubeovnInformerFactory) if err != nil { util.LogFatalAndExit(err, "failed to create controller") } - podInformerFactory.Start(stopCh) - nodeInformerFactory.Start(stopCh) - kubeovnInformerFactory.Start(stopCh) klog.Info("start daemon controller") go ctl.Run(stopCh) go daemon.RunServer(config, ctl) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e08fad473e2..7684ca5b67f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -232,8 +232,8 @@ type Controller struct { kubeovnInformerFactory kubeovninformer.SharedInformerFactory } -// NewController returns a new ovn controller -func NewController(config *Configuration) *Controller { +// Run creates and runs a new ovn controller +func Run(ctx context.Context, config *Configuration) { utilruntime.Must(kubeovnv1.AddToScheme(scheme.Scheme)) klog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() @@ -279,6 +279,13 @@ func NewController(config *Configuration) *Controller { endpointInformer := informerFactory.Core().V1().Endpoints() qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies() configMapInformer := cmInformerFactory.Core().V1().ConfigMaps() + npInformer := informerFactory.Networking().V1().NetworkPolicies() + switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules() + vpcDnsInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses() + ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips() + ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips() + ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules() + ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules() controller := &Controller{ config: config, @@ -320,46 +327,46 @@ func NewController(config *Configuration) *Controller { virtualIpsLister: virtualIpInformer.Lister(), virtualIpsSynced: virtualIpInformer.Informer().HasSynced, - addVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "addVirtualIp"), - updateVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "updateVirtualIp"), - delVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "delVirtualIp"), + addVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVirtualIp"), + updateVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVirtualIp"), + delVirtualIpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVirtualIp"), iptablesEipsLister: iptablesEipInformer.Lister(), iptablesEipSynced: iptablesEipInformer.Informer().HasSynced, - addIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addIptablesEip"), - updateIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateIptablesEip"), - resetIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "resetIptablesEip"), - delIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesEip"), + addIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesEip"), + updateIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesEip"), + resetIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "ResetIptablesEip"), + delIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesEip"), podAnnotatedIptablesEipLister: podAnnotatedIptablesEipInformer.Lister(), podAnnotatedIptablesEipSynced: podAnnotatedIptablesEipInformer.Informer().HasSynced, - addPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addPodAnnotatedIptablesEip"), - updatePodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updatePodAnnotatedIptablesEip"), - delPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delPodAnnotatedIptablesEip"), + addPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesEip"), + updatePodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesEip"), + delPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesEip"), iptablesFipsLister: iptablesFipInformer.Lister(), iptablesFipSynced: iptablesFipInformer.Informer().HasSynced, - addIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addIptablesFip"), - updateIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateIptablesFip"), - delIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesFip"), + addIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesFip"), + updateIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesFip"), + delIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesFip"), podAnnotatedIptablesFipLister: podAnnotatedIptablesFipInformer.Lister(), podAnnotatedIptablesFipSynced: podAnnotatedIptablesFipInformer.Informer().HasSynced, - addPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addPodAnnotatedIptablesFip"), - updatePodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updatePodAnnotatedIptablesFip"), - delPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delPodAnnotatedIptablesFip"), + addPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesFip"), + updatePodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdatePodAnnotatedIptablesFip"), + delPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesFip"), iptablesDnatRulesLister: iptablesDnatRuleInformer.Lister(), iptablesDnatRuleSynced: iptablesDnatRuleInformer.Informer().HasSynced, - addIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addIptablesDnatRule"), - updateIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateIptablesDnatRule"), - delIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesDnatRule"), + addIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesDnatRule"), + updateIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesDnatRule"), + delIptablesDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesDnatRule"), iptablesSnatRulesLister: iptablesSnatRuleInformer.Lister(), iptablesSnatRuleSynced: iptablesSnatRuleInformer.Informer().HasSynced, - addIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addIptablesSnatRule"), - updateIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateIptablesSnatRule"), - delIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesSnatRule"), + addIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddIptablesSnatRule"), + updateIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateIptablesSnatRule"), + delIptablesSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteIptablesSnatRule"), vlansLister: vlanInformer.Lister(), vlanSynced: vlanInformer.Informer().HasSynced, @@ -399,22 +406,46 @@ func NewController(config *Configuration) *Controller { qosPoliciesLister: qosPolicyInformer.Lister(), qosPolicySynced: qosPolicyInformer.Informer().HasSynced, - addQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addQoSPolicy"), - updateQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateQoSPolicy"), - delQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delQoSPolicy"), + addQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddQoSPolicy"), + updateQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateQoSPolicy"), + delQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteQoSPolicy"), configMapsLister: configMapInformer.Lister(), configMapsSynced: configMapInformer.Informer().HasSynced, - recorder: recorder, - + sgKeyMutex: keymutex.New(97), sgsLister: sgInformer.Lister(), sgSynced: sgInformer.Informer().HasSynced, addOrUpdateSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSg"), delSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSg"), syncSgPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncSgPorts"), - sgKeyMutex: keymutex.New(97), + ovnEipsLister: ovnEipInformer.Lister(), + ovnEipSynced: ovnEipInformer.Informer().HasSynced, + addOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnEip"), + updateOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnEip"), + resetOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "ResetOvnEip"), + delOvnEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DelOvnEip"), + + ovnFipsLister: ovnFipInformer.Lister(), + ovnFipSynced: ovnFipInformer.Informer().HasSynced, + addOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnFip"), + updateOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnFip"), + delOvnFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnFip"), + + ovnSnatRulesLister: ovnSnatRuleInformer.Lister(), + ovnSnatRuleSynced: ovnSnatRuleInformer.Informer().HasSynced, + addOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnSnatRule"), + updateOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnSnatRule"), + delOvnSnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DelOvnSnatRule"), + + ovnDnatRulesLister: ovnDnatRuleInformer.Lister(), + ovnDnatRuleSynced: ovnDnatRuleInformer.Informer().HasSynced, + addOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOvnDnatRule"), + updateOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateOvnDnatRule"), + delOvnDnatRuleQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteOvnDnatRule"), + + recorder: recorder, informerFactory: informerFactory, cmInformerFactory: cmInformerFactory, kubeovnInformerFactory: kubeovnInformerFactory, @@ -425,6 +456,56 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to create ovn client") } + if config.EnableLb { + controller.switchLBRuleLister = switchLBRuleInformer.Lister() + controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced + controller.addSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addSwitchLBRule") + controller.delSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delSwitchLBRule") + controller.UpdateSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateSwitchLBRule") + + controller.vpcDnsLister = vpcDnsInformer.Lister() + controller.vpcDnsSynced = vpcDnsInformer.Informer().HasSynced + controller.addOrUpdateVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcDns") + controller.delVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcDns") + } + + if config.EnableNP { + controller.npsLister = npInformer.Lister() + controller.npsSynced = npInformer.Informer().HasSynced + controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp") + controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp") + controller.npKeyMutex = keymutex.New(97) + } + + defer controller.shutdown() + klog.Info("Starting OVN controller") + + // Wait for the caches to be synced before starting workers + controller.informerFactory.Start(ctx.Done()) + controller.cmInformerFactory.Start(ctx.Done()) + controller.kubeovnInformerFactory.Start(ctx.Done()) + + klog.Info("Waiting for informer caches to sync") + cacheSyncs := []cache.InformerSynced{ + controller.vpcNatGatewaySynced, controller.vpcSynced, controller.subnetSynced, + controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced, + controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced, + controller.podAnnotatedIptablesEipSynced, controller.podAnnotatedIptablesFipSynced, + controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced, + controller.serviceSynced, controller.endpointsSynced, controller.configMapsSynced, + controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced, + controller.ovnDnatRuleSynced, + } + if controller.config.EnableLb { + cacheSyncs = append(cacheSyncs, controller.switchLBRuleSynced, controller.vpcDnsSynced) + } + if controller.config.EnableNP { + cacheSyncs = append(cacheSyncs, controller.npsSynced) + } + if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { + util.LogFatalAndExit(nil, "failed to wait for caches to sync") + } + if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddPod, DeleteFunc: controller.enqueueDeletePod, @@ -480,36 +561,6 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add vpc nat gateway event handler") } - if config.EnableLb { - switchLBRuleInformer := kubeovnInformerFactory.Kubeovn().V1().SwitchLBRules() - controller.switchLBRuleLister = switchLBRuleInformer.Lister() - controller.switchLBRuleSynced = switchLBRuleInformer.Informer().HasSynced - controller.addSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addSwitchLBRule") - controller.delSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delSwitchLBRule") - controller.UpdateSwitchLBRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateSwitchLBRule") - - if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueAddSwitchLBRule, - UpdateFunc: controller.enqueueUpdateSwitchLBRule, - DeleteFunc: controller.enqueueDeleteSwitchLBRule, - }); err != nil { - util.LogFatalAndExit(err, "failed to add switch lb rule event handler") - } - - vpcDnsInformer := kubeovnInformerFactory.Kubeovn().V1().VpcDnses() - controller.vpcDnsLister = vpcDnsInformer.Lister() - controller.vpcDnsSynced = vpcDnsInformer.Informer().HasSynced - controller.addOrUpdateVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddOrUpdateVpcDns") - controller.delVpcDnsQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeleteVpcDns") - if _, err = vpcDnsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueAddVpcDns, - UpdateFunc: controller.enqueueUpdateVpcDns, - DeleteFunc: controller.enqueueDeleteVpcDns, - }); err != nil { - util.LogFatalAndExit(err, "failed to add vpc dns event handler") - } - } - if _, err = subnetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddSubnet, UpdateFunc: controller.enqueueUpdateSubnet, @@ -534,21 +585,6 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add vlan event handler") } - if config.EnableNP { - npInformer := informerFactory.Networking().V1().NetworkPolicies() - controller.npsLister = npInformer.Lister() - controller.npsSynced = npInformer.Informer().HasSynced - controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp") - controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp") - controller.npKeyMutex = keymutex.New(97) - if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueAddNp, - UpdateFunc: controller.enqueueUpdateNp, - DeleteFunc: controller.enqueueDeleteNp, - }); err != nil { - util.LogFatalAndExit(err, "failed to add network policy event handler") - } - } if _, err = sgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddSg, DeleteFunc: controller.enqueueDeleteSg, @@ -596,13 +632,6 @@ func NewController(config *Configuration) *Controller { }); err != nil { util.LogFatalAndExit(err, "failed to add iptables snat rule event handler") } - ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips() - controller.ovnEipsLister = ovnEipInformer.Lister() - controller.ovnEipSynced = ovnEipInformer.Informer().HasSynced - controller.addOvnEipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addOvnEip") - controller.updateOvnEipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateOvnEip") - controller.resetOvnEipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "resetOvnEip") - controller.delOvnEipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delOvnEip") if _, err = ovnEipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddOvnEip, @@ -612,12 +641,6 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add eip event handler") } - ovnFipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnFips() - controller.ovnFipsLister = ovnFipInformer.Lister() - controller.ovnFipSynced = ovnFipInformer.Informer().HasSynced - controller.addOvnFipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addOvnFip") - controller.updateOvnFipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateOvnFip") - controller.delOvnFipQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delOvnFip") if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddOvnFip, UpdateFunc: controller.enqueueUpdateOvnFip, @@ -626,12 +649,6 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add ovn fip event handler") } - ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules() - controller.ovnSnatRulesLister = ovnSnatRuleInformer.Lister() - controller.ovnSnatRuleSynced = ovnSnatRuleInformer.Informer().HasSynced - controller.addOvnSnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addOvnSnatRule") - controller.updateOvnSnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateOvnSnatRule") - controller.delOvnSnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delOvnSnatRule") if _, err = ovnSnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddOvnSnatRule, UpdateFunc: controller.enqueueUpdateOvnSnatRule, @@ -640,12 +657,6 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add ovn snat rule event handler") } - ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules() - controller.ovnDnatRulesLister = ovnDnatRuleInformer.Lister() - controller.ovnDnatRuleSynced = ovnDnatRuleInformer.Informer().HasSynced - controller.addOvnDnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addOvnDnatRule") - controller.updateOvnDnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateOvnDnatRule") - controller.delOvnDnatRuleQueue = workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delOvnDnatRule") if _, err = ovnDnatRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddOvnDnatRule, UpdateFunc: controller.enqueueUpdateOvnDnatRule, @@ -677,7 +688,35 @@ func NewController(config *Configuration) *Controller { util.LogFatalAndExit(err, "failed to add qos policy event handler") } - return controller + if config.EnableLb { + if _, err = switchLBRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueAddSwitchLBRule, + UpdateFunc: controller.enqueueUpdateSwitchLBRule, + DeleteFunc: controller.enqueueDeleteSwitchLBRule, + }); err != nil { + util.LogFatalAndExit(err, "failed to add switch lb rule event handler") + } + + if _, err = vpcDnsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueAddVpcDns, + UpdateFunc: controller.enqueueUpdateVpcDns, + DeleteFunc: controller.enqueueDeleteVpcDns, + }); err != nil { + util.LogFatalAndExit(err, "failed to add vpc dns event handler") + } + } + + if config.EnableNP { + if _, err = npInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueAddNp, + UpdateFunc: controller.enqueueUpdateNp, + DeleteFunc: controller.enqueueDeleteNp, + }); err != nil { + util.LogFatalAndExit(err, "failed to add network policy event handler") + } + } + + controller.Run(ctx) } // Run will set up the event handlers for types we are interested in, as well @@ -685,38 +724,6 @@ func NewController(config *Configuration) *Controller { // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(ctx context.Context) { - defer c.shutdown() - klog.Info("Starting OVN controller") - - // Wait for the caches to be synced before starting workers - c.informerFactory.Start(ctx.Done()) - c.cmInformerFactory.Start(ctx.Done()) - c.kubeovnInformerFactory.Start(ctx.Done()) - - klog.Info("Waiting for informer caches to sync") - cacheSyncs := []cache.InformerSynced{ - c.vpcNatGatewaySynced, c.vpcSynced, c.subnetSynced, - c.ipSynced, c.virtualIpsSynced, c.iptablesEipSynced, - c.iptablesFipSynced, c.iptablesDnatRuleSynced, c.iptablesSnatRuleSynced, - c.podAnnotatedIptablesEipSynced, c.podAnnotatedIptablesFipSynced, - c.vlanSynced, c.podsSynced, c.namespacesSynced, c.nodesSynced, - c.serviceSynced, c.endpointsSynced, c.configMapsSynced, - c.ovnEipSynced, c.ovnFipSynced, c.ovnSnatRuleSynced, - c.ovnDnatRuleSynced, - } - - if c.config.EnableNP { - cacheSyncs = append(cacheSyncs, c.npsSynced) - } - - if c.config.EnableLb { - cacheSyncs = append(cacheSyncs, c.switchLBRuleSynced, c.vpcDnsSynced) - } - - if ok := cache.WaitForCacheSync(ctx.Done(), cacheSyncs...); !ok { - util.LogFatalAndExit(nil, "failed to wait for caches to sync") - } - if err := c.ovnClient.SetLsDnatModDlDst(c.config.LsDnatModDlDst); err != nil { util.LogFatalAndExit(err, "failed to set NB_Global option ls_dnat_mod_dl_dst") } diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index 459a804fdf4..d9314f89d4a 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -66,7 +66,7 @@ type Controller struct { } // NewController init a daemon controller -func NewController(config *Configuration, podInformerFactory informers.SharedInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) { +func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory informers.SharedInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}) @@ -114,6 +114,16 @@ func NewController(config *Configuration, podInformerFactory informers.SharedInf return nil, err } + podInformerFactory.Start(stopCh) + nodeInformerFactory.Start(stopCh) + kubeovnInformerFactory.Start(stopCh) + + if !cache.WaitForCacheSync(stopCh, + controller.providerNetworksSynced, controller.subnetsSynced, + controller.podsSynced, controller.nodesSynced) { + util.LogFatalAndExit(nil, "failed to wait for caches to sync") + } + if _, err = providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddProviderNetwork, UpdateFunc: controller.enqueueUpdateProviderNetwork, @@ -590,10 +600,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go wait.Until(rotateLog, 1*time.Hour, stopCh) go wait.Until(c.operateMod, 10*time.Second, stopCh) - if ok := cache.WaitForCacheSync(stopCh, c.providerNetworksSynced, c.subnetsSynced, c.podsSynced, c.nodesSynced); !ok { - util.LogFatalAndExit(nil, "failed to wait for caches to sync") - } - if err := c.setIPSet(); err != nil { util.LogFatalAndExit(err, "failed to set ipsets") }