From d92cc4161807ff053c77ef25a8cf4295f81837a9 Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Tue, 16 Mar 2021 16:16:55 -0700 Subject: [PATCH] Add IPv4/v6 dual stack support in Flow aggregator Fix the e2e test failure in dual stack cluster of flow aggregator. --- test/e2e/fixtures.go | 28 ++++-------- test/e2e/flowaggregator_test.go | 76 +++++++++++++++++++++---------- third_party/proxy/meta_proxier.go | 5 +- 3 files changed, 66 insertions(+), 43 deletions(-) diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index 9f15fed5992..bfbb312d0ff 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -68,12 +68,6 @@ func skipIfNotIPv6Cluster(tb testing.TB) { } } -func skipIfDualStackCluster(tb testing.TB) { - if clusterInfo.podV6NetworkCIDR != "" && clusterInfo.podV4NetworkCIDR != "" { - tb.Skipf("Skipping test as it is not supported in dual stack cluster") - } -} - func skipIfMissingKernelModule(tb testing.TB, nodeName string, requiredModules []string) { for _, module := range requiredModules { // modprobe with "--dry-run" does not require root privileges @@ -147,14 +141,12 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { - isIPv6 := false - if clusterInfo.podV6NetworkCIDR != "" { - isIPv6 = true - } +func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) { + v4Enabled := clusterInfo.podV4NetworkCIDR != "" + v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) if err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } // Create pod using ipfix collector image if err = testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil { @@ -163,10 +155,10 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err) - return nil, isIPv6, err + return nil, v4Enabled, v6Enabled, err } var ipStr string - if isIPv6 && ipfixCollectorIP.ipv6 != nil { + if v6Enabled && ipfixCollectorIP.ipv6 != nil { ipStr = ipfixCollectorIP.ipv6.String() } else { ipStr = ipfixCollectorIP.ipv4.String() @@ -177,7 +169,7 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr) faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr) if err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } if testOptions.providerName == "kind" { // In Kind cluster, there are issues with DNS name resolution on worker nodes. @@ -186,14 +178,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { } tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr) if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } tb.Logf("Checking CoreDNS deployment") if err = testData.checkCoreDNSPods(defaultTimeout); err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } - return testData, isIPv6, nil + return testData, v4Enabled, v6Enabled, nil } func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 1ea578c1212..076ad5705d1 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -102,18 +102,33 @@ const ( ) func TestFlowAggregator(t *testing.T) { - skipIfDualStackCluster(t) - data, isIPv6, err := setupTestWithIPFIXCollector(t) + data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer teardownTest(t, data) defer teardownFlowAggregator(t, data) - podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6) + podAIP, podBIP, podCIP, err := createPerftestPods(data) if err != nil { - t.Fatalf("Error when creating perftest pods and services: %v", err) + t.Fatalf("Error when creating perftest pods: %v", err) } + + if v4Enabled { + createSvcRunTests(t, data, podAIP, podBIP, podCIP, false) + } + + if v6Enabled { + createSvcRunTests(t, data, podAIP, podBIP, podCIP, true) + } +} + +func createSvcRunTests(t *testing.T, data *TestData, podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, isIPv6 bool) { + svcB, svcC, err := createPerftestServices(data, isIPv6) + if err != nil { + t.Fatalf("Error when creating perftest services: %v", err) + } + defer deletePerftestServices(data) // Wait for the Service to be realized. time.Sleep(3 * time.Second) @@ -376,45 +391,60 @@ func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) return np1, np2 } -func createPerftestPods(data *TestData, isIPv6 bool) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, err error) { +func createPerftestPods(data *TestData) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, err error) { if err := data.createPodOnNode("perftest-a", controlPlaneNodeName(), perftoolImage, nil, nil, nil, nil, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err) + return nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err) } podAIP, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err) + return nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err) } - svcIPFamily := corev1.IPv4Protocol - if isIPv6 { - svcIPFamily = corev1.IPv6Protocol + if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { + return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + } + podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace) + if err != nil { + return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) } - svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) + if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { + return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + } + podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) + return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) } + return podAIP, podBIP, podCIP, nil +} - if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) +func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) { + svcIPFamily := corev1.IPv4Protocol + if isIPv6 { + svcIPFamily = corev1.IPv6Protocol } - podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace) + + svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) + return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) } - // svcC will be needed when adding RemoteServiceAccess testcase svcC, err = data.createService("perftest-c", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) + return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) } - if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + return svcB, svcC, nil +} + +func deletePerftestServices(data *TestData) error { + err := data.deleteService("perftest-b") + if err != nil { + return fmt.Errorf("Error when deleting perftest service: %v", err) } - podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace) + err = data.deleteService("perftest-c") if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) + return fmt.Errorf("Error when deleting perftest service: %v", err) } - return podAIP, podBIP, podCIP, svcB, svcC, nil + return nil } diff --git a/third_party/proxy/meta_proxier.go b/third_party/proxy/meta_proxier.go index 046144eeb85..25038bc31d3 100644 --- a/third_party/proxy/meta_proxier.go +++ b/third_party/proxy/meta_proxier.go @@ -39,6 +39,7 @@ package proxy import ( "fmt" + "strings" "k8s.io/api/core/v1" "k8s.io/klog" @@ -160,12 +161,12 @@ func (proxier *metaProxier) OnEndpointsSynced() { } func (proxier *metaProxier) GetServiceByIP(serviceStr string) (ServicePortName, bool) { - if utilnet.IsIPv6String(serviceStr) { + // Format of serviceStr is fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol), so IPv6 serviceStr has more than 3 colons. + if strings.Count(serviceStr, ":") >= 3 { return proxier.ipv6Proxier.GetServiceByIP(serviceStr) } else { return proxier.ipv4Proxier.GetServiceByIP(serviceStr) } - } func (proxier *metaProxier) Run(stopCh <-chan struct{}) {