From 870d20b0a0c95c8386aaadac2992132ca128ceca Mon Sep 17 00:00:00 2001 From: Mengxin Liu Date: Tue, 4 Feb 2020 22:26:22 +0800 Subject: [PATCH] refactor: refactor controller.go --- pkg/controller/controller.go | 339 ++++++----------------------------- pkg/controller/election.go | 25 +++ pkg/controller/gc.go | 213 ++++++++++++++++++++++ pkg/controller/init.go | 84 +++++---- pkg/controller/service.go | 17 +- 5 files changed, 343 insertions(+), 335 deletions(-) create mode 100644 pkg/controller/gc.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 71dc76fd3d5..a6c6818b3e2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,8 +1,6 @@ package controller import ( - "fmt" - "strings" "time" kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1" @@ -12,7 +10,6 @@ import ( "github.com/alauda/kube-ovn/pkg/util" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -38,11 +35,6 @@ type Controller struct { podsLister v1.PodLister podsSynced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. addPodQueue workqueue.RateLimitingInterface addIpPoolPodQueue workqueue.RateLimitingInterface deletePodQueue workqueue.RateLimitingInterface @@ -83,21 +75,15 @@ type Controller struct { npsSynced cache.InformerSynced updateNpQueue workqueue.RateLimitingInterface deleteNpQueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder + recorder record.EventRecorder informerFactory informers.SharedInformerFactory kubeovnInformerFactory kubeovninformer.SharedInformerFactory - - elector *leaderelection.LeaderElector + elector *leaderelection.LeaderElector } // NewController returns a new ovn controller func NewController(config *Configuration) *Controller { - // Create event broadcaster - // Add ovn-controller types to the default Kubernetes Scheme so Events can be - // logged for ovn-controller types. utilruntime.Must(kubeovnv1.AddToScheme(scheme.Scheme)) klog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() @@ -228,67 +214,15 @@ func NewController(config *Configuration) *Controller { // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (c *Controller) Run(stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - - defer c.addPodQueue.ShutDown() - defer c.addIpPoolPodQueue.ShutDown() - defer c.deletePodQueue.ShutDown() - defer c.updatePodQueue.ShutDown() - - defer c.addNamespaceQueue.ShutDown() - - defer c.addSubnetQueue.ShutDown() - defer c.updateSubnetQueue.ShutDown() - defer c.deleteSubnetQueue.ShutDown() - defer c.deleteRouteQueue.ShutDown() - defer c.updateSubnetStatusQueue.ShutDown() - - defer c.addNodeQueue.ShutDown() - defer c.updateNodeQueue.ShutDown() - defer c.deleteNodeQueue.ShutDown() - - defer c.deleteTcpServiceQueue.ShutDown() - defer c.deleteUdpServiceQueue.ShutDown() - defer c.updateServiceQueue.ShutDown() - defer c.updateEndpointQueue.ShutDown() - - defer c.updateNpQueue.ShutDown() - defer c.deleteNpQueue.ShutDown() - - // Start the informer factories to begin populating the informer caches +func (c *Controller) Run(stopCh <-chan struct{}) { + defer c.shutdown() klog.Info("Starting OVN controller") - // leader election - elector := setupLeaderElection(&leaderElectionConfig{ - Client: c.config.KubeClient, - ElectionID: "ovn-config", - PodName: c.config.PodName, - PodNamespace: c.config.PodNamespace, - }) - c.elector = elector - for { - klog.Info("waiting for becoming a leader") - if c.isLeader() { - break - } - time.Sleep(5 * time.Second) - } - - if err := InitClusterRouter(c.config); err != nil { - klog.Fatalf("init cluster router failed %v", err) - } - - if err := InitLoadBalancer(c.config); err != nil { - klog.Fatalf("init load balancer failed %v", err) - } + // wait for becoming a leader + c.leaderElection() - if err := InitNodeSwitch(c.config); err != nil { - klog.Fatalf("init node switch failed %v", err) - } - - if err := InitDefaultLogicalSwitch(c.config); err != nil { - klog.Fatalf("init default switch failed %v", err) + if err := c.InitOVN(); err != nil { + klog.Fatalf("failed to init ovn resource %v", err) } c.informerFactory.Start(stopCh) @@ -297,20 +231,54 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { // Wait for the caches to be synced before starting workers klog.Info("Waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.subnetSynced, c.ipSynced, c.podsSynced, c.namespacesSynced, c.nodesSynced, c.serviceSynced, c.endpointsSynced, c.npsSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") + klog.Fatalf("failed to wait for caches to sync") } - c.gcLogicalSwitch() - c.gcNode() - c.gcLogicalSwitchPort() - c.gcLoadBalancer() - c.gcPortGroup() + // remove resources in ovndb that not exist any more in kubernetes resources + if err := c.gc(); err != nil { + klog.Fatalf("gc failed %v", err) + } + + // start workers to do all the network operations + c.startWorkers(stopCh) + <-stopCh + klog.Info("Shutting down workers") +} + +func (c *Controller) shutdown() { + utilruntime.HandleCrash() + + c.addPodQueue.ShutDown() + c.addIpPoolPodQueue.ShutDown() + c.deletePodQueue.ShutDown() + c.updatePodQueue.ShutDown() + + c.addNamespaceQueue.ShutDown() + + c.addSubnetQueue.ShutDown() + c.updateSubnetQueue.ShutDown() + c.deleteSubnetQueue.ShutDown() + c.deleteRouteQueue.ShutDown() + c.updateSubnetStatusQueue.ShutDown() + c.addNodeQueue.ShutDown() + c.updateNodeQueue.ShutDown() + c.deleteNodeQueue.ShutDown() + + c.deleteTcpServiceQueue.ShutDown() + c.deleteUdpServiceQueue.ShutDown() + c.updateServiceQueue.ShutDown() + c.updateEndpointQueue.ShutDown() + + c.updateNpQueue.ShutDown() + c.deleteNpQueue.ShutDown() +} + +func (c *Controller) startWorkers(stopCh <-chan struct{}) { klog.Info("Starting workers") - // Launch workers to process resources + // add default/join subnet and wait them ready go wait.Until(c.runAddSubnetWorker, time.Second, stopCh) - // wait default/join subnet ready for { klog.Infof("wait for %s and %s ready", c.config.DefaultLogicalSwitch, c.config.NodeSwitch) time.Sleep(3 * time.Second) @@ -353,211 +321,4 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { go wait.Until(c.runUpdateNpWorker, time.Second, stopCh) go wait.Until(c.runDeleteNpWorker, time.Second, stopCh) } - - klog.Info("Started workers") - <-stopCh - klog.Info("Shutting down workers") - - return nil -} - -func (c *Controller) isLeader() bool { - return c.elector.IsLeader() -} - -func (c *Controller) hasLeader() bool { - return c.elector.GetLeader() != "" -} - -func (c *Controller) gcLogicalSwitch() error { - klog.Infof("start to gc logical switch") - subnets, err := c.subnetsLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list subnet, %v", err) - return err - } - subnetNames := make([]string, 0, len(subnets)) - for _, s := range subnets { - subnetNames = append(subnetNames, s.Name) - } - lss, err := c.ovnClient.ListLogicalSwitch() - if err != nil { - klog.Errorf("failed to list logical switch, %v", err) - return err - } - klog.Infof("ls in ovn %v", lss) - klog.Infof("subnet in kubernetes %v", subnetNames) - for _, ls := range lss { - if !util.IsStringIn(ls, subnetNames) { - klog.Infof("gc subnet %s", ls) - if err := c.handleDeleteSubnet(ls); err != nil { - klog.Errorf("failed to gc subnet %s, %v", ls, err) - return err - } - } - } - return nil -} - -func (c *Controller) gcNode() error { - klog.Infof("start to gc nodes") - nodes, err := c.nodesLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list node, %v", err) - return err - } - nodeNames := make([]string, 0, len(nodes)) - for _, no := range nodes { - nodeNames = append(nodeNames, no.Name) - } - ips, err := c.ipsLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list ip, %v", err) - return err - } - ipNodeNames := make([]string, 0, len(ips)) - for _, ip := range ips { - if !strings.Contains(ip.Name, ".") { - ipNodeNames = append(ipNodeNames, strings.TrimPrefix(ip.Name, "node-")) - } - } - for _, no := range ipNodeNames { - if !util.IsStringIn(no, nodeNames) { - klog.Infof("gc node %s", no) - if err := c.handleDeleteNode(no); err != nil { - klog.Errorf("failed to gc node %s, %v", no, err) - return err - } - } - } - return nil -} - -func (c *Controller) gcLogicalSwitchPort() error { - klog.Infof("start to gc logical switch ports") - pods, err := c.podsLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list ip, %v", err) - return err - } - nodes, err := c.nodesLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list node, %v", err) - return err - } - ipNames := make([]string, 0, len(pods)+len(nodes)) - for _, pod := range pods { - ipNames = append(ipNames, fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)) - } - for _, node := range nodes { - ipNames = append(ipNames, fmt.Sprintf("node-%s", node.Name)) - } - lsps, err := c.ovnClient.ListLogicalSwitchPort() - if err != nil { - klog.Errorf("failed to list logical switch port, %v", err) - return err - } - for _, lsp := range lsps { - if !util.IsStringIn(lsp, ipNames) { - if strings.Contains(lsp, ".") { - klog.Infof("gc logical switch port %s", lsp) - podName := strings.Split(lsp, ".")[0] - podNameSpace := strings.Split(lsp, ".")[1] - if err := c.handleDeletePod(fmt.Sprintf("%s/%s", podNameSpace, podName)); err != nil { - klog.Errorf("failed to gc port %s, %v", lsp, err) - return err - } - } - } - } - return nil -} - -func (c *Controller) gcLoadBalancer() error { - klog.Infof("start to gc loadbalancers") - svcs, err := c.servicesLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list svc, %v", err) - return err - } - tcpVips := []string{} - udpVips := []string{} - for _, svc := range svcs { - ip := svc.Spec.ClusterIP - for _, port := range svc.Spec.Ports { - if port.Protocol == corev1.ProtocolTCP { - tcpVips = append(tcpVips, fmt.Sprintf("%s:%d", ip, port.Port)) - } else { - udpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port)) - } - } - } - - lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer) - if err != nil { - klog.Errorf("failed to get lb %v", err) - } - vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid) - if err != nil { - klog.Errorf("failed to get udp lb vips %v", err) - return err - } - for vip := range vips { - if !util.IsStringIn(vip, tcpVips) { - err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer) - if err != nil { - klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) - return err - } - } - } - - lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer) - if err != nil { - klog.Errorf("failed to get lb %v", err) - return err - } - vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid) - if err != nil { - klog.Errorf("failed to get udp lb vips %v", err) - return err - } - for vip := range vips { - if !util.IsStringIn(vip, udpVips) { - err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer) - if err != nil { - klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) - return err - } - } - } - return nil -} - -func (c *Controller) gcPortGroup() error { - klog.Infof("start to gc network policy") - nps, err := c.npsLister.List(labels.Everything()) - npNames := make([]string, 0, len(nps)) - for _, np := range nps { - npNames = append(npNames, fmt.Sprintf("%s/%s", np.Namespace, np.Name)) - } - if err != nil { - klog.Errorf("failed to list network policy, %v", err) - return err - } - pgs, err := c.ovnClient.ListPortGroup() - if err != nil { - klog.Errorf("failed to list port-group, %v", err) - return err - } - for _, pg := range pgs { - if !util.IsStringIn(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName), npNames) { - klog.Infof("gc port group %s", pg.Name) - if err := c.handleDeleteNp(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName)); err != nil { - klog.Errorf("failed to gc np %s/%s, %v", pg.NpNamespace, pg.NpName, err) - return err - } - } - } - return nil } diff --git a/pkg/controller/election.go b/pkg/controller/election.go index 605a977bb28..e8c259f3444 100644 --- a/pkg/controller/election.go +++ b/pkg/controller/election.go @@ -31,6 +31,31 @@ type leaderElectionConfig struct { OnNewLeader func(identity string) } +func (c *Controller) isLeader() bool { + return c.elector.IsLeader() +} + +func (c *Controller) hasLeader() bool { + return c.elector.GetLeader() != "" +} + +func (c *Controller) leaderElection() { + elector := setupLeaderElection(&leaderElectionConfig{ + Client: c.config.KubeClient, + ElectionID: "ovn-config", + PodName: c.config.PodName, + PodNamespace: c.config.PodNamespace, + }) + c.elector = elector + for { + if c.isLeader() { + return + } + klog.Info("waiting for becoming a leader") + time.Sleep(5 * time.Second) + } +} + func setupLeaderElection(config *leaderElectionConfig) *leaderelection.LeaderElector { var elector *leaderelection.LeaderElector diff --git a/pkg/controller/gc.go b/pkg/controller/gc.go new file mode 100644 index 00000000000..5deb91be552 --- /dev/null +++ b/pkg/controller/gc.go @@ -0,0 +1,213 @@ +package controller + +import ( + "fmt" + "github.com/alauda/kube-ovn/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog" + "strings" +) + +func (c *Controller) gc() error { + gcFunctions := []func() error{c.gcLogicalSwitch, c.gcNode, c.gcLogicalSwitch, c.gcLoadBalancer, c.gcPortGroup} + for _, gcFunc := range gcFunctions { + if err := gcFunc(); err != nil { + return err + } + } + return nil +} + +func (c *Controller) gcLogicalSwitch() error { + klog.Infof("start to gc logical switch") + subnets, err := c.subnetsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list subnet, %v", err) + return err + } + subnetNames := make([]string, 0, len(subnets)) + for _, s := range subnets { + subnetNames = append(subnetNames, s.Name) + } + lss, err := c.ovnClient.ListLogicalSwitch() + if err != nil { + klog.Errorf("failed to list logical switch, %v", err) + return err + } + klog.Infof("ls in ovn %v", lss) + klog.Infof("subnet in kubernetes %v", subnetNames) + for _, ls := range lss { + if !util.IsStringIn(ls, subnetNames) { + klog.Infof("gc subnet %s", ls) + if err := c.handleDeleteSubnet(ls); err != nil { + klog.Errorf("failed to gc subnet %s, %v", ls, err) + return err + } + } + } + return nil +} + +func (c *Controller) gcNode() error { + klog.Infof("start to gc nodes") + nodes, err := c.nodesLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list node, %v", err) + return err + } + nodeNames := make([]string, 0, len(nodes)) + for _, no := range nodes { + nodeNames = append(nodeNames, no.Name) + } + ips, err := c.ipsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list ip, %v", err) + return err + } + ipNodeNames := make([]string, 0, len(ips)) + for _, ip := range ips { + if !strings.Contains(ip.Name, ".") { + ipNodeNames = append(ipNodeNames, strings.TrimPrefix(ip.Name, "node-")) + } + } + for _, no := range ipNodeNames { + if !util.IsStringIn(no, nodeNames) { + klog.Infof("gc node %s", no) + if err := c.handleDeleteNode(no); err != nil { + klog.Errorf("failed to gc node %s, %v", no, err) + return err + } + } + } + return nil +} + +func (c *Controller) gcLogicalSwitchPort() error { + klog.Infof("start to gc logical switch ports") + pods, err := c.podsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list ip, %v", err) + return err + } + nodes, err := c.nodesLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list node, %v", err) + return err + } + ipNames := make([]string, 0, len(pods)+len(nodes)) + for _, pod := range pods { + ipNames = append(ipNames, fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)) + } + for _, node := range nodes { + ipNames = append(ipNames, fmt.Sprintf("node-%s", node.Name)) + } + lsps, err := c.ovnClient.ListLogicalSwitchPort() + if err != nil { + klog.Errorf("failed to list logical switch port, %v", err) + return err + } + for _, lsp := range lsps { + if !util.IsStringIn(lsp, ipNames) { + if strings.Contains(lsp, ".") { + klog.Infof("gc logical switch port %s", lsp) + podName := strings.Split(lsp, ".")[0] + podNameSpace := strings.Split(lsp, ".")[1] + if err := c.handleDeletePod(fmt.Sprintf("%s/%s", podNameSpace, podName)); err != nil { + klog.Errorf("failed to gc port %s, %v", lsp, err) + return err + } + } + } + } + return nil +} + +func (c *Controller) gcLoadBalancer() error { + klog.Infof("start to gc loadbalancers") + svcs, err := c.servicesLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list svc, %v", err) + return err + } + tcpVips := []string{} + udpVips := []string{} + for _, svc := range svcs { + ip := svc.Spec.ClusterIP + for _, port := range svc.Spec.Ports { + if port.Protocol == corev1.ProtocolTCP { + tcpVips = append(tcpVips, fmt.Sprintf("%s:%d", ip, port.Port)) + } else { + udpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port)) + } + } + } + + lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer) + if err != nil { + klog.Errorf("failed to get lb %v", err) + } + vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid) + if err != nil { + klog.Errorf("failed to get udp lb vips %v", err) + return err + } + for vip := range vips { + if !util.IsStringIn(vip, tcpVips) { + err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer) + if err != nil { + klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) + return err + } + } + } + + lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer) + if err != nil { + klog.Errorf("failed to get lb %v", err) + return err + } + vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid) + if err != nil { + klog.Errorf("failed to get udp lb vips %v", err) + return err + } + for vip := range vips { + if !util.IsStringIn(vip, udpVips) { + err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer) + if err != nil { + klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) + return err + } + } + } + return nil +} + +func (c *Controller) gcPortGroup() error { + klog.Infof("start to gc network policy") + nps, err := c.npsLister.List(labels.Everything()) + npNames := make([]string, 0, len(nps)) + for _, np := range nps { + npNames = append(npNames, fmt.Sprintf("%s/%s", np.Namespace, np.Name)) + } + if err != nil { + klog.Errorf("failed to list network policy, %v", err) + return err + } + pgs, err := c.ovnClient.ListPortGroup() + if err != nil { + klog.Errorf("failed to list port-group, %v", err) + return err + } + for _, pg := range pgs { + if !util.IsStringIn(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName), npNames) { + klog.Infof("gc port group %s", pg.Name) + if err := c.handleDeleteNp(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName)); err != nil { + klog.Errorf("failed to gc np %s/%s, %v", pg.NpNamespace, pg.NpName, err) + return err + } + } + } + return nil +} diff --git a/pkg/controller/init.go b/pkg/controller/init.go index ec72c84ffcc..dda52133388 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -6,92 +6,112 @@ import ( "strings" kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1" - "github.com/alauda/kube-ovn/pkg/ovs" "github.com/alauda/kube-ovn/pkg/util" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" ) +func (c *Controller) InitOVN() error { + if err := c.initClusterRouter(); err != nil { + klog.Errorf("init cluster router failed %v", err) + return err + } + + if err := c.initLoadBalancer(); err != nil { + klog.Errorf("init load balancer failed %v", err) + return err + } + + if err := c.initNodeSwitch(); err != nil { + klog.Errorf("init node switch failed %v", err) + return err + } + + if err := c.initDefaultLogicalSwitch(); err != nil { + klog.Errorf("init default switch failed %v", err) + return err + } + return nil +} + // InitDefaultLogicalSwitch int the default logical switch for ovn network -func InitDefaultLogicalSwitch(config *Configuration) error { - _, err := config.KubeOvnClient.KubeovnV1().Subnets().Get(config.DefaultLogicalSwitch, v1.GetOptions{}) +func (c *Controller) initDefaultLogicalSwitch() error { + _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(c.config.DefaultLogicalSwitch, v1.GetOptions{}) if err == nil { return nil } if !errors.IsNotFound(err) { - klog.Errorf("get default subnet %s failed %v", config.DefaultLogicalSwitch, err) + klog.Errorf("get default subnet %s failed %v", c.config.DefaultLogicalSwitch, err) return err } defaultSubnet := kubeovnv1.Subnet{ - ObjectMeta: v1.ObjectMeta{Name: config.DefaultLogicalSwitch}, + ObjectMeta: v1.ObjectMeta{Name: c.config.DefaultLogicalSwitch}, Spec: kubeovnv1.SubnetSpec{ Default: true, - CIDRBlock: config.DefaultCIDR, - Gateway: config.DefaultGateway, - ExcludeIps: strings.Split(config.DefaultExcludeIps, ","), + CIDRBlock: c.config.DefaultCIDR, + Gateway: c.config.DefaultGateway, + ExcludeIps: strings.Split(c.config.DefaultExcludeIps, ","), NatOutgoing: true, GatewayType: kubeovnv1.GWDistributedType, - Protocol: util.CheckProtocol(config.DefaultCIDR), + Protocol: util.CheckProtocol(c.config.DefaultCIDR), }, } - _, err = config.KubeOvnClient.KubeovnV1().Subnets().Create(&defaultSubnet) + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&defaultSubnet) return err } // InitNodeSwitch init node switch to connect host and pod -func InitNodeSwitch(config *Configuration) error { - _, err := config.KubeOvnClient.KubeovnV1().Subnets().Get(config.NodeSwitch, v1.GetOptions{}) +func (c *Controller) initNodeSwitch() error { + _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(c.config.NodeSwitch, v1.GetOptions{}) if err == nil { return nil } if !errors.IsNotFound(err) { - klog.Errorf("get node subnet %s failed %v", config.NodeSwitch, err) + klog.Errorf("get node subnet %s failed %v", c.config.NodeSwitch, err) return err } nodeSubnet := kubeovnv1.Subnet{ - ObjectMeta: v1.ObjectMeta{Name: config.NodeSwitch}, + ObjectMeta: v1.ObjectMeta{Name: c.config.NodeSwitch}, Spec: kubeovnv1.SubnetSpec{ Default: false, - CIDRBlock: config.NodeSwitchCIDR, - Gateway: config.NodeSwitchGateway, - ExcludeIps: []string{config.NodeSwitchGateway}, - Protocol: util.CheckProtocol(config.NodeSwitchCIDR), + CIDRBlock: c.config.NodeSwitchCIDR, + Gateway: c.config.NodeSwitchGateway, + ExcludeIps: []string{c.config.NodeSwitchGateway}, + Protocol: util.CheckProtocol(c.config.NodeSwitchCIDR), }, } - _, err = config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet) + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet) return err } // InitClusterRouter init cluster router to connect different logical switches -func InitClusterRouter(config *Configuration) error { - client := ovs.NewClient(config.OvnNbHost, config.OvnNbPort, config.OvnNbTimeout, "", 0, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR) - lrs, err := client.ListLogicalRouter() +func (c *Controller) initClusterRouter() error { + lrs, err := c.ovnClient.ListLogicalRouter() if err != nil { return err } klog.Infof("exists routers %v", lrs) for _, r := range lrs { - if config.ClusterRouter == r { + if c.config.ClusterRouter == r { return nil } } - return client.CreateLogicalRouter(config.ClusterRouter) + return c.ovnClient.CreateLogicalRouter(c.config.ClusterRouter) } // InitLoadBalancer init the default tcp and udp cluster loadbalancer -func InitLoadBalancer(config *Configuration) error { - client := ovs.NewClient(config.OvnNbHost, config.OvnNbPort, config.OvnNbTimeout, "", 0, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR) - tcpLb, err := client.FindLoadbalancer(config.ClusterTcpLoadBalancer) +func (c *Controller) initLoadBalancer() error { + tcpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer) if err != nil { return fmt.Errorf("failed to find tcp lb %v", err) } if tcpLb == "" { - klog.Infof("init cluster tcp load balancer %s", config.ClusterTcpLoadBalancer) - err := client.CreateLoadBalancer(config.ClusterTcpLoadBalancer, util.ProtocolTCP) + klog.Infof("init cluster tcp load balancer %s", c.config.ClusterTcpLoadBalancer) + err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpLoadBalancer, util.ProtocolTCP) if err != nil { klog.Errorf("failed to crate cluster tcp load balancer %v", err) return err @@ -100,13 +120,13 @@ func InitLoadBalancer(config *Configuration) error { klog.Infof("tcp load balancer %s exists", tcpLb) } - udpLb, err := client.FindLoadbalancer(config.ClusterUdpLoadBalancer) + udpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer) if err != nil { return fmt.Errorf("failed to find udp lb %v", err) } if udpLb == "" { - klog.Infof("init cluster udp load balancer %s", config.ClusterUdpLoadBalancer) - err := client.CreateLoadBalancer(config.ClusterUdpLoadBalancer, util.ProtocolUDP) + klog.Infof("init cluster udp load balancer %s", c.config.ClusterUdpLoadBalancer) + err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpLoadBalancer, util.ProtocolUDP) if err != nil { klog.Errorf("failed to crate cluster udp load balancer %v", err) return err diff --git a/pkg/controller/service.go b/pkg/controller/service.go index 5dcf297bcc7..c170b0ce470 100644 --- a/pkg/controller/service.go +++ b/pkg/controller/service.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "github.com/alauda/kube-ovn/pkg/util" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -237,7 +238,7 @@ func (c *Controller) handleUpdateService(key string) error { } for vip := range vips { - if strings.HasPrefix(vip, ip) && !containsString(tcpVips, vip) { + if strings.HasPrefix(vip, ip) && !util.IsStringIn(vip, tcpVips) { klog.Infof("remove stall vip %s", vip) err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer) if err != nil { @@ -267,7 +268,7 @@ func (c *Controller) handleUpdateService(key string) error { } for vip := range vips { - if strings.HasPrefix(vip, ip) && !containsString(udpVips, vip) { + if strings.HasPrefix(vip, ip) && !util.IsStringIn(vip, udpVips) { klog.Infof("remove stall vip %s", vip) err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer) if err != nil { @@ -279,15 +280,3 @@ func (c *Controller) handleUpdateService(key string) error { return nil } - -// -// Helper functions to check string from a slice of strings. -// -func containsString(slice []string, s string) bool { - for _, item := range slice { - if item == s { - return true - } - } - return false -}