From 4187a329bde0884ef6586006fe5919c20a6288c2 Mon Sep 17 00:00:00 2001 From: zhangzujian Date: Thu, 16 Dec 2021 17:48:52 +0800 Subject: [PATCH] When netpol is added to a workload, the workload's POD can be accessed using service Co-authored-by: wang_yudong --- pkg/controller/controller.go | 1 + pkg/controller/network_policy.go | 234 ++++++++++++++++++++++++++++--- pkg/controller/service.go | 36 +++++ pkg/ovs/ovn-nbctl.go | 4 +- 4 files changed, 252 insertions(+), 23 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 1631020ccfe..74438516387 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -272,6 +272,7 @@ func NewController(config *Configuration) *Controller { }) serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueAddService, DeleteFunc: controller.enqueueDeleteService, UpdateFunc: controller.enqueueUpdateService, }) diff --git a/pkg/controller/network_policy.go b/pkg/controller/network_policy.go index 8cb882064ce..b7efab0f83c 100644 --- a/pkg/controller/network_policy.go +++ b/pkg/controller/network_policy.go @@ -207,9 +207,58 @@ func (c *Controller) handleUpdateNp(key string) error { return err } + // set svc address_set + svcAsNameIPv4 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv4), "-", ".", -1) + svcAsNameIPv6 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv6), "-", ".", -1) + svcIpv4s, svcIpv6s, err := c.fetchSelectedSvc(np.Namespace, &np.Spec.PodSelector) + if err != nil { + klog.Errorf("failed to fetchSelectedSvc svcIPs result %v", err) + return err + } + for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") { + protocol := util.CheckProtocol(cidrBlock) + svcAsName := svcAsNameIPv4 + svcIPs := svcIpv4s + if protocol == kubeovnv1.ProtocolIPv6 { + svcAsName = svcAsNameIPv6 + svcIPs = svcIpv6s + } + if err := c.ovnClient.CreateAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil { + klog.Errorf("failed to create address_set %s, %v", svcAsNameIPv4, err) + return err + } + if err := c.ovnClient.SetAddressesToAddressSet(svcIPs, svcAsName); err != nil { + klog.Errorf("failed to set netpol svc, %v", err) + return err + } + } + + // before update or add ingress info,we should first delete acl and address_set + if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil { + klog.Errorf("failed to delete np %s ingress acls, %v", key, err) + return err + } + + ingressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "ingress") + if err != nil { + klog.Errorf("failed to list ingress address_set, %v", err) + return err + } + for _, ingressAsName := range ingressAsNames { + if err := c.ovnClient.DeleteAddressSet(ingressAsName); err != nil { + klog.Errorf("failed to delete np %s address set, %v", key, err) + return err + } + } + if hasIngressRule(np) { for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") { protocol := util.CheckProtocol(cidrBlock) + svcAsName := svcAsNameIPv4 + if protocol == kubeovnv1.ProtocolIPv6 { + svcAsName = svcAsNameIPv6 + } + for idx, npr := range np.Spec.Ingress { // A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different ingressAllowAsName := fmt.Sprintf("%s.%s.%d", ingressAllowAsNamePrefix, protocol, idx) @@ -255,7 +304,7 @@ func (c *Controller) handleUpdateNp(key string) error { } if len(allows) != 0 || len(excepts) != 0 { - if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports); err != nil { + if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, svcAsName, protocol, npr.Ports); err != nil { klog.Errorf("failed to create ingress acls for np %s, %v", key, err) return err } @@ -274,7 +323,7 @@ func (c *Controller) handleUpdateNp(key string) error { return err } ingressPorts := []netv1.NetworkPolicyPort{} - if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts); err != nil { + if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, svcAsName, protocol, ingressPorts); err != nil { klog.Errorf("failed to create ingress acls for np %s, %v", key, err) return err } @@ -323,9 +372,31 @@ func (c *Controller) handleUpdateNp(key string) error { } } + // before update or add egress info, we should first delete acl and address_set + if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil { + klog.Errorf("failed to delete np %s egress acls, %v", key, err) + return err + } + + egressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress") + if err != nil { + klog.Errorf("failed to list egress address_set, %v", err) + return err + } + for _, egressAsName := range egressAsNames { + if err := c.ovnClient.DeleteAddressSet(egressAsName); err != nil { + klog.Errorf("failed to delete np %s address set, %v", key, err) + return err + } + } if hasEgressRule(np) { for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") { protocol := util.CheckProtocol(cidrBlock) + svcAsName := svcAsNameIPv4 + if protocol == kubeovnv1.ProtocolIPv6 { + svcAsName = svcAsNameIPv6 + } + for idx, npr := range np.Spec.Egress { // A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different egressAllowAsName := fmt.Sprintf("%s.%s.%d", egressAllowAsNamePrefix, protocol, idx) @@ -371,7 +442,7 @@ func (c *Controller) handleUpdateNp(key string) error { } if len(allows) != 0 || len(excepts) != 0 { - if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports); err != nil { + if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, svcAsName); err != nil { klog.Errorf("failed to create egress acls for np %s, %v", key, err) return err } @@ -390,7 +461,7 @@ func (c *Controller) handleUpdateNp(key string) error { return err } egressPorts := []netv1.NetworkPolicyPort{} - if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts); err != nil { + if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, svcAsName); err != nil { klog.Errorf("failed to create egress acls for np %s, %v", key, err) return err } @@ -421,24 +492,8 @@ func (c *Controller) handleUpdateNp(key string) error { } } } - } else { - if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil { - klog.Errorf("failed to delete np %s egress acls, %v", key, err) - return err - } - - asNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress") - if err != nil { - klog.Errorf("failed to list address_set, %v", err) - return err - } - for _, asName := range asNames { - if err := c.ovnClient.DeleteAddressSet(asName); err != nil { - klog.Errorf("failed to delete np %s address set, %v", key, err) - return err - } - } } + if err := c.ovnClient.CreateGatewayACL(pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil { klog.Errorf("failed to create gateway acl, %v", err) return err @@ -458,6 +513,18 @@ func (c *Controller) handleDeleteNp(key string) error { klog.Errorf("failed to delete np %s port group, %v", key, err) } + svcAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "service") + if err != nil { + klog.Errorf("failed to list svc address_set, %v", err) + return err + } + for _, asName := range svcAsNames { + if err := c.ovnClient.DeleteAddressSet(asName); err != nil { + klog.Errorf("failed to delete np %s address set, %v", key, err) + return err + } + } + ingressAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "ingress") if err != nil { klog.Errorf("failed to list address_set, %v", err) @@ -506,6 +573,45 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label return ports, nil } +func (c *Controller) fetchSelectedSvc(namespace string, selector *metav1.LabelSelector) ([]string, []string, error) { + sel, err := metav1.LabelSelectorAsSelector(selector) + if err != nil { + return nil, nil, fmt.Errorf("error creating label selector, %v", err) + } + pods, err := c.podsLister.Pods(namespace).List(sel) + if err != nil { + return nil, nil, fmt.Errorf("failed to list pods, %v", err) + } + + svcIpv4s := make([]string, 0) + svcIpv6s := make([]string, 0) + svcs, err := c.servicesLister.Services(namespace).List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list svc, %v", err) + return nil, nil, err + } + + for _, pod := range pods { + if !isPodAlive(pod) { + continue + } + if !pod.Spec.HostNetwork && pod.Annotations[util.AllocatedAnnotation] == "true" { + svcIpv4, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv4) + if err != nil { + return nil, nil, err + } + svcIpv4s = append(svcIpv4s, svcIpv4...) + + svcIpv6, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv6) + if err != nil { + return nil, nil, err + } + svcIpv6s = append(svcIpv6s, svcIpv6...) + } + } + return svcIpv4s, svcIpv6s, nil +} + func hasIngressRule(np *netv1.NetworkPolicy) bool { for _, pt := range np.Spec.PolicyTypes { if strings.Contains(string(pt), string(netv1.PolicyTypeIngress)) { @@ -568,10 +674,26 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np if err != nil { return nil, nil, fmt.Errorf("failed to list pod, %v", err) } + svcs, err := c.servicesLister.Services(ns).List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list svc, %v", err) + return nil, nil, fmt.Errorf("failed to list svc, %v", err) + } + for _, pod := range pods { for _, podIP := range pod.Status.PodIPs { if podIP.IP != "" && util.CheckProtocol(podIP.IP) == protocol { selectedAddresses = append(selectedAddresses, podIP.IP) + if len(svcs) == 0 { + continue + } + klog.Infof("svc is %v", svcs) + svcIPs, err := svcMatchPods(svcs, pod, protocol) + if err != nil { + return nil, nil, err + } + klog.Infof("svcIPs is %v", svcIPs) + selectedAddresses = append(selectedAddresses, svcIPs...) } } } @@ -579,6 +701,51 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np return selectedAddresses, exceptAddresses, nil } +func svcMatchPods(svcs []*corev1.Service, pod *corev1.Pod, protocol string) ([]string, error) { + matchSvcs := []string{} + // find svc ip by pod's info + for _, svc := range svcs { + isMatch, err := isSvcMatchPod(svc, pod) + if err != nil { + return nil, err + } + if isMatch { + clusterIPs := svc.Spec.ClusterIPs + if len(clusterIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != corev1.ClusterIPNone { + clusterIPs = []string{svc.Spec.ClusterIP} + } + protocolClusterIPs := getProtocolSvcIp(clusterIPs, protocol) + if len(protocolClusterIPs) != 0 { + matchSvcs = append(matchSvcs, protocolClusterIPs...) + } + } + } + return matchSvcs, nil +} +func getProtocolSvcIp(clusterIPs []string, protocol string) []string { + protocolClusterIPs := []string{} + for _, clusterIP := range clusterIPs { + if clusterIP != "" && clusterIP != corev1.ClusterIPNone && util.CheckProtocol(clusterIP) == protocol { + protocolClusterIPs = append(protocolClusterIPs, clusterIP) + } + } + return protocolClusterIPs +} +func isSvcMatchPod(svc *corev1.Service, pod *corev1.Pod) (bool, error) { + ss := metav1.SetAsLabelSelector(svc.Spec.Selector) + sel, err := metav1.LabelSelectorAsSelector(ss) + if err != nil { + return false, fmt.Errorf("error fetch label selector, %v", err) + } + if pod.Labels == nil { + return false, nil + } + if sel.Matches(labels.Set(pod.Labels)) { + return true, nil + } + return false, nil +} + func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string { podNs, _ := c.namespacesLister.Get(pod.Namespace) nps, _ := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything()) @@ -591,6 +758,31 @@ func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string { return match } +func (c *Controller) svcMatchNetworkPolicies(svc *corev1.Service) ([]string, error) { + // find all match pod + pods, err := c.podsLister.Pods(svc.Namespace).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("failed to list pods, %v", err) + } + + // find all match netpol + nps, err := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("failed to list netpols, %v", err) + } + match := []string{} + for _, pod := range pods { + podNs, _ := c.namespacesLister.Get(pod.Namespace) + for _, np := range nps { + if isPodMatchNetworkPolicy(pod, *podNs, np, np.Namespace) { + match = append(match, fmt.Sprintf("%s/%s", np.Namespace, np.Name)) + } + } + } + klog.Infof("match svc is %v", match) + return match, nil +} + func isPodMatchNetworkPolicy(pod *corev1.Pod, podNs corev1.Namespace, policy *netv1.NetworkPolicy, policyNs string) bool { sel, _ := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector) if pod.Labels == nil { diff --git a/pkg/controller/service.go b/pkg/controller/service.go index 12d3ea05f0c..a129ac50462 100644 --- a/pkg/controller/service.go +++ b/pkg/controller/service.go @@ -21,6 +21,30 @@ type vpcService struct { Protocol v1.Protocol } +func (c *Controller) enqueueAddService(obj interface{}) { + if !c.isLeader() { + return + } + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + svc := obj.(*v1.Service) + klog.V(3).Infof("enqueue update service %s", key) + + var netpols []string + if netpols, err = c.svcMatchNetworkPolicies(svc); err != nil { + utilruntime.HandleError(err) + return + } + + for _, np := range netpols { + c.updateNpQueue.Add(np) + } +} + func (c *Controller) enqueueDeleteService(obj interface{}) { if !c.isLeader() { return @@ -29,6 +53,18 @@ func (c *Controller) enqueueDeleteService(obj interface{}) { //klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name) klog.Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name) if svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" { + + var netpols []string + var err error + if netpols, err = c.svcMatchNetworkPolicies(svc); err != nil { + utilruntime.HandleError(err) + return + } + + for _, np := range netpols { + c.updateNpQueue.Add(np) + } + for _, port := range svc.Spec.Ports { vpcSvc := &vpcService{ Vip: fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port), diff --git a/pkg/ovs/ovn-nbctl.go b/pkg/ovs/ovn-nbctl.go index f1e7bbce2fb..1fe6e9a5ac0 100644 --- a/pkg/ovs/ovn-nbctl.go +++ b/pkg/ovs/ovn-nbctl.go @@ -1119,7 +1119,7 @@ func (c Client) DeleteAddressSet(asName string) error { return err } -func (c Client) CreateIngressACL(npName, pgName, asIngressName, asExceptName, protocol string, npp []netv1.NetworkPolicyPort) error { +func (c Client) CreateIngressACL(npName, pgName, asIngressName, asExceptName, svcAsName, protocol string, npp []netv1.NetworkPolicyPort) error { ipSuffix := "ip4" if protocol == kubeovnv1.ProtocolIPv6 { ipSuffix = "ip6" @@ -1140,7 +1140,7 @@ func (c Client) CreateIngressACL(npName, pgName, asIngressName, asExceptName, pr return err } -func (c Client) CreateEgressACL(npName, pgName, asEgressName, asExceptName, protocol string, npp []netv1.NetworkPolicyPort) error { +func (c Client) CreateEgressACL(npName, pgName, asEgressName, asExceptName, protocol string, npp []netv1.NetworkPolicyPort, portSvcName string) error { ipSuffix := "ip4" if protocol == kubeovnv1.ProtocolIPv6 { ipSuffix = "ip6"