Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add IPv4/v6 dual stack support in Flow aggregator #1962

Merged
merged 1 commit into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/vmware-tanzu/antrea/pkg/signals"
"github.com/vmware-tanzu/antrea/pkg/util/cipher"
"github.com/vmware-tanzu/antrea/pkg/version"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

// informerDefaultResync is the default resync period if a handler doesn't specify one.
Expand Down Expand Up @@ -339,18 +338,14 @@ func run(o *Options) error {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)

var proxyProvider k8sproxy.Provider
if proxier != nil {
proxyProvider = proxier.GetProxyProvider()
}
flowRecords := flowrecords.NewFlowRecords()
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
flowRecords,
ifaceStore,
v4Enabled,
v6Enabled,
proxyProvider,
proxier,
networkPolicyController,
o.pollInterval)
go connStore.Run(stopCh)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/querier"
"github.com/vmware-tanzu/antrea/third_party/proxy"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
Expand Down Expand Up @@ -58,7 +58,7 @@ type connectionStore struct {
ifaceStore interfacestore.InterfaceStore
v4Enabled bool
v6Enabled bool
antreaProxier proxy.Provider
antreaProxier proxy.Proxier
networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier
pollInterval time.Duration
mutex sync.Mutex
Expand All @@ -70,7 +70,7 @@ func NewConnectionStore(
ifaceStore interfacestore.InterfaceStore,
v4Enabled bool,
v6Enabled bool,
proxier proxy.Provider,
proxier proxy.Proxier,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
) *connectionStore {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
k8proxytest "github.com/vmware-tanzu/antrea/third_party/proxy/testing"
)

var (
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
// Mock interface store where one of the couple of IPs corresponds to a Pod
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := k8proxytest.NewMockProvider(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval)

Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proxy
import (
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -53,6 +54,9 @@ type Proxier interface {
// flows and the OVS group IDs for a Service. False is returned if the
// Service is not found.
GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType, bool)
// GetServiceByIP returns the ServicePortName struct for the given serviceString(ClusterIP:Port/Proto).
// False is returned if the serviceString is not found in serviceStringMap.
GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool)
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
}

type proxier struct {
Expand Down Expand Up @@ -639,6 +643,16 @@ func (p *metaProxierWrapper) GetServiceFlowKeys(serviceName, namespace string) (
return append(v4Flows, v6Flows...), append(v4Groups, v6Groups...), v4Found || v6Found
}

func (p *metaProxierWrapper) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) {
// Format of serviceStr is <clusterIP>:<svcPort>/<protocol>.
lastColonIndex := strings.LastIndex(serviceStr, ":")
if utilnet.IsIPv6String(serviceStr[:lastColonIndex]) {
return p.ipv6Proxier.GetServiceByIP(serviceStr)
} else {
return p.ipv4Proxier.GetServiceByIP(serviceStr)
}
}

func NewDualStackProxier(
hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *metaProxierWrapper {

Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/proxy/testing/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 10 additions & 18 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func skipIfNotIPv6Cluster(tb testing.TB) {
}
}

func skipIfDualStackCluster(tb testing.TB) {
if clusterInfo.podV6NetworkCIDR != "" && clusterInfo.podV4NetworkCIDR != "" {
tb.Skipf("Skipping test as it is not supported in dual stack cluster")
}
}
Comment on lines -71 to -75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep it for future use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, addressed.


func skipIfMissingKernelModule(tb testing.TB, nodeName string, requiredModules []string) {
for _, module := range requiredModules {
// modprobe with "--dry-run" does not require root privileges
Expand Down Expand Up @@ -147,14 +141,12 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
isIPv6 := false
if clusterInfo.podV6NetworkCIDR != "" {
isIPv6 = true
}
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
// Create pod using ipfix collector image
if err = testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
Expand All @@ -163,10 +155,10 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, isIPv6, err
return nil, v4Enabled, v6Enabled, err
}
var ipStr string
if isIPv6 && ipfixCollectorIP.ipv6 != nil {
if v6Enabled && ipfixCollectorIP.ipv6 != nil {
ipStr = ipfixCollectorIP.ipv6.String()
} else {
ipStr = ipfixCollectorIP.ipv4.String()
Expand All @@ -177,7 +169,7 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
if testOptions.providerName == "kind" {
// In Kind cluster, there are issues with DNS name resolution on worker nodes.
Expand All @@ -186,14 +178,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, isIPv6, err
return testData, v4Enabled, v6Enabled, err
}
return testData, isIPv6, nil
return testData, v4Enabled, v6Enabled, nil
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down
95 changes: 61 additions & 34 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,33 @@ const (
)

func TestFlowAggregator(t *testing.T) {
skipIfDualStackCluster(t)
data, isIPv6, err := setupTestWithIPFIXCollector(t)
data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6)
podAIPs, podBIPs, podCIPs, err := createPerftestPods(data)
if err != nil {
t.Fatalf("Error when creating perftest pods and services: %v", err)
t.Fatalf("Error when creating perftest Pods: %v", err)
}

if v4Enabled {
t.Run("IPv4", func(t *testing.T) { testHelper(t, data, podAIPs, podBIPs, podCIPs, false) })
}

if v6Enabled {
t.Run("IPv6", func(t *testing.T) { testHelper(t, data, podAIPs, podBIPs, podCIPs, true) })
}
}

func testHelper(t *testing.T, data *TestData, podAIPs *PodIPs, podBIPs *PodIPs, podCIPs *PodIPs, isIPv6 bool) {
svcB, svcC, err := createPerftestServices(data, isIPv6)
if err != nil {
t.Fatalf("Error when creating perftest Services: %v", err)
}
defer deletePerftestServices(t, data)
// Wait for the Service to be realized.
time.Sleep(3 * time.Second)

Expand All @@ -134,9 +149,9 @@ func TestFlowAggregator(t *testing.T) {
}()
// TODO: Skipping bandwidth check for Intra-Node flows as it is flaky.
if !isIPv6 {
checkRecordsForFlows(t, data, podAIP.ipv4.String(), podBIP.ipv4.String(), isIPv6, true, false, true, false)
checkRecordsForFlows(t, data, podAIPs.ipv4.String(), podBIPs.ipv4.String(), isIPv6, true, false, true, false)
} else {
checkRecordsForFlows(t, data, podAIP.ipv6.String(), podBIP.ipv6.String(), isIPv6, true, false, true, false)
checkRecordsForFlows(t, data, podAIPs.ipv6.String(), podBIPs.ipv6.String(), isIPv6, true, false, true, false)
}
})

Expand All @@ -157,9 +172,9 @@ func TestFlowAggregator(t *testing.T) {
}
}()
if !isIPv6 {
checkRecordsForFlows(t, data, podAIP.ipv4.String(), podCIP.ipv4.String(), isIPv6, false, false, true, true)
checkRecordsForFlows(t, data, podAIPs.ipv4.String(), podCIPs.ipv4.String(), isIPv6, false, false, true, true)
} else {
checkRecordsForFlows(t, data, podAIP.ipv6.String(), podCIP.ipv6.String(), isIPv6, false, false, true, true)
checkRecordsForFlows(t, data, podAIPs.ipv6.String(), podCIPs.ipv6.String(), isIPv6, false, false, true, true)
}
})

