Skip to content

Commit

Permalink
Add IPv4/v6 dual stack support in Flow aggregator
Browse files Browse the repository at this point in the history
Fixed a bug in GetServiceByIP() function of dual-stack proxy and moved
it under pkg/agent/proxy.
Improved the e2e test of flow aggregator: for dual stack clusters, we
will test services for both IPv4 and IPv6 clusterIP.
  • Loading branch information
Yongming Ding committed Mar 24, 2021
1 parent 2c1666d commit e2e02c3
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 81 deletions.
7 changes: 1 addition & 6 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/vmware-tanzu/antrea/pkg/signals"
"github.com/vmware-tanzu/antrea/pkg/util/cipher"
"github.com/vmware-tanzu/antrea/pkg/version"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

// informerDefaultResync is the default resync period if a handler doesn't specify one.
Expand Down Expand Up @@ -339,18 +338,14 @@ func run(o *Options) error {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)

var proxyProvider k8sproxy.Provider
if proxier != nil {
proxyProvider = proxier.GetProxyProvider()
}
flowRecords := flowrecords.NewFlowRecords()
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
flowRecords,
ifaceStore,
v4Enabled,
v6Enabled,
proxyProvider,
proxier,
networkPolicyController,
o.pollInterval)
go connStore.Run(stopCh)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/querier"
"github.com/vmware-tanzu/antrea/third_party/proxy"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
Expand Down Expand Up @@ -58,7 +58,7 @@ type connectionStore struct {
ifaceStore interfacestore.InterfaceStore
v4Enabled bool
v6Enabled bool
antreaProxier proxy.Provider
antreaProxier proxy.Proxier
networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier
pollInterval time.Duration
mutex sync.Mutex
Expand All @@ -70,7 +70,7 @@ func NewConnectionStore(
ifaceStore interfacestore.InterfaceStore,
v4Enabled bool,
v6Enabled bool,
proxier proxy.Provider,
proxier proxy.Proxier,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
) *connectionStore {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
k8proxytest "github.com/vmware-tanzu/antrea/third_party/proxy/testing"
)

var (
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
// Mock the interface store so that one of the two IPs corresponds to a Pod
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := k8proxytest.NewMockProvider(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval)

Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proxy
import (
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -53,6 +54,9 @@ type Proxier interface {
// flows and the OVS group IDs for a Service. False is returned if the
// Service is not found.
GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType, bool)
// GetServiceByIP returns the ServicePortName struct for a given serviceString (ClusterIP:Port/Proto).
// False is returned if the serviceString does not exist in serviceStringMap.
GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool)
}

type proxier struct {
Expand Down Expand Up @@ -639,6 +643,16 @@ func (p *metaProxierWrapper) GetServiceFlowKeys(serviceName, namespace string) (
return append(v4Flows, v6Flows...), append(v4Groups, v6Groups...), v4Found || v6Found
}

// GetServiceByIP returns the ServicePortName for the given serviceStr and
// reports whether it was found. serviceStr is formatted as
// fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol); the clusterIP part
// decides whether the lookup is delegated to the IPv4 or the IPv6 proxier.
func (p *metaProxierWrapper) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) {
	// Strip the trailing ":port/proto" before the address-family check:
	// calling IsIPv6String on the full string would always return false.
	// Guard against a missing ':' so a malformed input cannot cause a
	// slice-bounds panic (LastIndex returns -1 in that case).
	lastColonIndex := strings.LastIndex(serviceStr, ":")
	if lastColonIndex > 0 && utilnet.IsIPv6String(serviceStr[:lastColonIndex]) {
		return p.ipv6Proxier.GetServiceByIP(serviceStr)
	}
	return p.ipv4Proxier.GetServiceByIP(serviceStr)
}

func NewDualStackProxier(
hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *metaProxierWrapper {

Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/proxy/testing/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 10 additions & 18 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func skipIfNotIPv6Cluster(tb testing.TB) {
}
}

// skipIfDualStackCluster skips the current test when the cluster under test
// has both an IPv4 and an IPv6 Pod network CIDR configured.
func skipIfDualStackCluster(tb testing.TB) {
	dualStack := clusterInfo.podV6NetworkCIDR != "" && clusterInfo.podV4NetworkCIDR != ""
	if dualStack {
		tb.Skipf("Skipping test as it is not supported in dual stack cluster")
	}
}

func skipIfMissingKernelModule(tb testing.TB, nodeName string, requiredModules []string) {
for _, module := range requiredModules {
// modprobe with "--dry-run" does not require root privileges
Expand Down Expand Up @@ -147,14 +141,12 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
isIPv6 := false
if clusterInfo.podV6NetworkCIDR != "" {
isIPv6 = true
}
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
// Create pod using ipfix collector image
if err = testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
Expand All @@ -163,10 +155,10 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, isIPv6, err
return nil, v4Enabled, v6Enabled, err
}
var ipStr string
if isIPv6 && ipfixCollectorIP.ipv6 != nil {
if v6Enabled && ipfixCollectorIP.ipv6 != nil {
ipStr = ipfixCollectorIP.ipv6.String()
} else {
ipStr = ipfixCollectorIP.ipv4.String()
Expand All @@ -177,7 +169,7 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
if testOptions.providerName == "kind" {
// In Kind cluster, there are issues with DNS name resolution on worker nodes.
Expand All @@ -186,14 +178,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
return testData, isIPv6, nil
return testData, v4Enabled, v6Enabled, nil
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down
89 changes: 64 additions & 25 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,45 @@ const (
)

func TestFlowAggregator(t *testing.T) {
skipIfDualStackCluster(t)
data, isIPv6, err := setupTestWithIPFIXCollector(t)
data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6)
podAIP, podBIP, podCIP, err := createPerftestPods(data)
if err != nil {
t.Fatalf("Error when creating perftest pods and services: %v", err)
t.Fatalf("Error when creating perftest pods: %v", err)
}
// Wait for the Service to be realized.
time.Sleep(3 * time.Second)

if v4Enabled {
svcB, svcC, err := createPerftestServices(data, false)
if err != nil {
t.Fatalf("Error when creating IPv4 perftest services: %v", err)
}
// Wait for the Service to be realized.
time.Sleep(3 * time.Second)

runTests(t, data, podAIP, podBIP, podCIP, svcB, svcC, false)
deletePerftestServices(data)
}

if v6Enabled {
svcB, svcC, err := createPerftestServices(data, true)
if err != nil {
t.Fatalf("Error when creating IPv6 perftest services: %v", err)
}
// Wait for the Service to be realized.
time.Sleep(3 * time.Second)

runTests(t, data, podAIP, podBIP, podCIP, svcB, svcC, true)
deletePerftestServices(data)
}
}

func runTests(t *testing.T, data *TestData, podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, isIPv6 bool) {
var err error
// IntraNodeFlows tests the case, where Pods are deployed on same Node and their flow information is exported as IPFIX flow records.
t.Run("IntraNodeFlows", func(t *testing.T) {
np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-b")
Expand Down Expand Up @@ -376,45 +400,60 @@ func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string)
return np1, np2
}

func createPerftestPods(data *TestData, isIPv6 bool) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, err error) {
func createPerftestPods(data *TestData) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, err error) {
if err := data.createPodOnNode("perftest-a", controlPlaneNodeName(), perftoolImage, nil, nil, nil, nil, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
}
podAIP, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
}

svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
}

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
}
return podAIP, podBIP, podCIP, nil
}

if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) {
svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
}
podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
}

