From 7b3ef5c68c11b61236ad627ab4e7d7a4018f8b2a Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com> Date: Tue, 14 Nov 2023 10:23:46 -0800 Subject: [PATCH] Remove redundant log in fillPodInfo/fillServiceInfo and update deny connection store. (#5592) 1. Remove redundant logs in fillPodInfo/fillServiceInfo to avoid Flow Exporter from flooding logs. 2. Add Mark field for deny connections. We were missing this field previously, resulting in trying to fill service information for non-service IPs. 3. Update the DestinationServiceAddress for deny connections when we can find this information in PacketIn. 4. Update e2e/unit tests to verify that the Service-relateds field are filled correctly. Fixes #5573 Signed-off-by: Yun-Tang Hsu --- .../controller/networkpolicy/packetin.go | 53 ++- .../flowexporter/connections/connections.go | 9 +- .../connections/conntrack_connections.go | 6 + .../connections/deny_connections.go | 11 +- .../connections/deny_connections_test.go | 112 +++-- .../flowexporter/exporter/exporter_test.go | 3 +- test/e2e/bandwidth_test.go | 2 - test/e2e/flowaggregator_test.go | 410 +++++++++++++----- test/e2e/framework.go | 2 + 9 files changed, 448 insertions(+), 160 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index d0d3aa3b2c7..7327751b03e 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -107,6 +107,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { if err != nil { return fmt.Errorf("error in parsing packetIn: %v", err) } + matchers := pktIn.GetMatches() // Get 5-tuple information tuple := flowexporter.Tuple{ @@ -125,6 +126,15 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { denyConn.FlowKey = tuple denyConn.DestinationServiceAddress = tuple.DestinationAddress denyConn.DestinationServicePort = tuple.DestinationPort + denyConn.Mark = getCTMarkValue(matchers) + dstSvcAddress := getCTNwDstValue(matchers) + dstSvcPort := getCTTpDstValue(matchers) + if dstSvcAddress != nil { + denyConn.DestinationServiceAddress = dstSvcAddress + } + if dstSvcPort != 0 { + denyConn.DestinationServicePort = dstSvcPort + } // No need to obtain connection info again if it already exists in denyConnectionStore. if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist { @@ -132,7 +142,6 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { return nil } - matchers := pktIn.GetMatches() var match *ofctrl.MatchField // Get table ID tableID := getPacketInTableID(pktIn) @@ -224,3 +233,45 @@ func getPacketInTableID(pktIn *ofctrl.PacketIn) uint8 { } return tableID } + +func getCTMarkValue(matchers *ofctrl.Matchers) uint32 { + ctMark := matchers.GetMatchByName("NXM_NX_CT_MARK") + if ctMark == nil { + return 0 + } + ctMarkValue, ok := ctMark.GetValue().(uint32) + if !ok { + return 0 + } + return ctMarkValue +} + +func getCTNwDstValue(matchers *ofctrl.Matchers) net.IP { + nwDst := matchers.GetMatchByName("NXM_NX_CT_NW_DST") + if nwDst != nil { + nwDstValue, ok := nwDst.GetValue().(net.IP) + if ok { + return nwDstValue + } + } + nwDst = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST") + if nwDst != nil { + nwDstValue, ok := nwDst.GetValue().(net.IP) + if ok { + return nwDstValue + } + } + return nil +} + +func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 { + port := matchers.GetMatchByName("NXM_NX_CT_TP_DST") + if port == nil { + return 0 + } + portValue, ok := port.GetValue().(uint16) + if !ok { + return 0 + } + return portValue +} diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 926d804cd90..1f61abc5eb9 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -108,9 +108,6 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) { srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime) dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime) - if !srcFound && !dstFound { - klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP) - } if srcFound { conn.SourcePodName = srcPod.Name conn.SourcePodNamespace = srcPod.Namespace @@ -125,10 +122,10 @@ func (cs *connectionStore) fillServiceInfo(conn *flowexporter.Connection, servic // resolve destination Service information if cs.antreaProxier != nil { servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr) - if !exists { - klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr) - } else { + if exists { conn.DestinationServicePortName = servicePortName.String() + } else { + klog.InfoS("Could not retrieve the Service info from antrea-agent-proxier", "serviceStr", serviceStr) } } } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 1f729ff062d..76f2f7a7fae 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -253,6 +253,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio klog.V(4).InfoS("Antrea flow updated", "connection", existingConn) } else { cs.fillPodInfo(conn) + if conn.SourcePodName == "" && conn.DestinationPodName == "" { + // We don't add connections to connection map or expirePriorityQueue if we can't find the pod + // information for both srcPod and dstPod + klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String()) + return + } if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() { clusterIP := conn.DestinationServiceAddress.String() svcPort := conn.DestinationServicePort diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go index 0f60ff0daeb..6645c3a66a6 100644 --- a/pkg/agent/flowexporter/connections/deny_connections.go +++ b/pkg/agent/flowexporter/connections/deny_connections.go @@ -23,6 +23,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/podstore" @@ -96,9 +97,17 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti conn.OriginalBytes = bytes conn.OriginalPackets = uint64(1) ds.fillPodInfo(conn) + if conn.SourcePodName == "" && conn.DestinationPodName == "" { + // We don't add connections to connection map or expirePriorityQueue if we can't find the pod + // information for both srcPod and dstPod + klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String()) + return + } protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol") serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr) - ds.fillServiceInfo(conn, serviceStr) + if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() { + ds.fillServiceInfo(conn, serviceStr) + } metrics.TotalDenyConnections.Inc() conn.IsActive = true ds.connections[connKey] = conn diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index de9766ae311..26b18ba65f7 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -30,6 +30,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/openflow" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" @@ -49,48 +50,83 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { Port: "255", Protocol: v1.ProtocolTCP, } - // flow for testing adding and updating - testFlow := flowexporter.Connection{ - StopTime: refTime.Add(-(time.Second * 20)), - StartTime: refTime.Add(-(time.Second * 20)), - FlowKey: tuple, - DestinationServiceAddress: tuple.DestinationAddress, - DestinationServicePort: tuple.DestinationPort, - OriginalBytes: uint64(60), - OriginalPackets: uint64(1), - IsActive: true, + tc := []struct { + name string + // flow for testing adding and updating + testFlow flowexporter.Connection + isSvc bool + }{ + { + name: "Flow not through service", + testFlow: flowexporter.Connection{ + StopTime: refTime.Add(-(time.Second * 20)), + StartTime: refTime.Add(-(time.Second * 20)), + FlowKey: tuple, + DestinationServiceAddress: tuple.DestinationAddress, + DestinationServicePort: tuple.DestinationPort, + OriginalBytes: uint64(60), + OriginalPackets: uint64(1), + IsActive: true, + Mark: 0, + }, + isSvc: false, + }, { + name: "Flow through service", + testFlow: flowexporter.Connection{ + StopTime: refTime.Add(-(time.Second * 20)), + StartTime: refTime.Add(-(time.Second * 20)), + FlowKey: tuple, + DestinationServiceAddress: tuple.DestinationAddress, + DestinationServicePort: tuple.DestinationPort, + OriginalBytes: uint64(60), + OriginalPackets: uint64(1), + IsActive: true, + Mark: openflow.ServiceCTMark.GetValue(), + }, + isSvc: true, + }, } - mockPodStore := podstoretest.NewMockInterface(ctrl) - mockProxier := proxytest.NewMockProxier(ctrl) - protocol, _ := lookupServiceProtocol(tuple.Protocol) - serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol) - mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) - mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false) - mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false) + for _, c := range tc { + t.Run(c.name, func(t *testing.T) { + // Reset the metrics. + metrics.TotalDenyConnections.Set(0) + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockProxier := proxytest.NewMockProxier(ctrl) + protocol, _ := lookupServiceProtocol(tuple.Protocol) + serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol) + if c.isSvc { + mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) + } + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true) - denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions) + denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions) - denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60)) - expConn := testFlow - expConn.DestinationServicePortName = servicePortName.String() - actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow)) - assert.Equal(t, ok, true, "deny connection should be there in deny connection store") - assert.Equal(t, expConn, *actualConn, "deny connections should be equal") - assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1") - assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add") - checkDenyConnectionMetrics(t, len(denyConnStore.connections)) + denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 20)), uint64(60)) + expConn := c.testFlow + if c.isSvc { + expConn.DestinationServicePortName = servicePortName.String() + } + actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow)) + assert.Equal(t, ok, true, "deny connection should be there in deny connection store") + assert.Equal(t, expConn, *actualConn, "deny connections should be equal") + assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1") + assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add") + checkDenyConnectionMetrics(t, len(denyConnStore.connections)) - denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60)) - expConn.OriginalBytes = uint64(120) - expConn.OriginalPackets = uint64(2) - expConn.StopTime = refTime.Add(-(time.Second * 10)) - actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow)) - assert.Equal(t, ok, true, "deny connection should be there in deny connection store") - assert.Equal(t, expConn, *actualConn, "deny connections should be equal") - assert.True(t, actualConn.IsActive) - assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len()) - assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update") - checkDenyConnectionMetrics(t, len(denyConnStore.connections)) + denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 10)), uint64(60)) + expConn.OriginalBytes = uint64(120) + expConn.OriginalPackets = uint64(2) + expConn.StopTime = refTime.Add(-(time.Second * 10)) + actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow)) + assert.Equal(t, ok, true, "deny connection should be there in deny connection store") + assert.Equal(t, expConn, *actualConn, "deny connections should be equal") + assert.True(t, actualConn.IsActive) + assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len()) + assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update") + checkDenyConnectionMetrics(t, len(denyConnStore.connections)) + }) + } } func checkDenyConnectionMetrics(t *testing.T, numConns int) { diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 695c9229837..9d46a9dc667 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -501,7 +501,8 @@ func getDenyConnection(isIPv6 bool, protoID uint8) *flowexporter.Connection { tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255} } conn := &flowexporter.Connection{ - FlowKey: tuple, + FlowKey: tuple, + SourcePodName: "pod", } return conn } diff --git a/test/e2e/bandwidth_test.go b/test/e2e/bandwidth_test.go index 3e95a8b0117..19162d2144d 100644 --- a/test/e2e/bandwidth_test.go +++ b/test/e2e/bandwidth_test.go @@ -24,8 +24,6 @@ import ( v1 "k8s.io/api/core/v1" ) -const iperfPort = 5201 - // TestBandwidth is the top-level test which contains all subtests for // Bandwidth related test cases so they can share setup, teardown. func TestBandwidth(t *testing.T) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 6afe450c5a8..41d39e37771 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -32,6 +32,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/strings/slices" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" @@ -152,14 +153,18 @@ var ( // exporting to the flow aggregator at time 2s, 4s, 6s, 8s, 10s, and 12s after iperf traffic begins. // Since flow aggregator will aggregate records based on 5-tuple connection key and active timeout is 3.5 seconds, // we expect 3 records at time 5.5s, 9s, and 12.5s after iperf traffic begins. - expectedNumDataRecords = 3 + expectedNumDataRecords = 3 + podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs + serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"} ) type testFlow struct { - srcIP string - dstIP string - srcPodName string - dstPodName string + srcIP string + dstIP string + srcPodName string + dstPodName string + svcIP string + checkDstSvc bool } func TestFlowAggregatorSecureConnection(t *testing.T) { @@ -210,7 +215,7 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { // of Flow Aggregator has been exported. teardownFlowAggregator(t, data) }() - podAIPs, podBIPs, _, _, _, err := createPerftestPods(data) + podAIPs, podBIPs, _, _, _, err = createPerftestPods(data) if err != nil { t.Fatalf("Error when creating perftest Pods: %v", err) } @@ -218,7 +223,7 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { checkIntraNodeFlows(t, data, podAIPs, podBIPs, false) } if v6Enabled { - checkIntraNodeFlows(t, data, podAIPs, podBIPs, false) + checkIntraNodeFlows(t, data, podAIPs, podBIPs, true) } }) } @@ -246,17 +251,17 @@ func TestFlowAggregator(t *testing.T) { t.Fatalf("Error when creating Kubernetes utils client: %v", err) } - podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, err := createPerftestPods(data) + podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, err = createPerftestPods(data) if err != nil { t.Fatalf("Error when creating perftest Pods: %v", err) } if v4Enabled { - t.Run("IPv4", func(t *testing.T) { testHelper(t, data, podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, false) }) + t.Run("IPv4", func(t *testing.T) { testHelper(t, data, false) }) } if v6Enabled { - t.Run("IPv6", func(t *testing.T) { testHelper(t, data, podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, true) }) + t.Run("IPv6", func(t *testing.T) { testHelper(t, data, true) }) } } @@ -282,8 +287,8 @@ func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, } } -func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs, isIPv6 bool) { - svcB, svcC, err := createPerftestServices(data, isIPv6) +func testHelper(t *testing.T, data *TestData, isIPv6 bool) { + _, svcB, svcC, svcD, svcE, err := createPerftestServices(data, isIPv6) if err != nil { t.Fatalf("Error when creating perftest Services: %v", err) } @@ -326,10 +331,10 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podBIPs.ipv6.String(), podAIPs.ipv6.String(), podDIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) } }) @@ -361,10 +366,10 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podBIPs.ipv6.String(), podAIPs.ipv6.String(), podDIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) } }) @@ -396,10 +401,90 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podDIPs.ipv4.String(), podAIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podBIPs.ipv6.String(), podDIPs.ipv6.String(), podAIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false) + } + }) + + // IntraNodeDenyConnIngressANPThroughSvc tests the case, where Pods are deployed on same Node with an Antrea + // ingress deny policy rule applied to destination Pod (one reject rule, one drop rule) and their flow information + // is exported as IPFIX flow records. The test also verify if the service information is well filled in the record. + // perftest-a -> svcB -> perftest-b (Ingress reject), perftest-a -> svcD ->perftest-d (Ingress drop) + t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { + skipIfAntreaPolicyDisabled(t) + anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) + defer func() { + if anp1 != nil { + if err = data.deleteAntreaNetworkpolicy(anp1); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + if anp2 != nil { + if err = data.deleteAntreaNetworkpolicy(anp2); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + }() + testFlow1 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-b", + svcIP: svcB.Spec.ClusterIP, + checkDstSvc: true, + } + testFlow2 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-d", + svcIP: svcD.Spec.ClusterIP, + checkDstSvc: true, + } + if !isIPv6 { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + } else { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podBIPs.ipv6.String(), podAIPs.ipv6.String(), podDIPs.ipv6.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + } + }) + + // IntraNodeDenyConnEgressANPThroughSvc tests the case, where Pods are deployed on same Node with an Antrea + // egress deny policy rule applied to source Pod (one reject rule, one drop rule) and their flow information + // is exported as IPFIX flow records. The test also verify if the service information is well filled in the record. + // perftest-a (Egress reject) -> svcB ->perftest-b, perftest-a (Egress drop) -> svcD -> perftest-d + t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { + skipIfAntreaPolicyDisabled(t) + anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) + defer func() { + if anp1 != nil { + if err = data.deleteAntreaNetworkpolicy(anp1); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + if anp2 != nil { + if err = data.deleteAntreaNetworkpolicy(anp2); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + }() + testFlow1 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-b", + svcIP: svcB.Spec.ClusterIP, + checkDstSvc: true, + } + testFlow2 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-d", + svcIP: svcD.Spec.ClusterIP, + checkDstSvc: true, + } + if !isIPv6 { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + } else { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podBIPs.ipv6.String(), podAIPs.ipv6.String(), podDIPs.ipv6.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) } }) @@ -452,10 +537,10 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podCIPs.ipv6.String(), podAIPs.ipv6.String(), podEIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) } }) @@ -487,10 +572,10 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podCIPs.ipv6.String(), podAIPs.ipv6.String(), podEIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) } }) @@ -522,10 +607,95 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podBIPs.ipv4.String(), podEIPs.ipv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podCIPs.ipv6.String(), podBIPs.ipv6.String(), podEIPs.ipv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) + } + }) + + // InterNodeDenyConnIngressANPThroughSvc tests the case, where Pods are deployed on different Node with an Antrea + // ingress deny policy rule applied to destination Pod (one reject rule, one drop rule) and their flow information + // is exported as IPFIX flow records. The test also verify if the service information is well filled in the record. + // perftest-a -> svcC -> perftest-c (Ingress reject), perftest-a -> svcE -> perftest-e (Ingress drop) + t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { + skipIfAntreaPolicyDisabled(t) + anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) + defer func() { + if anp1 != nil { + if err = data.deleteAntreaNetworkpolicy(anp1); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + if anp2 != nil { + if err = data.deleteAntreaNetworkpolicy(anp2); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + }() + // In theory, it's not possible to retrieve service information for these two flows because the packets are + // either rejected or dropped in other nodes. Nevertheless, we can still observe the connection being recorded + // in the conntrack table on the source node in cases of drop. This results in the aggregation process still + // occurring within our flow-aggregator. Consequently, we can still see the service information when dealing + // with inter-node traffic subject to an ingress drop network policy + testFlow1 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-c", + svcIP: svcC.Spec.ClusterIP, + checkDstSvc: false, + } + testFlow2 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-e", + svcIP: svcE.Spec.ClusterIP, + checkDstSvc: true, + } + if !isIPv6 { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + } else { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podCIPs.ipv6.String(), podAIPs.ipv6.String(), podEIPs.ipv6.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + } + }) + + // InterNodeDenyConnEgressANPThroughSvc tests the case, where Pods are deployed on different Node with an Antrea + // egress deny policy rule applied to source Pod (one reject rule, one drop rule) and their flow information + // is exported as IPFIX flow records. The test also verify if the service information is well filled in the record. + // perftest-a (Egress reject) -> svcC -> perftest-c, perftest-a (Egress drop) -> svcE -> perftest-e + t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { + skipIfAntreaPolicyDisabled(t) + anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) + defer func() { + if anp1 != nil { + if err = data.deleteAntreaNetworkpolicy(anp1); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + if anp2 != nil { + if err = data.deleteAntreaNetworkpolicy(anp2); err != nil { + t.Errorf("Error when deleting Antrea Network Policy: %v", err) + } + } + }() + testFlow1 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-c", + svcIP: svcC.Spec.ClusterIP, + checkDstSvc: true, + } + testFlow2 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-e", + svcIP: svcE.Spec.ClusterIP, + checkDstSvc: true, + } + if !isIPv6 { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + } else { + testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv6.String(), podCIPs.ipv6.String(), podAIPs.ipv6.String(), podEIPs.ipv6.String() + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) } }) @@ -761,6 +931,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri } else { cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d -b %s", dstIP, iperfTimeSec, iperfBandwidth) } + if checkService { + cmdStr += fmt.Sprintf(" -p %d", iperfSvcPort) + } + timeNow := time.Now() stdout, _, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", "iperf", []string{"bash", "-c", cmdStr}) require.NoErrorf(t, err, "Error when running iperf3 client: %v", err) bwSlice, srcPort, _ := getBandwidthAndPorts(stdout) @@ -775,12 +949,12 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri t.Fatalf("Unit of the traffic bandwidth reported by iperf should be Mbits.") } - checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps) - checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps) + checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow) + checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow) } -func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) { - collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data) +func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) { + collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, timeSince) // Iterate over recordSlices and build some results to test with expected results dataRecordsCount := 0 src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) @@ -856,11 +1030,11 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } -func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) { +func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) { // Check the source port along with source and destination IPs as there // are flow records for control flows during the iperf with same IPs // and destination port. - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true) + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, timeSince) for _, record := range clickHouseRecords { // Check if record has both Pod name of source and destination Pod. @@ -933,10 +1107,11 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } else { cmd = fmt.Sprintf("wget -O- [%s]:%d", dstIP, dstPort) } + timeNow := time.Now() stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data) + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, timeNow) for _, record := range recordSlices { if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) @@ -948,7 +1123,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false) + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, timeNow) for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) @@ -959,39 +1134,54 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } } -func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { +func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool) { var cmdStr1, cmdStr2 string if !isIPv6 { - cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow1.dstIP) - cmdStr2 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow2.dstIP) + if useSvcIP { + cmdStr1 = fmt.Sprintf("iperf3 -c %s -p %d -n 1", testFlow1.svcIP, iperfSvcPort) + cmdStr2 = fmt.Sprintf("iperf3 -c %s -p %d -n 1", testFlow2.svcIP, iperfSvcPort) + } else { + cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow1.dstIP) + cmdStr2 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow2.dstIP) + } + } else { - cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow1.dstIP) - cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow2.dstIP) + if useSvcIP { + cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -p %d -n 1", testFlow1.svcIP, iperfSvcPort) + cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -p %d -n 1", testFlow2.svcIP, iperfSvcPort) + } else { + cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow1.dstIP) + cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow2.dstIP) + } } + timeNow := time.Now() _, _, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr1}) assert.Error(t, err) _, _, err = data.RunCommandFromPod(data.testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2}) assert.Error(t, err) - checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP) - checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP) + checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow) + checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow) } -func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { - _, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data) - _, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data) +func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) { + _, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, timeSince) + _, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, timeSince) recordSlices := append(recordSlices1, recordSlices2...) src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6) src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6) // Iterate over recordSlices and build some results to test with expected results for _, record := range recordSlices { var srcPodName, dstPodName string + var checkDstSvc bool if strings.Contains(record, src_flow1) && strings.Contains(record, dst_flow1) { srcPodName = testFlow1.srcPodName dstPodName = testFlow1.dstPodName + checkDstSvc = testFlow1.checkDstSvc } else if strings.Contains(record, src_flow2) && strings.Contains(record, dst_flow2) { srcPodName = testFlow2.srcPodName dstPodName = testFlow2.dstPodName + checkDstSvc = testFlow2.checkDstSvc } if strings.Contains(record, src_flow1) && strings.Contains(record, dst_flow1) || strings.Contains(record, src_flow2) && strings.Contains(record, dst_flow2) { ingressRejectStr := fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionReject) @@ -1036,24 +1226,34 @@ func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress drop rule") } } - + if checkDstSvc { + destinationServicePortName := data.testNamespace + "/" + dstPodName + assert.Contains(record, fmt.Sprintf("destinationServicePortName: %s", destinationServicePortName), "Record does not have correct destinationServicePortName") + assert.Contains(record, fmt.Sprintf("destinationServicePort: %d", iperfSvcPort), "Record does not have correct destinationServicePort") + } else { + assert.Contains(record, "destinationServicePortName: \n", "Record does not have correct destinationServicePortName") + assert.Contains(record, "destinationServicePort: 0 \n", "Record does not have correct destinationServicePort") + } } } } -func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { - clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false) - clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false) +func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) { + clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, timeSince) + clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, timeSince) recordSlices := append(clickHouseRecords1, clickHouseRecords2...) // Iterate over recordSlices and build some results to test with expected results for _, record := range recordSlices { var srcPodName, dstPodName string - if record.SourceIP == testFlow1.srcIP && record.DestinationIP == testFlow1.dstIP { + var checkDstSvc bool + if record.SourceIP == testFlow1.srcIP && (record.DestinationIP == testFlow1.dstIP || record.DestinationClusterIP == testFlow1.dstIP) { srcPodName = testFlow1.srcPodName dstPodName = testFlow1.dstPodName - } else if record.SourceIP == testFlow2.srcIP && record.DestinationIP == testFlow2.dstIP { + checkDstSvc = testFlow1.checkDstSvc + } else if record.SourceIP == testFlow2.srcIP && (record.DestinationIP == testFlow2.dstIP || record.DestinationClusterIP == testFlow2.dstIP) { srcPodName = testFlow2.srcPodName dstPodName = testFlow2.dstPodName + checkDstSvc = testFlow2.checkDstSvc } if isIntraNode { @@ -1063,6 +1263,14 @@ func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, checkPodAndNodeDataClickHouse(data, t, record, srcPodName, controlPlaneNodeName(), dstPodName, workerNodeName(1)) checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeInterNode) } + if checkDstSvc { + destinationServicePortName := data.testNamespace + "/" + dstPodName + assert.Contains(t, record.DestinationServicePortName, destinationServicePortName) + assert.Equal(t, iperfSvcPort, int(record.DestinationServicePort)) + } else { + assert.Equal(t, "", record.DestinationServicePortName) + assert.Equal(t, 0, int(record.DestinationServicePort)) + } assert := assert.New(t) if !isANP { // K8s Network Policies if (record.IngressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionDrop) && (record.IngressNetworkPolicyName != ingressDropANPName) { @@ -1172,7 +1380,7 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 // received all the expected records for a given flow with source IP, destination IP // and source port. We send source port to ignore the control flows during the // iperf test. -func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData) (string, []string) { +func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, timeSince time.Time) (string, []string) { var collectorOutput string var recordSlices []string // In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command @@ -1181,12 +1389,12 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService var rc int var err error // `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed - rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) + rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s --since-time %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace, timeSince.Format(time.RFC3339))) if err != nil || rc != 0 { return false, err } // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(collectorOutput) + recordSlices = getRecordsFromOutput(t, collectorOutput, timeSince) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) if checkAllRecords { for _, record := range recordSlices { @@ -1212,13 +1420,13 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService // received all the expected records for a given flow with source IP, destination IP // and source port. We send source port to ignore the control flows during the iperf test. // Polling timeout is coded assuming IPFIX output has been checked first. -func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool) []*ClickHouseFullRow { +func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, timeSince time.Time) []*ClickHouseFullRow { var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix()) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix()) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) @@ -1269,12 +1477,19 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str return flowRecords } -func getRecordsFromOutput(output string) []string { +func getRecordsFromOutput(t *testing.T, output string, startTime time.Time) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "") output = strings.TrimSpace(output) recordSlices := strings.Split(output, "IPFIX-HDR:") - return recordSlices + result := []string{} + for _, record := range recordSlices { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + if flowStartTime >= startTime.Unix() { + result = append(result, record) + } + } + return result } func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) { @@ -1453,76 +1668,49 @@ func deployDenyNetworkPolicies(t *testing.T, data *TestData, pod1, pod2 string, return np1, np2 } -func createPerftestPods(data *TestData) (podAIPs *PodIPs, podBIPs *PodIPs, podCIPs *PodIPs, podDIPs *PodIPs, podEIPs *PodIPs, err error) { +func createPerftestPods(data *TestData) (*PodIPs, *PodIPs, *PodIPs, *PodIPs, *PodIPs, error) { cmd := []string{"iperf3", "-s"} create := func(name string, nodeName string, ports []corev1.ContainerPort) error { return NewPodBuilder(name, data.testNamespace, toolboxImage).WithContainerName("iperf").WithCommand(cmd).OnNode(nodeName).WithPorts(ports).Create(data) } - - if err := create("perftest-a", controlPlaneNodeName(), nil); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest client Pod: %v", err) - } - podAIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-a", data.testNamespace) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when waiting for the perftest client Pod: %v", err) - } - - if err := create("perftest-b", controlPlaneNodeName(), []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) - } - podBIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-b", data.testNamespace) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err) - } - - if err := create("perftest-c", workerNodeName(1), []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) - } - podCIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-c", data.testNamespace) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err) - } - - if err := create("perftest-d", controlPlaneNodeName(), []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) - } - podDIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-d", data.testNamespace) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err) - } - - if err := create("perftest-e", workerNodeName(1), []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when creating the perftest server Pod: %v", err) - } - podEIPs, err = data.podWaitForIPs(defaultTimeout, "perftest-e", data.testNamespace) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("Error when getting the perftest server Pod's IPs: %v", err) + var err error + var podIPsArray [5]*PodIPs + for i, serviceName := range serviceNames { + var nodeName string + if slices.Contains([]string{"perftest-a", "perftest-b", "perftest-d"}, serviceName) { + nodeName = controlPlaneNodeName() + } else { + nodeName = workerNodeName(1) + } + if err := create(serviceName, nodeName, []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("error when creating the perftest client Pod: %v", err) + } + podIPsArray[i], err = data.podWaitForIPs(defaultTimeout, serviceName, data.testNamespace) + if err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("error when waiting for the perftest client Pod: %v", err) + } } - - return podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, nil + return podIPsArray[0], podIPsArray[1], podIPsArray[2], podIPsArray[3], podIPsArray[4], nil } -func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service, svcC *corev1.Service, err error) { +func createPerftestServices(data *TestData, isIPv6 bool) (*corev1.Service, *corev1.Service, *corev1.Service, *corev1.Service, *corev1.Service, error) { svcIPFamily := corev1.IPv4Protocol if isIPv6 { svcIPFamily = corev1.IPv6Protocol } - - svcB, err = data.CreateService("perftest-b", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily) - if err != nil { - return nil, nil, fmt.Errorf("Error when creating perftest-b Service: %v", err) - } - - svcC, err = data.CreateService("perftest-c", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily) - if err != nil { - return nil, nil, fmt.Errorf("Error when creating perftest-c Service: %v", err) + var err error + var services [5]*corev1.Service + for i, serviceName := range serviceNames { + services[i], err = data.CreateService(serviceName, data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": serviceName}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily) + if err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("error when creating perftest-b Service: %v", err) + } } - - return svcB, svcC, nil + return services[0], services[1], services[2], services[3], services[4], nil } func deletePerftestServices(t *testing.T, data *TestData) { - for _, serviceName := range []string{"perftest-b", "perftest-c"} { + for _, serviceName := range serviceNames { err := data.deleteService(data.testNamespace, serviceName) if err != nil { t.Logf("Error when deleting %s Service: %v", serviceName, err) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 0c8ad225edd..12af781685b 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -148,6 +148,8 @@ const ( statefulSetRestartAnnotationKey = "antrea-e2e/restartedAt" defaultCHDatabaseURL = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + iperfPort = 5201 + iperfSvcPort = 9999 ) type ClusterNode struct {