Expand All @@ -168,19 +183,19 @@ func TestFlowAggregator(t *testing.T) {
skipIfProxyDisabled(t, data)
// TODO: Skipping bandwidth check for LocalServiceAccess flows as it is flaky.
if !isIPv6 {
checkRecordsForFlows(t, data, podAIP.ipv4.String(), svcB.Spec.ClusterIP, isIPv6, true, true, false, false)
checkRecordsForFlows(t, data, podAIPs.ipv4.String(), svcB.Spec.ClusterIP, isIPv6, true, true, false, false)
} else {
checkRecordsForFlows(t, data, podAIP.ipv6.String(), svcB.Spec.ClusterIP, isIPv6, true, true, false, false)
checkRecordsForFlows(t, data, podAIPs.ipv6.String(), svcB.Spec.ClusterIP, isIPv6, true, true, false, false)
}
})

// RemoteServiceAccess tests the case, where Pod and Service are deployed on different Nodes and their flow information is exported as IPFIX flow records.
t.Run("RemoteServiceAccess", func(t *testing.T) {
skipIfProxyDisabled(t, data)
if !isIPv6 {
checkRecordsForFlows(t, data, podAIP.ipv4.String(), svcC.Spec.ClusterIP, isIPv6, false, true, false, true)
checkRecordsForFlows(t, data, podAIPs.ipv4.String(), svcC.Spec.ClusterIP, isIPv6, false, true, false, true)
} else {
checkRecordsForFlows(t, data, podAIP.ipv6.String(), svcC.Spec.ClusterIP, isIPv6, false, true, false, true)
checkRecordsForFlows(t, data, podAIPs.ipv6.String(), svcC.Spec.ClusterIP, isIPv6, false, true, false, true)
}
})
}
Expand Down Expand Up @@ -376,45 +391,57 @@ func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string)
return np1, np2
}

func createPerftestPods(data *TestData, isIPv6 bool) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, err error) {
func createPerftestPods(data *TestData) (podAIPs *PodIPs, podBIPs *PodIPs, podCIPs *PodIPs, err error) {
if err := data.createPodOnNode("perftest-a", controlPlaneNodeName(), perftoolImage, nil, nil, nil, nil, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err)
}
podAIP, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace)
podAIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
return nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err)
}

svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podBIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err)
}

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podCIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err)
}
return podAIPs, podBIPs, podCIPs, nil
}

if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) {
svcIPFamily := corev1.IPv4Protocol
if isIPv6 {
svcIPFamily = corev1.IPv6Protocol
}
podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace)

svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest-b Service: %v", err)
}

// svcC will be needed when adding RemoteServiceAccess testcase
svcC, err = data.createService("perftest-c", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, v1.ServiceTypeClusterIP, &svcIPFamily)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err)
return nil, nil, fmt.Errorf("Error when creating perftest-c Service: %v", err)
}

if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err)
}
podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err)
return svcB, svcC, nil
}

func deletePerftestServices(t *testing.T, data *TestData) {
for _, serviceName := range []string{"perftest-b", "perftest-c"} {
err := data.deleteService(serviceName)
if err != nil {
t.Logf("Error when deleting %s Service: %v", serviceName, err)
}
}
return podAIP, podBIP, podCIP, svcB, svcC, nil
}
9 changes: 0 additions & 9 deletions third_party/proxy/meta_proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,6 @@ func (proxier *metaProxier) OnEndpointsSynced() {
proxier.ipv6Proxier.OnEndpointsSynced()
}

func (proxier *metaProxier) GetServiceByIP(serviceStr string) (ServicePortName, bool) {
if utilnet.IsIPv6String(serviceStr) {
return proxier.ipv6Proxier.GetServiceByIP(serviceStr)
} else {
return proxier.ipv4Proxier.GetServiceByIP(serviceStr)
}

}

func (proxier *metaProxier) Run(stopCh <-chan struct{}) {
go proxier.ipv4Proxier.Run(stopCh)
proxier.ipv6Proxier.Run(stopCh)
Expand Down
Loading