Skip to content

Commit

Permalink
Add IPv4/v6 dual stack support in Flow aggregator
Browse files Browse the repository at this point in the history
Fix the e2e test failure in dual stack cluster of flow aggregator.
  • Loading branch information
Yongming Ding committed Mar 23, 2021
1 parent 2c1666d commit d92cc41
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 43 deletions.
28 changes: 10 additions & 18 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func skipIfNotIPv6Cluster(tb testing.TB) {
}
}

func skipIfDualStackCluster(tb testing.TB) {
if clusterInfo.podV6NetworkCIDR != "" && clusterInfo.podV4NetworkCIDR != "" {
tb.Skipf("Skipping test as it is not supported in dual stack cluster")
}
}

func skipIfMissingKernelModule(tb testing.TB, nodeName string, requiredModules []string) {
for _, module := range requiredModules {
// modprobe with "--dry-run" does not require root privileges
Expand Down Expand Up @@ -147,14 +141,12 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
isIPv6 := false
if clusterInfo.podV6NetworkCIDR != "" {
isIPv6 = true
}
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
// Create pod using ipfix collector image
if err = testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
Expand All @@ -163,10 +155,10 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, isIPv6, err
return nil, v4Enabled, v6Enabled, err
}
var ipStr string
if isIPv6 && ipfixCollectorIP.ipv6 != nil {
if v6Enabled && ipfixCollectorIP.ipv6 != nil {
ipStr = ipfixCollectorIP.ipv6.String()
} else {
ipStr = ipfixCollectorIP.ipv4.String()
Expand All @@ -177,7 +169,7 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
if testOptions.providerName == "kind" {
// In Kind cluster, there are issues with DNS name resolution on worker nodes.
Expand All @@ -186,14 +178,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
return testData, isIPv6, nil
return testData, v4Enabled, v6Enabled, nil
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down
76 changes: 53 additions & 23 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,33 @@ const (
)

func TestFlowAggregator(t *testing.T) {
skipIfDualStackCluster(t)
data, isIPv6, err := setupTestWithIPFIXCollector(t)
data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6)
podAIP, podBIP, podCIP, err := createPerftestPods(data)
if err != nil {
t.Fatalf("Error when creating perftest pods and services: %v", err)
t.Fatalf("Error when creating perftest pods: %v", err)
}

if v4Enabled {
createSvcRunTests(t, data, podAIP, podBIP, podCIP, false)
}

if v6Enabled {
createSvcRunTests(t, data, podAIP, podBIP, podCIP, true)
}
}

func createSvcRunTests(t *testing.T, data *TestData, podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, isIPv6 bool) {
svcB, svcC, err := createPerftestServices(data, isIPv6)
if err != nil {
t.Fatalf("Error when creating perftest services: %v", err)
}
defer deletePerftestServices(data)
// Wait for the Service to be realized.
time.Sleep(3 * time.Second)

Expand Down Expand Up @@ -376,45 +391,60 @@ func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string)
return np1, np2
}

func createPerftestPods(data *TestData, isIPv6 bool) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, err error) {
func createPerftestPods(data *TestData) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, err error) {
if err := data.createPodOnNode("perftest-a", controlPlaneNodeName(), perftoolImage, nil, nil, nil, nil, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
}
podAIP, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
}

svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
}

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
}
return podAIP, podBIP, podCIP, nil
}

if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) {
svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
}
podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
}

// svcC will be needed when adding RemoteServiceAccess testcase
svcC, err = data.createService("perftest-c", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
}

if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
return svcB, svcC, nil
}

func deletePerftestServices(data *TestData) error {
err := data.deleteService("perftest-b")
if err != nil {
return fmt.Errorf("Error when deleting perftest service: %v", err)
}
podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
err = data.deleteService("perftest-c")
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return fmt.Errorf("Error when deleting perftest service: %v", err)
}
return podAIP, podBIP, podCIP, svcB, svcC, nil
return nil
}
5 changes: 3 additions & 2 deletions third_party/proxy/meta_proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package proxy

import (
"fmt"
"strings"

"k8s.io/api/core/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -160,12 +161,12 @@ func (proxier *metaProxier) OnEndpointsSynced() {
}

func (proxier *metaProxier) GetServiceByIP(serviceStr string) (ServicePortName, bool) {
if utilnet.IsIPv6String(serviceStr) {
// Format of serviceStr is fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol), so IPv6 serviceStr has more than 3 colons.
if strings.Count(serviceStr, ":") >= 3 {
return proxier.ipv6Proxier.GetServiceByIP(serviceStr)
} else {
return proxier.ipv4Proxier.GetServiceByIP(serviceStr)
}

}

func (proxier *metaProxier) Run(stopCh <-chan struct{}) {
Expand Down

0 comments on commit d92cc41

Please sign in to comment.