diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index fa216f7b3cc..5019ddcdde8 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -213,7 +213,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \ "projects.registry.vmware.com/antrea/toolbox:1.3-0") -FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \ +FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.9.0" \ "projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \ "projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \ "projects.registry.vmware.com/antrea/clickhouse-server:23.4") diff --git a/go.mod b/go.mod index 191411422af..7bbfcf18707 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/ti-mo/conntrack v0.5.0 github.com/vishvananda/netlink v1.2.1-beta.2 - github.com/vmware/go-ipfix v0.8.2 + github.com/vmware/go-ipfix v0.9.0 go.uber.org/mock v0.4.0 golang.org/x/crypto v0.19.0 golang.org/x/mod v0.15.0 diff --git a/go.sum b/go.sum index 31fe7d55dc7..32ea833a78d 100644 --- a/go.sum +++ b/go.sum @@ -759,8 +759,8 @@ github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhg github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= -github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA= -github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0= +github.com/vmware/go-ipfix v0.9.0 h1:4/N5eFliqULEaCUQV0lafOpN/1bItPE9OTAPGhrIXus= +github.com/vmware/go-ipfix v0.9.0/go.mod h1:MYEdL6Uel2ufOZyVCKvIAaw9hwnewK8aPr7rnwRbxMY= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk= diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 80e4643039a..34084f9ba4c 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -104,6 +104,7 @@ var ( "egressIP", "appProtocolName", "httpVals", + "egressNodeName", } AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) @@ -595,6 +596,8 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { ie.SetStringValue(conn.AppProtocolName) case "httpVals": ie.SetStringValue(conn.HttpVals) + case "egressNodeName": + ie.SetStringValue(conn.EgressNodeName) } } err := exp.ipfixSet.AddRecord(eL, templateID) @@ -641,14 +644,15 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 { } func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) { - egressName, egressIP, _, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName) + egressName, egressIP, egressNodeName, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName) if err != nil { // Egress is not enabled or no Egress is applied to this Pod return } conn.EgressName = egressName conn.EgressIP = egressIP - klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "SourcePodNamespace", conn.SourcePodNamespace, "SourcePodName", conn.SourcePodName) + conn.EgressNodeName = egressNodeName + klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName)) } func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error { diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 3d0dc3d96cd..5df827176e8 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -806,26 +806,29 @@ func TestFlowExporter_findFlowType(t *testing.T) { func TestFlowExporter_fillEgressInfo(t *testing.T) { ctrl := gomock.NewController(t) testCases := []struct { - name string - sourcePodNamespace string - sourcePodName string - expectedEgressName string - expectedEgressIP string - expectedErr string + name string + sourcePodNamespace string + sourcePodName string + expectedEgressName string + expectedEgressIP string + expectedEgressNodeName string + expectedErr string }{ { - name: "Both EgressName and EgressIP filled", - sourcePodNamespace: "namespaceA", - sourcePodName: "podA", - expectedEgressName: "test-egress", - expectedEgressIP: "172.18.0.1", + name: "EgressName, EgressIP and EgressNodeName filled", + sourcePodNamespace: "namespaceA", + sourcePodName: "podA", + expectedEgressName: "test-egress", + expectedEgressIP: "172.18.0.1", + expectedEgressNodeName: "test-egress-node", }, { - name: "No Egress Information filled", - sourcePodNamespace: "namespaceA", - sourcePodName: "podC", - expectedEgressName: "", - expectedEgressIP: "", + name: "No Egress Information filled", + sourcePodNamespace: "namespaceA", + sourcePodName: "podC", + expectedEgressName: "", + expectedEgressIP: "", + expectedEgressNodeName: "", }, } @@ -841,13 +844,14 @@ func TestFlowExporter_fillEgressInfo(t *testing.T) { SourcePodName: tc.sourcePodName, } if tc.expectedEgressName != "" { - egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, "", nil) + egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, tc.expectedEgressNodeName, nil) } else { egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return("", "", "", fmt.Errorf("no Egress applied to Pod %s", conn.SourcePodName)) } exp.fillEgressInfo(&conn) assert.Equal(t, tc.expectedEgressName, conn.EgressName) assert.Equal(t, tc.expectedEgressIP, conn.EgressIP) + assert.Equal(t, tc.expectedEgressNodeName, conn.EgressNodeName) }) } } diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index f091fe7f3f2..4b8407f0708 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -86,6 +86,7 @@ type Connection struct { EgressIP string AppProtocolName string HttpVals string + EgressNodeName string } type ItemToExpire struct { diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 989061bafba..be21f777aaa 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -89,10 +89,11 @@ const ( egressName, egressIP, appProtocolName, - httpVals) + httpVals, + egressNodeName) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?)` + ?, ?, ?, ?, ?)` ) // PrepareClickHouseConnection is used for unit testing @@ -334,6 +335,7 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err record.EgressIP, record.AppProtocolName, record.HttpVals, + record.EgressNodeName, ) if err != nil { diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index 2f601454f86..ce17f73d656 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -137,7 +137,8 @@ func TestBatchCommitAll(t *testing.T) { "test-egress", "172.18.0.1", "http", - "mockHttpString"). + "mockHttpString", + "test-egress-node"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() diff --git a/pkg/flowaggregator/flowlogger/logger.go b/pkg/flowaggregator/flowlogger/logger.go index fca7495ee75..70e4ebf4033 100644 --- a/pkg/flowaggregator/flowlogger/logger.go +++ b/pkg/flowaggregator/flowlogger/logger.go @@ -118,6 +118,7 @@ func (fl *FlowLogger) WriteRecord(r *flowrecord.FlowRecord, prettyPrint bool) er r.EgressIP, r.AppProtocolName, r.HttpVals, + r.EgressNodeName, } str := strings.Join(fields, ",") diff --git a/pkg/flowaggregator/flowlogger/logger_test.go b/pkg/flowaggregator/flowlogger/logger_test.go index d0c7ef40c29..0b811c29101 100644 --- a/pkg/flowaggregator/flowlogger/logger_test.go +++ b/pkg/flowaggregator/flowlogger/logger_test.go @@ -70,11 +70,11 @@ func TestWriteRecord(t *testing.T) { }{ { prettyPrint: true, - expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString", + expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString,test-egress-node", }, { prettyPrint: false, - expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString", + expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString,test-egress-node", }, } diff --git a/pkg/flowaggregator/flowrecord/record.go b/pkg/flowaggregator/flowrecord/record.go index 2213d682258..ab8684c877a 100644 --- a/pkg/flowaggregator/flowrecord/record.go +++ b/pkg/flowaggregator/flowrecord/record.go @@ -72,6 +72,7 @@ type FlowRecord struct { EgressIP string AppProtocolName string HttpVals string + EgressNodeName string } // GetFlowRecord converts ipfixentities.Record to FlowRecord @@ -236,6 +237,9 @@ func GetFlowRecord(record ipfixentities.Record) *FlowRecord { if httpVals, _, ok := record.GetInfoElementWithValue("httpVals"); ok { r.HttpVals = httpVals.GetStringValue() } + if egressNodeName, _, ok := record.GetInfoElementWithValue("egressNodeName"); ok { + r.EgressNodeName = egressNodeName.GetStringValue() + } return r } diff --git a/pkg/flowaggregator/flowrecord/testing/util.go b/pkg/flowaggregator/flowrecord/testing/util.go index e06ae31e164..18b8343103d 100644 --- a/pkg/flowaggregator/flowrecord/testing/util.go +++ b/pkg/flowaggregator/flowrecord/testing/util.go @@ -74,5 +74,6 @@ func PrepareTestFlowRecord() *flowrecord.FlowRecord { EgressIP: "172.18.0.1", AppProtocolName: "http", HttpVals: "mockHttpString", + EgressNodeName: "test-egress-node", } } diff --git a/pkg/flowaggregator/infoelements/elements.go b/pkg/flowaggregator/infoelements/elements.go index 7cd851fe6f1..37c2e14cf15 100644 --- a/pkg/flowaggregator/infoelements/elements.go +++ b/pkg/flowaggregator/infoelements/elements.go @@ -61,6 +61,7 @@ var ( "egressIP", "appProtocolName", "httpVals", + "egressNodeName", } AntreaInfoElementsIPv4 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) diff --git a/pkg/flowaggregator/s3uploader/s3uploader.go b/pkg/flowaggregator/s3uploader/s3uploader.go index 4239dcc6a6b..e2dfd8611e0 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader.go +++ b/pkg/flowaggregator/s3uploader/s3uploader.go @@ -488,4 +488,6 @@ func writeRecord(w io.Writer, r *flowrecord.FlowRecord, clusterUUID string) { io.WriteString(w, r.AppProtocolName) io.WriteString(w, ",") io.WriteString(w, r.HttpVals) + io.WriteString(w, ",") + io.WriteString(w, r.EgressNodeName) } diff --git a/pkg/flowaggregator/s3uploader/s3uploader_test.go b/pkg/flowaggregator/s3uploader/s3uploader_test.go index b452ca09d38..82d5a81000e 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader_test.go +++ b/pkg/flowaggregator/s3uploader/s3uploader_test.go @@ -38,8 +38,8 @@ import ( var ( fakeClusterUUID = uuid.New().String() - recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString" - recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString" + recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node" + recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node" ) const seed = 1 diff --git a/pkg/flowaggregator/testing/util.go b/pkg/flowaggregator/testing/util.go index af73695c87d..b24be55139c 100644 --- a/pkg/flowaggregator/testing/util.go +++ b/pkg/flowaggregator/testing/util.go @@ -223,6 +223,10 @@ func PrepareMockIpfixRecord(mockRecord *ipfixentitiestesting.MockRecord, isIPv4 httpValsElem.SetStringValue("mockHttpString") mockRecord.EXPECT().GetInfoElementWithValue("httpVals").Return(httpValsElem, 0, true) + egressNodeNameElem := createElement("egressNodeName", ipfixregistry.AntreaEnterpriseID) + egressNodeNameElem.SetStringValue("test-egress-node") + mockRecord.EXPECT().GetInfoElementWithValue("egressNodeName").Return(egressNodeNameElem, 0, true) + if isIPv4 { sourceIPv4Elem := createElement("sourceIPv4Address", ipfixregistry.IANAEnterpriseID) sourceIPv4Elem.SetIPAddressValue(net.ParseIP("10.10.0.79")) diff --git a/test/e2e/charts/flow-visibility/templates/configmap.yaml b/test/e2e/charts/flow-visibility/templates/configmap.yaml index 5dc812926b6..60e928cbf99 100644 --- a/test/e2e/charts/flow-visibility/templates/configmap.yaml +++ b/test/e2e/charts/flow-visibility/templates/configmap.yaml @@ -78,7 +78,8 @@ data: egressName String, egressIP String, appProtocolName String, - httpVals String + httpVals String, + egressNodeName String ) engine=MergeTree ORDER BY (timeInserted, flowEndSeconds) TTL timeInserted + INTERVAL 1 HOUR diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index d4bfff2d328..dc9ebe048e5 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -92,6 +92,7 @@ DATA SET: egressIP: 172.18.0.2 appProtocolName: http httpVals: mockHttpString + egressNodeName: k8s-node-worker destinationClusterIPv4: 0.0.0.0 octetDeltaCountFromSourceNode: 8982624938 octetDeltaCountFromDestinationNode: 8982624938 @@ -766,7 +767,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node running the client Pods - var egressNodeIP string + var egressNodeIP, egressNodeName string + egressNodeName = nodeName(0) if !isIPv6 { egressNodeIP = nodeIPv4(0) } else { @@ -781,11 +783,11 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, egressNodeName, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, egressNodeName, label) } } }) @@ -807,7 +809,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node not running the client Pods - var egressNodeIP string + var egressNodeIP, egressNodeName string + egressNodeName = nodeName(1) if !isIPv6 { egressNodeIP = nodeIPv4(1) } else { @@ -822,11 +825,11 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, egressNodeName, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, egressNodeName, label) } } }) @@ -841,11 +844,11 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { addLabelToTestPods(t, data, label, []string{clientName}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "", label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "", "", label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "", label) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "", "", label) } } }) @@ -1145,7 +1148,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %s", clickHouseRecords) } -func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, labelFilter string) { +func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, egressNodeName, labelFilter string) { var cmd string if !isIPv6 { cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort) @@ -1159,7 +1162,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) if egressName != "" { - checkEgressInfo(t, record, egressName, egressIP) + checkEgressInfo(t, record, egressName, egressIP, egressNodeName) } } @@ -1168,7 +1171,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) if egressName != "" { - checkEgressInfoClickHouse(t, record, egressName, egressIP) + checkEgressInfoClickHouse(t, record, egressName, egressIP, egressNodeName) } } } @@ -1388,14 +1391,16 @@ func checkFlowTypeClickHouse(t *testing.T, record *ClickHouseFullRow, flowType u assert.Equal(t, record.FlowType, flowType, "Record does not have correct flowType") } -func checkEgressInfo(t *testing.T, record, egressName, egressIP string) { +func checkEgressInfo(t *testing.T, record, egressName, egressIP, egressNodeName string) { assert.Containsf(t, record, fmt.Sprintf("egressName: %s", egressName), "Record does not have correct egressName") assert.Containsf(t, record, fmt.Sprintf("egressIP: %s", egressIP), "Record does not have correct egressIP") + assert.Containsf(t, record, fmt.Sprintf("egressNodeName: %s", egressNodeName), "Record does not have correct egressNodeName") } -func checkEgressInfoClickHouse(t *testing.T, record *ClickHouseFullRow, egressName, egressIP string) { +func checkEgressInfoClickHouse(t *testing.T, record *ClickHouseFullRow, egressName, egressIP, egressNodeName string) { assert.Equal(t, egressName, record.EgressName, "Record does not have correct egressName") assert.Equal(t, egressIP, record.EgressIP, "Record does not have correct egressIP") + assert.Equal(t, egressNodeName, record.EgressNodeName, "Record does not have correct egressNodeName") } func checkL7FlowExporterData(t *testing.T, record, appProtocolName string) { @@ -1997,4 +2002,5 @@ type ClickHouseFullRow struct { EgressIP string `json:"egressIP"` AppProtocolName string `json:"appProtocolName"` HttpVals string `json:"httpVals"` + EgressNodeName string `json:"egressNodeName"` } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 48ff6ef4442..ef998a20617 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -130,7 +130,7 @@ const ( mcjoinImage = "projects.registry.vmware.com/antrea/mcjoin:v2.9" nginxImage = "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" + ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.9.0" nginxLBService = "nginx-loadbalancer"