Skip to content

Commit

Permalink
fix access from node to overlay pods when network policy ingress exis…
Browse files Browse the repository at this point in the history
…ts (#2279)
  • Loading branch information
zhangzujian authored Feb 2, 2023
1 parent 2b38340 commit 9985ee5
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 4 deletions.
15 changes: 13 additions & 2 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,18 @@ func (c *Controller) checkAndUpdateNodePortGroup() error {
for _, node := range nodes {
// The port-group should already created when add node
pgName := strings.Replace(node.Annotations[util.PortNameAnnotation], "-", ".", -1)
nodeIP := node.Annotations[util.IpAddressAnnotation]

// use join IP only when no internal IP exists
nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
joinIP := node.Annotations[util.IpAddressAnnotation]
joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
if nodeIPv4 == "" {
nodeIPv4 = joinIPv4
}
if nodeIPv6 == "" {
nodeIPv6 = joinIPv6
}
nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")

ports, err := c.fetchPodsOnNode(node.Name, pods)
if err != nil {
Expand Down Expand Up @@ -1000,7 +1011,7 @@ func (c *Controller) checkAndUpdateNodePortGroup() error {
}

if networkPolicyExists {
if err := c.ovnLegacyClient.CreateACLForNodePg(pgName, nodeIP); err != nil {
if err := c.ovnLegacyClient.CreateACLForNodePg(pgName, nodeIP, joinIP); err != nil {
klog.Errorf("failed to create node acl for node pg %v, %v", pgName, err)
}
} else {
Expand Down
25 changes: 23 additions & 2 deletions pkg/ovs/ovn-nbctl-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1947,8 +1947,9 @@ func (c LegacyClient) CreateGatewayACL(pgName, gateway, cidr string) error {
return nil
}

func (c LegacyClient) CreateACLForNodePg(pgName, nodeIpStr string) error {
for _, nodeIp := range strings.Split(nodeIpStr, ",") {
func (c LegacyClient) CreateACLForNodePg(pgName, nodeIpStr, joinIpStr string) error {
nodeIPs := strings.Split(nodeIpStr, ",")
for _, nodeIp := range nodeIPs {
protocol := util.CheckProtocol(nodeIp)
ipSuffix := "ip4"
if protocol == kubeovnv1.ProtocolIPv6 {
Expand All @@ -1964,6 +1965,26 @@ func (c LegacyClient) CreateACLForNodePg(pgName, nodeIpStr string) error {
return err
}
}
for _, joinIp := range strings.Split(joinIpStr, ",") {
if util.ContainsString(nodeIPs, joinIp) {
continue
}

protocol := util.CheckProtocol(joinIp)
ipSuffix := "ip4"
if protocol == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
}
pgAs := fmt.Sprintf("%s_%s", pgName, ipSuffix)

ingressArgs := []string{"acl-del", pgName, "to-lport", util.NodeAllowPriority, fmt.Sprintf("%s.src == %s && %s.dst == $%s", ipSuffix, joinIp, ipSuffix, pgAs)}
egressArgs := []string{"--", "acl-del", pgName, "from-lport", util.NodeAllowPriority, fmt.Sprintf("%s.dst == %s && %s.src == $%s", ipSuffix, joinIp, ipSuffix, pgAs)}
ovnArgs := append(ingressArgs, egressArgs...)
if _, err := c.ovnNbCommand(ovnArgs...); err != nil {
klog.Errorf("failed to delete node port-group acl: %v", err)
return err
}
}

return nil
}
Expand Down
91 changes: 91 additions & 0 deletions test/e2e/framework/network-policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package framework

import (
"context"
"fmt"
"time"

netv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
v1net "k8s.io/client-go/kubernetes/typed/networking/v1"

"github.com/onsi/gomega"
)

// NetworkPolicyClient is a struct for network policy client.
type NetworkPolicyClient struct {
f *Framework
v1net.NetworkPolicyInterface
}

func (f *Framework) NetworkPolicyClient() *NetworkPolicyClient {
return &NetworkPolicyClient{
f: f,
NetworkPolicyInterface: f.ClientSet.NetworkingV1().NetworkPolicies(f.Namespace.Name),
}
}

func (s *NetworkPolicyClient) Get(name string) *netv1.NetworkPolicy {
np, err := s.NetworkPolicyInterface.Get(context.TODO(), name, metav1.GetOptions{})
ExpectNoError(err)
return np
}

// Create creates a new network policy according to the framework specifications
func (c *NetworkPolicyClient) Create(netpol *netv1.NetworkPolicy) *netv1.NetworkPolicy {
np, err := c.NetworkPolicyInterface.Create(context.TODO(), netpol, metav1.CreateOptions{})
ExpectNoError(err, "Error creating network policy")
return np.DeepCopy()
}

// Delete deletes a network policy if the network policy exists
func (c *NetworkPolicyClient) Delete(name string) {
err := c.NetworkPolicyInterface.Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
Failf("Failed to delete network policy %q: %v", name, err)
}
}

// DeleteSync deletes the network policy and waits for the network policy to disappear for `timeout`.
// If the network policy doesn't disappear before the timeout, it will fail the test.
func (c *NetworkPolicyClient) DeleteSync(name string) {
c.Delete(name)
gomega.Expect(c.WaitToDisappear(name, 2*time.Second, timeout)).To(gomega.Succeed(), "wait for network policy %q to disappear", name)
}

// WaitToDisappear waits the given timeout duration for the specified network policy to disappear.
func (c *NetworkPolicyClient) WaitToDisappear(name string, interval, timeout time.Duration) error {
var lastNetpol *netv1.NetworkPolicy
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
Logf("Waiting for network policy %s to disappear", name)
policies, err := c.List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing network policies")
}
found := false
for i, netpol := range policies.Items {
if netpol.Name == name {
Logf("Network policy %s still exists", name)
found = true
lastNetpol = &(policies.Items[i])
break
}
}
if !found {
Logf("Network policy %s no longer exists", name)
return true, nil
}
return false, nil
})
if err == nil {
return nil
}
if IsTimeout(err) {
return TimeoutError(fmt.Sprintf("timed out while waiting for network policy %s to disappear", name),
lastNetpol,
)
}
return maybeTimeoutError(err, "waiting for network policy %s to disappear", name)
}
1 change: 1 addition & 0 deletions test/e2e/kube-ovn/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// Import tests.
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/ipam"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/kubectl-ko"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/network-policy"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/node"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/qos"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/subnet"
Expand Down
148 changes: 148 additions & 0 deletions test/e2e/kube-ovn/network-policy/network-policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package network_policy

import (
"context"
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"

"github.com/onsi/ginkgo/v2"

apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
"github.com/kubeovn/kube-ovn/test/e2e/framework"
)

var _ = framework.Describe("[group:network-policy]", func() {
f := framework.NewDefaultFramework("network-policy")

var subnet *apiv1.Subnet
var cs clientset.Interface
var podClient *framework.PodClient
var subnetClient *framework.SubnetClient
var netpolClient *framework.NetworkPolicyClient
var namespaceName, netpolName, subnetName, podName string
var cidr, image string

ginkgo.BeforeEach(func() {
cs = f.ClientSet
podClient = f.PodClient()
subnetClient = f.SubnetClient()
netpolClient = f.NetworkPolicyClient()
namespaceName = f.Namespace.Name
netpolName = "netpol-" + framework.RandomSuffix()
podName = "pod-" + framework.RandomSuffix()
subnetName = "subnet-" + framework.RandomSuffix()
cidr = framework.RandomCIDR(f.ClusterIpFamily)

if image == "" {
image = framework.GetKubeOvnImage(cs)
}
})
ginkgo.AfterEach(func() {
ginkgo.By("Deleting pod " + podName)
podClient.DeleteSync(podName)

ginkgo.By("Deleting subnet " + subnetName)
subnetClient.DeleteSync(subnetName)

ginkgo.By("Deleting network policy " + netpolName)
netpolClient.DeleteSync(netpolName)
})

framework.ConformanceIt("should be able to access pods from node after creating a network policy with empty ingress rules", func() {
ginkgo.By("Creating network policy " + netpolName)
netpol := &netv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: netpolName,
},
Spec: netv1.NetworkPolicySpec{
Ingress: []netv1.NetworkPolicyIngressRule{},
},
}
_ = netpolClient.Create(netpol)

ginkgo.By("Creating subnet " + subnetName)
subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, nil, nil)
subnet = subnetClient.CreateSync(subnet)

ginkgo.By("Creating pod " + podName)
port := strconv.Itoa(8000 + rand.Intn(1000))
args := []string{"netexec", "--http-port", port}
annotations := map[string]string{util.LogicalSwitchAnnotation: subnetName}
pod := framework.MakePod(namespaceName, podName, nil, annotations, framework.AgnhostImage, nil, args)
pod = podClient.CreateSync(pod)

ginkgo.By("Getting nodes")
nodeList, err := e2enode.GetReadySchedulableNodes(cs)
framework.ExpectNoError(err)
framework.ExpectNotEmpty(nodeList.Items)

ginkgo.By("Getting daemonset kube-ovn-cni")
ds, err := cs.AppsV1().DaemonSets(framework.KubeOvnNamespace).Get(context.TODO(), "kube-ovn-cni", metav1.GetOptions{})
framework.ExpectNoError(err, "failed to to get daemonset")

ginkgo.By("Getting kube-ovn-cni pods")
pods := make([]corev1.Pod, 0, len(nodeList.Items))
for _, node := range nodeList.Items {
pod, err := framework.GetPodOnNodeForDaemonSet(cs, ds, node.Name)
framework.ExpectNoError(err, "failed to get kube-ovn-cni pod running on node %s", node.Name)
pods = append(pods, *pod)
}

for _, podIP := range pod.Status.PodIPs {
ip := podIP.IP
protocol := strings.ToLower(util.CheckProtocol(ip))
cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 %s", net.JoinHostPort(ip, port))

var podSameNode *corev1.Pod
for _, hostPod := range pods {
nodeName := hostPod.Spec.NodeName
if nodeName == pod.Spec.NodeName {
podSameNode = hostPod.DeepCopy()
continue
}

ginkgo.By("Checking connection from node " + nodeName + " to " + podName + " via " + protocol)
ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, hostPod.Namespace, hostPod.Name))
err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
_, err := e2epodoutput.RunHostCmd(hostPod.Namespace, hostPod.Name, cmd)
return err != nil, nil
})
framework.ExpectNoError(err)
}

ginkgo.By("Checking connection from node " + podSameNode.Spec.NodeName + " to " + podName + " via " + protocol)
ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, podSameNode.Namespace, podSameNode.Name))
err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
_, err := e2epodoutput.RunHostCmd(podSameNode.Namespace, podSameNode.Name, cmd)
return err == nil, nil
})
framework.ExpectNoError(err)

// check one more time
for _, hostPod := range pods {
nodeName := hostPod.Spec.NodeName
if nodeName == pod.Spec.NodeName {
continue
}

ginkgo.By("Checking connection from node " + nodeName + " to " + podName + " via " + protocol)
ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, hostPod.Namespace, hostPod.Name))
_, err := e2epodoutput.RunHostCmd(hostPod.Namespace, hostPod.Name, cmd)
framework.ExpectError(err)
}
}
})
})

0 comments on commit 9985ee5

Please sign in to comment.