// svcC will be needed when adding RemoteServiceAccess testcase
svcC, err = data.createService("perftest-c", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
}

if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
return svcB, svcC, nil
}

func deletePerftestServices(data *TestData) error {
err := data.deleteService("perftest-b")
if err != nil {
return fmt.Errorf("Error when deleting perftest service: %v", err)
}
podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
err = data.deleteService("perftest-c")
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return fmt.Errorf("Error when deleting perftest service: %v", err)
}
return podAIP, podBIP, podCIP, svcB, svcC, nil
return nil
}
9 changes: 0 additions & 9 deletions third_party/proxy/meta_proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,6 @@ func (proxier *metaProxier) OnEndpointsSynced() {
proxier.ipv6Proxier.OnEndpointsSynced()
}

// GetServiceByIP returns the ServicePortName for the given serviceStr and
// reports whether it was found, delegating to the per-family proxier chosen
// by IsIPv6String.
// NOTE(review): if callers pass serviceStr as "clusterIP:port/proto" (as the
// surrounding code suggests), IsIPv6String on the full string always returns
// false, so IPv6 services would be routed to the IPv4 proxier — confirm the
// expected format of serviceStr.
func (proxier *metaProxier) GetServiceByIP(serviceStr string) (ServicePortName, bool) {
	if utilnet.IsIPv6String(serviceStr) {
		return proxier.ipv6Proxier.GetServiceByIP(serviceStr)
	} else {
		return proxier.ipv4Proxier.GetServiceByIP(serviceStr)
	}

}

func (proxier *metaProxier) Run(stopCh <-chan struct{}) {
go proxier.ipv4Proxier.Run(stopCh)
proxier.ipv6Proxier.Run(stopCh)
Expand Down
18 changes: 1 addition & 17 deletions third_party/proxy/testing/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e2e02c3

Please sign in to comment.