diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 44b3c9d3875..209de4b0d4e 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -52,7 +52,6 @@ import ( "github.com/vmware-tanzu/antrea/pkg/signals" "github.com/vmware-tanzu/antrea/pkg/util/cipher" "github.com/vmware-tanzu/antrea/pkg/version" - k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) // informerDefaultResync is the default resync period if a handler doesn't specify one. @@ -339,10 +338,6 @@ func run(o *Options) error { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) - var proxyProvider k8sproxy.Provider - if proxier != nil { - proxyProvider = proxier.GetProxyProvider() - } flowRecords := flowrecords.NewFlowRecords() connStore := connections.NewConnectionStore( connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)), @@ -350,7 +345,7 @@ func run(o *Options) error { ifaceStore, v4Enabled, v6Enabled, - proxyProvider, + proxier, networkPolicyController, o.pollInterval) go connStore.Run(stopCh) diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index bf7d6c58fb4..a07fe252a14 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -28,8 +28,8 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" "github.com/vmware-tanzu/antrea/pkg/agent/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy" "github.com/vmware-tanzu/antrea/pkg/querier" - "github.com/vmware-tanzu/antrea/third_party/proxy" ) var serviceProtocolMap = map[uint8]corev1.Protocol{ @@ -58,7 +58,7 @@ type connectionStore struct { ifaceStore interfacestore.InterfaceStore v4Enabled bool v6Enabled bool - antreaProxier proxy.Provider + antreaProxier proxy.Proxier networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier pollInterval time.Duration mutex sync.Mutex @@ -70,7 +70,7 @@ func NewConnectionStore( ifaceStore interfacestore.InterfaceStore, v4Enabled bool, v6Enabled bool, - proxier proxy.Provider, + proxier proxy.Proxier, npQuerier querier.AgentNetworkPolicyInfoQuerier, pollInterval time.Duration, ) *connectionStore { diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index a2884ecb80e..6b5e5dc7669 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -37,10 +37,10 @@ import ( interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing" "github.com/vmware-tanzu/antrea/pkg/agent/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing" cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2" queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" - k8proxytest "github.com/vmware-tanzu/antrea/third_party/proxy/testing" ) var ( @@ -164,7 +164,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { // Mock interface store with one of the couple of IPs correspond to Pods mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - mockProxier := k8proxytest.NewMockProvider(ctrl) + mockProxier := proxytest.NewMockProxier(ctrl) npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval) diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 7d4a3b8ce8c..8a99c770a15 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -17,6 +17,7 @@ package proxy import ( "fmt" "net" + "strings" "sync" "time" @@ -53,6 +54,9 @@ type Proxier interface { // flows and the OVS group IDs for a Service. False is returned if the // Service is not found. GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType, bool) + // GetServiceByIP returns the ServicePortName struct of giving serviceString(ClusterIP:Port/Proto). + // False is returned if the serviceString is not existed in serviceStringMap. + GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) } type proxier struct { @@ -639,6 +643,16 @@ func (p *metaProxierWrapper) GetServiceFlowKeys(serviceName, namespace string) ( return append(v4Flows, v6Flows...), append(v4Groups, v6Groups...), v4Found || v6Found } +func (p *metaProxierWrapper) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) { + // Format of serviceStr is fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol). + lastColonIndex := strings.LastIndex(serviceStr, ":") + if utilnet.IsIPv6String(serviceStr[:lastColonIndex]) { + return p.ipv6Proxier.GetServiceByIP(serviceStr) + } else { + return p.ipv4Proxier.GetServiceByIP(serviceStr) + } +} + func NewDualStackProxier( hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *metaProxierWrapper { diff --git a/pkg/agent/proxy/testing/mock_proxy.go b/pkg/agent/proxy/testing/mock_proxy.go index 4c3d11f1b64..b758910ab7b 100644 --- a/pkg/agent/proxy/testing/mock_proxy.go +++ b/pkg/agent/proxy/testing/mock_proxy.go @@ -63,6 +63,21 @@ func (mr *MockProxierMockRecorder) GetProxyProvider() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyProvider", reflect.TypeOf((*MockProxier)(nil).GetProxyProvider)) } +// GetServiceByIP mocks base method +func (m *MockProxier) GetServiceByIP(arg0 string) (proxy.ServicePortName, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceByIP", arg0) + ret0, _ := ret[0].(proxy.ServicePortName) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetServiceByIP indicates an expected call of GetServiceByIP +func (mr *MockProxierMockRecorder) GetServiceByIP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceByIP", reflect.TypeOf((*MockProxier)(nil).GetServiceByIP), arg0) +} + // GetServiceFlowKeys mocks base method func (m *MockProxier) GetServiceFlowKeys(arg0, arg1 string) ([]string, []openflow.GroupIDType, bool) { m.ctrl.T.Helper() diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index 9f15fed5992..bfbb312d0ff 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -68,12 +68,6 @@ func skipIfNotIPv6Cluster(tb testing.TB) { } } -func skipIfDualStackCluster(tb testing.TB) { - if clusterInfo.podV6NetworkCIDR != "" && clusterInfo.podV4NetworkCIDR != "" { - tb.Skipf("Skipping test as it is not supported in dual stack cluster") - } -} - func skipIfMissingKernelModule(tb testing.TB, nodeName string, requiredModules []string) { for _, module := range requiredModules { // modprobe with "--dry-run" does not require root privileges @@ -147,14 +141,12 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { - isIPv6 := false - if clusterInfo.podV6NetworkCIDR != "" { - isIPv6 = true - } +func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) { + v4Enabled := clusterInfo.podV4NetworkCIDR != "" + v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) if err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } // Create pod using ipfix collector image if err = testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil { @@ -163,10 +155,10 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err) - return nil, isIPv6, err + return nil, v4Enabled, v6Enabled, err } var ipStr string - if isIPv6 && ipfixCollectorIP.ipv6 != nil { + if v6Enabled && ipfixCollectorIP.ipv6 != nil { ipStr = ipfixCollectorIP.ipv6.String() } else { ipStr = ipfixCollectorIP.ipv4.String() @@ -177,7 +169,7 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr) faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr) if err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } if testOptions.providerName == "kind" { // In Kind cluster, there are issues with DNS name resolution on worker nodes. @@ -186,14 +178,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) { } tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr) if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } tb.Logf("Checking CoreDNS deployment") if err = testData.checkCoreDNSPods(defaultTimeout); err != nil { - return testData, isIPv6, err + return testData, v4Enabled, v6Enabled, err } - return testData, isIPv6, nil + return testData, v4Enabled, v6Enabled, nil } func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 1ea578c1212..8c3aac3de5f 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -102,21 +102,45 @@ const ( ) func TestFlowAggregator(t *testing.T) { - skipIfDualStackCluster(t) - data, isIPv6, err := setupTestWithIPFIXCollector(t) + data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer teardownTest(t, data) defer teardownFlowAggregator(t, data) - podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data, isIPv6) + podAIP, podBIP, podCIP, err := createPerftestPods(data) if err != nil { - t.Fatalf("Error when creating perftest pods and services: %v", err) + t.Fatalf("Error when creating perftest pods: %v", err) } - // Wait for the Service to be realized. - time.Sleep(3 * time.Second) + if v4Enabled { + svcB, svcC, err := createPerftestServices(data, false) + if err != nil { + t.Fatalf("Error when creating IPv4 perftest services: %v", err) + } + // Wait for the Service to be realized. + time.Sleep(3 * time.Second) + + runTests(t, data, podAIP, podBIP, podCIP, svcB, svcC, false) + deletePerftestServices(data) + } + + if v6Enabled { + svcB, svcC, err := createPerftestServices(data, true) + if err != nil { + t.Fatalf("Error when creating IPv6 perftest services: %v", err) + } + // Wait for the Service to be realized. + time.Sleep(3 * time.Second) + + runTests(t, data, podAIP, podBIP, podCIP, svcB, svcC, true) + deletePerftestServices(data) + } +} + +func runTests(t *testing.T, data *TestData, podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, isIPv6 bool) { + var err error // IntraNodeFlows tests the case, where Pods are deployed on same Node and their flow information is exported as IPFIX flow records. t.Run("IntraNodeFlows", func(t *testing.T) { np1, np2 := deployNetworkPolicies(t, data, "perftest-a", "perftest-b") @@ -376,45 +400,60 @@ func deployNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) return np1, np2 } -func createPerftestPods(data *TestData, isIPv6 bool) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, svcB *corev1.Service, svcC *corev1.Service, err error) { +func createPerftestPods(data *TestData) (podAIP *PodIPs, podBIP *PodIPs, podCIP *PodIPs, err error) { if err := data.createPodOnNode("perftest-a", controlPlaneNodeName(), perftoolImage, nil, nil, nil, nil, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err) + return nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err) } podAIP, err = data.podWaitForIPs(defaultTimeout, "perftest-a", testNamespace) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err) + return nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err) } - svcIPFamily := corev1.IPv4Protocol - if isIPv6 { - svcIPFamily = corev1.IPv6Protocol + if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { + return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + } + podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace) + if err != nil { + return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) } - svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) + if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { + return nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + } + podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) + return nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) } + return podAIP, podBIP, podCIP, nil +} - if err := data.createPodOnNode("perftest-b", controlPlaneNodeName(), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) +func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) { + svcIPFamily := corev1.IPv4Protocol + if isIPv6 { + svcIPFamily = corev1.IPv6Protocol } - podBIP, err = data.podWaitForIPs(defaultTimeout, "perftest-b", testNamespace) + + svcB, err = data.createService("perftest-b", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) + return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) } - // svcC will be needed when adding RemoteServiceAccess testcase svcC, err = data.createService("perftest-c", iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, v1.ServiceTypeClusterIP, &svcIPFamily) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) + return nil, nil, fmt.Errorf("Error when creating perftest service: %v", err) } - if err := data.createPodOnNode("perftest-c", workerNodeName(1), perftoolImage, nil, nil, nil, []v1.ContainerPort{{Protocol: v1.ProtocolTCP, ContainerPort: iperfPort}}, false, nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) + return svcB, svcC, nil +} + +func deletePerftestServices(data *TestData) error { + err := data.deleteService("perftest-b") + if err != nil { + return fmt.Errorf("Error when deleting perftest service: %v", err) } - podCIP, err = data.podWaitForIPs(defaultTimeout, "perftest-c", testNamespace) + err = data.deleteService("perftest-c") if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IP: %v", err) + return fmt.Errorf("Error when deleting perftest service: %v", err) } - return podAIP, podBIP, podCIP, svcB, svcC, nil + return nil } diff --git a/third_party/proxy/meta_proxier.go b/third_party/proxy/meta_proxier.go index 046144eeb85..afe8485aac2 100644 --- a/third_party/proxy/meta_proxier.go +++ b/third_party/proxy/meta_proxier.go @@ -159,15 +159,6 @@ func (proxier *metaProxier) OnEndpointsSynced() { proxier.ipv6Proxier.OnEndpointsSynced() } -func (proxier *metaProxier) GetServiceByIP(serviceStr string) (ServicePortName, bool) { - if utilnet.IsIPv6String(serviceStr) { - return proxier.ipv6Proxier.GetServiceByIP(serviceStr) - } else { - return proxier.ipv4Proxier.GetServiceByIP(serviceStr) - } - -} - func (proxier *metaProxier) Run(stopCh <-chan struct{}) { go proxier.ipv4Proxier.Run(stopCh) proxier.ipv6Proxier.Run(stopCh) diff --git a/third_party/proxy/testing/mock_proxy.go b/third_party/proxy/testing/mock_proxy.go index 591ce6f5b91..4d8a5557a2f 100644 --- a/third_party/proxy/testing/mock_proxy.go +++ b/third_party/proxy/testing/mock_proxy.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ package testing import ( gomock "github.com/golang/mock/gomock" - proxy "github.com/vmware-tanzu/antrea/third_party/proxy" v1 "k8s.io/api/core/v1" reflect "reflect" ) @@ -49,21 +48,6 @@ func (m *MockProvider) EXPECT() *MockProviderMockRecorder { return m.recorder } -// GetServiceByIP mocks base method -func (m *MockProvider) GetServiceByIP(arg0 string) (proxy.ServicePortName, bool) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetServiceByIP", arg0) - ret0, _ := ret[0].(proxy.ServicePortName) - ret1, _ := ret[1].(bool) - return ret0, ret1 -} - -// GetServiceByIP indicates an expected call of GetServiceByIP -func (mr *MockProviderMockRecorder) GetServiceByIP(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceByIP", reflect.TypeOf((*MockProvider)(nil).GetServiceByIP), arg0) -} - // OnEndpointsAdd mocks base method func (m *MockProvider) OnEndpointsAdd(arg0 *v1.Endpoints) { m.ctrl.T.Helper() diff --git a/third_party/proxy/types.go b/third_party/proxy/types.go index d30ef4dddcf..5c73f1aa7fa 100644 --- a/third_party/proxy/types.go +++ b/third_party/proxy/types.go @@ -33,6 +33,7 @@ Modifies: - Remove config.EndpointSliceHandler, config.NodeHandler from Provider interface type - Remove NodeHandler, EndpointSliceHandler, Sync() from Provider interface - Add Run(), GetServiceByIP() to Provider interface +- Remove GetServiceByIP() from Provider interface */ package proxy @@ -57,7 +58,6 @@ type Provider interface { // It does not return. SyncLoop() Run(stopCh <-chan struct{}) - GetServiceByIP(serviceStr string) (ServicePortName, bool) } // ServicePortName carries a namespace + name + portname. This is the unique