From 8d853656496dd94d54f5fc681e2c8b18c5c468ef Mon Sep 17 00:00:00 2001 From: MengxinLiu Date: Thu, 16 Apr 2020 13:37:34 +0800 Subject: [PATCH] refactor: refactor cni-server --- cmd/daemon/cniserver.go | 7 ++ dist/images/cleanup.sh | 6 ++ dist/images/install.sh | 2 + pkg/daemon/config.go | 10 ++ pkg/daemon/handler.go | 208 +++++++--------------------------------- pkg/daemon/init.go | 66 ++++++++++++- pkg/daemon/ovs.go | 12 ++- 7 files changed, 135 insertions(+), 176 deletions(-) diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index 3be643fa5d1..cd184f3e8e3 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/alauda/kube-ovn/pkg/util" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "net/http" _ "net/http/pprof" @@ -32,6 +33,12 @@ func main() { klog.Fatalf("init node gateway failed %v", err) } + if config.NetworkType == util.NetworkTypeVlan { + if err = daemon.InitVlan(config); err != nil { + klog.Fatalf("init vlan config failed %v", err) + } + } + stopCh := signals.SetupSignalHandler() kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0, kubeinformers.WithTweakListOptions(func(listOption *v1.ListOptions) { diff --git a/dist/images/cleanup.sh b/dist/images/cleanup.sh index 51edad8855b..e36891dd034 100644 --- a/dist/images/cleanup.sh +++ b/dist/images/cleanup.sh @@ -51,4 +51,10 @@ for ns in $(kubectl get ns -o name |cut -c 11-); do kubectl annotate pod --all ovn.kubernetes.io/mac_address- -n "$ns" kubectl annotate pod --all ovn.kubernetes.io/port_name- -n "$ns" kubectl annotate pod --all ovn.kubernetes.io/allocated- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/routed- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/vlan_id- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/vlan_range- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/network_types- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/provider_interface_name- -n "$ns" + kubectl annotate pod --all ovn.kubernetes.io/host_interface_name- -n "$ns" done diff --git a/dist/images/install.sh b/dist/images/install.sh index 43412baff8d..ed99975bc54 100644 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -675,6 +675,8 @@ spec: - --encap-checksum=true - --service-cluster-ip-range=$SVC_CIDR - --iface=${IFACE} + - --network-type=$NETWORK_TYPE + - --default-interface-name=$VLAN_INTERFACE_NAME securityContext: capabilities: add: ["NET_ADMIN", "SYS_ADMIN", "SYS_PTRACE"] diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index b454306063c..1c87277d324 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -38,6 +38,9 @@ type Configuration struct { NodeLocalDNSIP string EncapChecksum bool PprofPort int + NetworkType string + DefaultProviderName string + DefaultInterfaceName string } // ParseFlags will parse cmd args then init kubeClient and configuration @@ -55,6 +58,10 @@ func ParseFlags() (*Configuration, error) { argNodeLocalDnsIP = pflag.String("node-local-dns-ip", "", "If use nodelocaldns the local dns server ip should be set here, default empty.") argEncapChecksum = pflag.Bool("encap-checksum", true, "Enable checksum, default: true") argPprofPort = pflag.Int("pprof-port", 10665, "The port to get profiling data, default: 10665") + + argsNetworkType = pflag.String("network-type", "geneve", "The ovn network type, default: geneve") + argsDefaultProviderName = pflag.String("default-provider-name", "provider", "The vlan or xvlan type default provider interface name, default: provider") + argsDefaultInterfaceName = pflag.String("default-interface-name", "", "The default host interface name in the vlan/xvlan type") ) // mute info log for ipset lib @@ -95,6 +102,9 @@ func ParseFlags() (*Configuration, error) { ServiceClusterIPRange: *argServiceClusterIPRange, NodeLocalDNSIP: *argNodeLocalDnsIP, EncapChecksum: *argEncapChecksum, + NetworkType: *argsNetworkType, + DefaultProviderName: *argsDefaultProviderName, + DefaultInterfaceName: *argsDefaultInterfaceName, } if err := config.initNicConfig(); err != nil { diff --git a/pkg/daemon/handler.go b/pkg/daemon/handler.go index b129788d6ec..12e63ac72ca 100644 --- a/pkg/daemon/handler.go +++ b/pkg/daemon/handler.go @@ -2,24 +2,21 @@ package daemon import ( "fmt" - "github.com/alauda/kube-ovn/pkg/ovs" - "github.com/vishvananda/netlink" - "net" "net/http" "strings" "time" - kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1" - clientset "github.com/alauda/kube-ovn/pkg/client/clientset/versioned" - "github.com/alauda/kube-ovn/pkg/request" - "github.com/alauda/kube-ovn/pkg/util" - "github.com/emicklei/go-restful" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog" + + kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1" + clientset "github.com/alauda/kube-ovn/pkg/client/clientset/versioned" + "github.com/alauda/kube-ovn/pkg/request" + "github.com/alauda/kube-ovn/pkg/util" ) type cniServerHandler struct { @@ -35,8 +32,7 @@ func createCniServerHandler(config *Configuration) *cniServerHandler { func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Response) { podRequest := request.CniRequest{} - err := req.ReadEntity(&podRequest) - if err != nil { + if err := req.ReadEntity(&podRequest); err != nil { errMsg := fmt.Errorf("parse add request failed %v", err) klog.Error(errMsg) resp.WriteHeaderAndEntity(http.StatusBadRequest, request.CniResponse{Err: errMsg.Error()}) @@ -44,17 +40,15 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon } klog.Infof("add port request %v", podRequest) - - var macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, networkType, vlanID, vlanRange, providerInterfaceName, hostInterfaceName string - pod, err := csh.KubeClient.CoreV1().Pods(podRequest.PodNamespace).Get(podRequest.PodName, v1.GetOptions{}) - if err != nil { - errMsg := fmt.Errorf("get pod %s/%s failed %v", podRequest.PodNamespace, podRequest.PodName, err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - + var macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, vlanID string for i := 0; i < 10; i++ { + pod, err := csh.KubeClient.CoreV1().Pods(podRequest.PodNamespace).Get(podRequest.PodName, v1.GetOptions{}) + if err != nil { + errMsg := fmt.Errorf("get pod %s/%s failed %v", podRequest.PodNamespace, podRequest.PodName, err) + klog.Error(errMsg) + resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) + return + } if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podRequest.Provider)] != "true" { klog.Infof("wait address for pod %s/%s ", podRequest.PodNamespace, podRequest.PodName) // wait controller assign an address @@ -76,21 +70,30 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon ingress = pod.Annotations[util.IngressRateAnnotation] egress = pod.Annotations[util.EgressRateAnnotation] vlanID = pod.Annotations[util.VlanIdAnnotation] - networkType = pod.Annotations[util.NetworkType] - providerInterfaceName = pod.Annotations[util.ProviderInterfaceName] - hostInterfaceName = pod.Annotations[util.HostInterfaceName] - vlanRange = pod.Annotations[util.VlanRangeAnnotation] - + ipAddr = fmt.Sprintf("%s/%s", ip, strings.Split(cidr, "/")[1]) break } - if macAddr == "" || ip == "" || cidr == "" || gw == "" { - errMsg := fmt.Errorf("no available ip for pod %s/%s", podRequest.PodNamespace, podRequest.PodName) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) + if err := csh.createOrUpdateIPCr(podRequest, subnet, ip, macAddr); err != nil { + resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: err.Error()}) return } + if podRequest.Provider == util.OvnProvider { + klog.Infof("create container mac %s, ip %s, cidr %s, gw %s", macAddr, ipAddr, cidr, gw) + err := csh.configureNic(podRequest.PodName, podRequest.PodNamespace, podRequest.NetNs, podRequest.ContainerID, macAddr, ipAddr, gw, ingress, egress, vlanID) + if err != nil { + errMsg := fmt.Errorf("configure nic failed %v", err) + klog.Error(errMsg) + resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) + return + } + } + + resp.WriteHeaderAndEntity(http.StatusOK, request.CniResponse{Protocol: util.CheckProtocol(ipAddr), IpAddress: strings.Split(ipAddr, "/")[0], MacAddress: macAddr, CIDR: cidr, Gateway: gw}) +} + +func (csh cniServerHandler) createOrUpdateIPCr(podRequest request.CniRequest, subnet, ip, macAddr string) error { ipCr, err := csh.KubeOvnClient.KubeovnV1().IPs().Get(fmt.Sprintf("%s.%s", podRequest.PodName, podRequest.PodNamespace), metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { @@ -115,97 +118,25 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon if err != nil { errMsg := fmt.Errorf("failed to create ip crd for %s, %v", ip, err) klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return + return errMsg } } else { errMsg := fmt.Errorf("failed to get ip crd for %s, %v", ip, err) klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return + return errMsg } } else { ipCr.Labels[subnet] = "" ipCr.Spec.AttachSubnets = append(ipCr.Spec.AttachSubnets, subnet) ipCr.Spec.AttachIPs = append(ipCr.Spec.AttachIPs, ip) ipCr.Spec.AttachMacs = append(ipCr.Spec.AttachMacs, macAddr) - _, err := csh.KubeOvnClient.KubeovnV1().IPs().Update(ipCr) - if err != nil { + if _, err := csh.KubeOvnClient.KubeovnV1().IPs().Update(ipCr); err != nil { errMsg := fmt.Errorf("failed to update ip crd for %s, %v", ip, err) klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - } - - ipAddr = fmt.Sprintf("%s/%s", ip, strings.Split(cidr, "/")[1]) - if podRequest.Provider == util.OvnProvider { - klog.Infof("create container mac %s, ip %s, cidr %s, gw %s", macAddr, ipAddr, cidr, gw) - err = csh.configureNic(podRequest.PodName, podRequest.PodNamespace, podRequest.NetNs, podRequest.ContainerID, macAddr, ipAddr, gw, ingress, egress) - if err != nil { - errMsg := fmt.Errorf("configure nic failed %v", err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - } - - if util.IsProviderVlan(networkType, providerInterfaceName) { - //create patch port - exists, err := providerBridgeExists() - if err != nil { - errMsg := fmt.Errorf("check provider bridge exists failed, %v", err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - - if !exists { - //create br-provider - if err = configProviderPort(providerInterfaceName); err != nil { - errMsg := fmt.Errorf("configure patch port br-provider failed %v", err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - - //add a host nic to br-provider - ifName := csh.getInterfaceName(hostInterfaceName) - if ifName == "" { - errMsg := fmt.Errorf("failed get host nic to add ovs br-provider") - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - - if err = configProviderNic(ifName); err != nil { - errMsg := fmt.Errorf("add nic %s to port br-provider failed %v", ifName, err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - } - - if err = csh.addRoute(ipAddr); err != nil { - errMsg := fmt.Errorf("add pod route failed, %v", err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return - } - } - - //set ovs port tag - if util.IsNetworkVlan(networkType, vlanID, vlanRange) { - hostNicName, _ := generateNicName(podRequest.ContainerID) - if err := ovs.SetPortTag(hostNicName, vlanID); err != nil { - errMsg := fmt.Errorf("configure port tag failed %v", err) - klog.Error(errMsg) - resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()}) - return + return errMsg } } - - resp.WriteHeaderAndEntity(http.StatusOK, request.CniResponse{Protocol: util.CheckProtocol(ipAddr), IpAddress: strings.Split(ipAddr, "/")[0], MacAddress: macAddr, CIDR: cidr, Gateway: gw}) + return nil } func (csh cniServerHandler) handleDel(req *restful.Request, resp *restful.Response) { @@ -240,68 +171,3 @@ func (csh cniServerHandler) handleDel(req *restful.Request, resp *restful.Respon resp.WriteHeader(http.StatusNoContent) } - -//get host nic name -func (csh cniServerHandler) getInterfaceName(hostInterfaceName string) string { - var interfaceName string - - node, err := csh.Config.KubeClient.CoreV1().Nodes().Get(csh.Config.NodeName, metav1.GetOptions{}) - if err == nil { - labels := node.GetLabels() - interfaceName = labels[util.HostInterfaceName] - } - - if interfaceName != "" { - return interfaceName - } - - if hostInterfaceName != "" { - return hostInterfaceName - } - - if csh.Config.Iface != "" { - return csh.Config.Iface - } - - return "" -} - -//add a static route. If it is not added, the pod will not receive packets from the host nic -func (csh cniServerHandler) addRoute(ipAddr string) error { - nic, err := netlink.LinkByName(util.NodeNic) - if err != nil { - klog.Errorf("failed to get nic %s", util.NodeNic) - return fmt.Errorf("failed to get nic %s", util.NodeNic) - } - - existRoutes, err := netlink.RouteList(nic, netlink.FAMILY_V4) - if err != nil { - return err - } - - _, cidr, _ := net.ParseCIDR(ipAddr) - for _, route := range existRoutes { - if route.Dst == cidr { - return nil - } - } - - node, err := csh.Config.KubeClient.CoreV1().Nodes().Get(csh.Config.NodeName, metav1.GetOptions{}) - if err != nil { - klog.Errorf("failed to get node %s %v", csh.Config.NodeName, err) - return err - } - - gateway, ok := node.Annotations[util.GatewayAnnotation] - if !ok { - klog.Errorf("annotation for node %s ovn.kubernetes.io/gateway not exists", node.Name) - return fmt.Errorf("annotation for node ovn.kubernetes.io/gateway not exists") - } - - gw := net.ParseIP(gateway) - if err = netlink.RouteReplace(&netlink.Route{Dst: cidr, LinkIndex: nic.Attrs().Index, Scope: netlink.SCOPE_UNIVERSE, Gw: gw}); err != nil { - klog.Errorf("failed to add route %v", err) - } - - return err -} diff --git a/pkg/daemon/init.go b/pkg/daemon/init.go index 2fb55ae2eb6..0fe04aef76a 100644 --- a/pkg/daemon/init.go +++ b/pkg/daemon/init.go @@ -3,7 +3,7 @@ package daemon import ( "fmt" "github.com/alauda/kube-ovn/pkg/util" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" "net" "strings" @@ -15,7 +15,7 @@ func InitNodeGateway(config *Configuration) error { var portName, ip, cidr, macAddr, gw, ipAddr string for { nodeName := config.NodeName - node, err := config.KubeClient.CoreV1().Nodes().Get(nodeName, v1.GetOptions{}) + node, err := config.KubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) if err != nil { klog.Errorf("failed to get node %s info %v", nodeName, err) return err @@ -49,3 +49,65 @@ func InitNodeGateway(config *Configuration) error { func InitMirror(config *Configuration) error { return configureMirror(config.MirrorNic, config.MTU) } + +func InitVlan(config *Configuration) error { + + if util.IsProviderVlan(config.NetworkType, config.DefaultProviderName) { + //create patch port + exists, err := providerBridgeExists() + if err != nil { + errMsg := fmt.Errorf("check provider bridge exists failed, %v", err) + klog.Error(errMsg) + return err + } + + if !exists { + //create br-provider + if err = configProviderPort(config.DefaultProviderName); err != nil { + errMsg := fmt.Errorf("configure patch port br-provider failed %v", err) + klog.Error(errMsg) + return errMsg + } + + //add a host nic to br-provider + ifName := config.getInterfaceName() + if ifName == "" { + errMsg := fmt.Errorf("failed get host nic to add ovs br-provider") + klog.Error(errMsg) + return errMsg + } + + if err = configProviderNic(ifName); err != nil { + errMsg := fmt.Errorf("add nic %s to port br-provider failed %v", ifName, err) + klog.Error(errMsg) + return errMsg + } + } + } + return nil +} + +//get host nic name +func (config *Configuration) getInterfaceName() string { + var interfaceName string + + node, err := config.KubeClient.CoreV1().Nodes().Get(config.NodeName, metav1.GetOptions{}) + if err == nil { + labels := node.GetLabels() + interfaceName = labels[util.HostInterfaceName] + } + + if interfaceName != "" { + return interfaceName + } + + if config.DefaultInterfaceName != "" { + return config.DefaultInterfaceName + } + + if config.Iface != "" { + return config.Iface + } + + return "" +} diff --git a/pkg/daemon/ovs.go b/pkg/daemon/ovs.go index 11e4a39c2f3..143dc6bb87a 100644 --- a/pkg/daemon/ovs.go +++ b/pkg/daemon/ovs.go @@ -16,7 +16,7 @@ import ( "time" ) -func (csh cniServerHandler) configureNic(podName, podNamespace, netns, containerID, mac, ip, gateway, ingress, egress string) error { +func (csh cniServerHandler) configureNic(podName, podNamespace, netns, containerID, mac, ip, gateway, ingress, egress, vlanID string) error { var err error hostNicName, containerNicName := generateNicName(containerID) // Create a veth pair, put one end to container ,the other to ovs port @@ -49,7 +49,7 @@ func (csh cniServerHandler) configureNic(podName, podNamespace, netns, container if err != nil { return fmt.Errorf("failed to parse mac %s %v", macAddr, err) } - if err = configureHostNic(hostNicName, macAddr); err != nil { + if err = configureHostNic(hostNicName, vlanID, macAddr); err != nil { return err } if err = ovs.SetPodBandwidth(podName, podNamespace, ingress, egress); err != nil { @@ -97,7 +97,7 @@ func generateNicName(containerID string) (string, string) { return fmt.Sprintf("%s_h", containerID[0:12]), fmt.Sprintf("%s_c", containerID[0:12]) } -func configureHostNic(nicName string, macAddr net.HardwareAddr) error { +func configureHostNic(nicName, vlanID string, macAddr net.HardwareAddr) error { hostLink, err := netlink.LinkByName(nicName) if err != nil { return fmt.Errorf("can not find host nic %s %v", nicName, err) @@ -115,6 +115,12 @@ func configureHostNic(nicName string, macAddr net.HardwareAddr) error { return fmt.Errorf("can not set host nic %s qlen %v", nicName, err) } + if vlanID != "" { + if err := ovs.SetPortTag(nicName, vlanID); err != nil { + return fmt.Errorf("failed to add vlan tag, %v", err) + } + } + return nil }