diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go
index 19b8c11efb7..b7ebf399705 100644
--- a/pkg/agent/controller/networkpolicy/packetin.go
+++ b/pkg/agent/controller/networkpolicy/packetin.go
@@ -127,10 +127,14 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
 	denyConn.DestinationServiceAddress = tuple.DestinationAddress
 	denyConn.DestinationServicePort = tuple.DestinationPort
 	denyConn.Mark = getCTMarkValue(matchers)
-	dstSvcAddress := getSvcAddress(matchers)
+	dstSvcAddress := getCTNwDstValue(matchers)
+	dstSvcPort := getCTTpDstValue(matchers)
 	if dstSvcAddress != nil {
 		denyConn.DestinationServiceAddress = *dstSvcAddress
 	}
+	if dstSvcPort != 0 {
+		denyConn.DestinationServicePort = dstSvcPort
+	}
 
 	// No need to obtain connection info again if it already exists in denyConnectionStore.
 	if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist {
@@ -242,14 +246,32 @@ func getCTMarkValue(matchers *ofctrl.Matchers) uint32 {
 	return ctMarkValue
 }
 
-func getSvcAddress(matchers *ofctrl.Matchers) *net.IP {
+func getCTNwDstValue(matchers *ofctrl.Matchers) *net.IP {
 	svcIp := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
-	if svcIp == nil {
-		return nil
+	if svcIp != nil {
+		svcIpValue, ok := svcIp.GetValue().(net.IP)
+		if ok {
+			return &svcIpValue
+		}
+	}
+	svcIp = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST")
+	if svcIp != nil {
+		svcIpValue, ok := svcIp.GetValue().(net.IP)
+		if ok {
+			return &svcIpValue
+		}
+	}
+	return nil
+}
+
+func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 {
+	port := matchers.GetMatchByName("NXM_NX_CT_TP_DST")
+	if port == nil {
+		return 0
 	}
-	svcIpValue, ok := svcIp.GetValue().(net.IP)
+	portValue, ok := port.GetValue().(uint16)
 	if !ok {
-		return nil
+		return 0
 	}
-	return &svcIpValue
+	return portValue
 }
diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go
index 1f729ff062d..d3f419ce823 100644
--- a/pkg/agent/flowexporter/connections/conntrack_connections.go
+++ b/pkg/agent/flowexporter/connections/conntrack_connections.go
@@ -253,6 +253,11 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
 		klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
 	} else {
 		cs.fillPodInfo(conn)
+		if conn.SourcePodName == "" && conn.DestinationPodName == "" {
+			// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
+			// information for both srcPod and dstPod
+			return
+		}
 		if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
 			clusterIP := conn.DestinationServiceAddress.String()
 			svcPort := conn.DestinationServicePort
diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go
index b3afc14256d..0dd5dea66d6 100644
--- a/pkg/agent/flowexporter/connections/deny_connections.go
+++ b/pkg/agent/flowexporter/connections/deny_connections.go
@@ -97,6 +97,11 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
 	conn.OriginalBytes = bytes
 	conn.OriginalPackets = uint64(1)
 	ds.fillPodInfo(conn)
+	if conn.SourcePodName == "" && conn.DestinationPodName == "" {
+		// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
+		// information for both srcPod and dstPod
+		return
+	}
 	protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
"UnknownProtocol") serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr) if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() { diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index 3a27d167ea3..26b18ba65f7 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -30,6 +30,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/openflow" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" @@ -80,7 +81,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { OriginalBytes: uint64(60), OriginalPackets: uint64(1), IsActive: true, - Mark: 19, + Mark: openflow.ServiceCTMark.GetValue(), }, isSvc: true, }, @@ -96,8 +97,8 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { if c.isSvc { mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) } - mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false) - mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false) + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true) denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions) diff --git a/test/e2e/bandwidth_test.go b/test/e2e/bandwidth_test.go index 3e95a8b0117..51915453e36 100644 --- a/test/e2e/bandwidth_test.go +++ b/test/e2e/bandwidth_test.go @@ -25,6 +25,7 @@ import ( ) const iperfPort = 5201 +const iperfSvcPort = 9999 // TestBandwidth is the top-level test which contains all subtests for // Bandwidth related test cases so they can share setup, teardown. 
diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go
index d3bc69ca1c1..dfe927f9a8f 100644
--- a/test/e2e/flowaggregator_test.go
+++ b/test/e2e/flowaggregator_test.go
@@ -157,12 +157,12 @@ var (
 )
 
 type testFlow struct {
-	srcIP            string
-	dstIP            string
-	srcPodName       string
-	dstPodName       string
-	svcIP            string
-	checkDstPortName bool
+	srcIP       string
+	dstIP       string
+	srcPodName  string
+	dstPodName  string
+	svcIP       string
+	checkDstSvc bool
 }
 
 func TestFlowAggregatorSecureConnection(t *testing.T) {
@@ -306,7 +306,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-b (Ingress reject), perftest-a -> perftest-d (Ingress drop)
 	t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true)
 		defer func() {
 			if anp1 != nil {
@@ -342,7 +341,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> perftest-b , perftest-a (Egress drop) -> perftest-d
 	t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false)
 		defer func() {
 			if anp1 != nil {
@@ -378,7 +376,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-b (Ingress deny), perftest-d (Egress deny) -> perftest-a
 	t.Run("IntraNodeDenyConnNP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName())
 		defer func() {
 			if np1 != nil {
@@ -415,7 +412,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> svcB -> perftest-b (Ingress reject), perftest-a -> svcD ->perftest-d (Ingress drop)
 	t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true)
 		defer func() {
 			if anp1 != nil {
@@ -430,16 +426,16 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 			}
 		}()
 		testFlow1 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-b",
-			svcIP:            svcB.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-b",
+			svcIP:       svcB.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		testFlow2 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-d",
-			svcIP:            svcD.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-d",
+			svcIP:       svcD.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String()
@@ -456,7 +452,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> svcB ->perftest-b, perftest-a (Egress drop) -> svcD -> perftest-d
 	t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false)
 		defer func() {
 			if anp1 != nil {
@@ -471,16 +466,16 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 			}
 		}()
 		testFlow1 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-b",
-			svcIP:            svcB.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-b",
+			svcIP:       svcB.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		testFlow2 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-d",
-			svcIP:            svcD.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-d",
+			svcIP:       svcD.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podBIPs.ipv4.String(), podAIPs.ipv4.String(), podDIPs.ipv4.String()
@@ -517,7 +512,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-c (Ingress reject), perftest-a -> perftest-e (Ingress drop)
 	t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true)
 		defer func() {
 			if anp1 != nil {
@@ -553,7 +547,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> perftest-c, perftest-a (Egress drop)-> perftest-e
 	t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false)
 		defer func() {
 			if anp1 != nil {
@@ -589,7 +582,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-c (Ingress deny), perftest-b (Egress deny) -> perftest-e
 	t.Run("InterNodeDenyConnNP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName())
 		defer func() {
 			if np1 != nil {
@@ -626,7 +618,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> svcC -> perftest-c (Ingress reject), perftest-a -> svcE -> perftest-e (Ingress drop)
 	t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true)
 		defer func() {
 			if anp1 != nil {
@@ -646,16 +637,16 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		// occurring within our flow-aggregator. Consequently, we can still see the service information when dealing
 		// with inter-node traffic subject to an ingress drop network policy
 		testFlow1 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-c",
-			svcIP:            svcC.Spec.ClusterIP,
-			checkDstPortName: false,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-c",
+			svcIP:       svcC.Spec.ClusterIP,
+			checkDstSvc: false,
 		}
 		testFlow2 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-e",
-			svcIP:            svcE.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-e",
+			svcIP:       svcE.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String()
@@ -672,7 +663,6 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> svcC -> perftest-c, perftest-a (Egress drop) -> svcE -> perftest-e
 	t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
-		recreatePerftestPod(t, data)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false)
 		defer func() {
 			if anp1 != nil {
@@ -687,16 +677,16 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 			}
 		}()
 		testFlow1 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-c",
-			svcIP:            svcC.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-c",
+			svcIP:       svcC.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		testFlow2 := testFlow{
-			srcPodName:       "perftest-a",
-			dstPodName:       "perftest-e",
-			svcIP:            svcE.Spec.ClusterIP,
-			checkDstPortName: true,
+			srcPodName:  "perftest-a",
+			dstPodName:  "perftest-e",
+			svcIP:       svcE.Spec.ClusterIP,
+			checkDstSvc: true,
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.ipv4.String(), podCIPs.ipv4.String(), podAIPs.ipv4.String(), podEIPs.ipv4.String()
@@ -939,6 +929,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
 	} else {
 		cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d -b %s", dstIP, iperfTimeSec, iperfBandwidth)
 	}
+	if checkService {
+		cmdStr += fmt.Sprintf(" -p %d", iperfSvcPort)
+	}
+	timeNow := time.Now()
 	stdout, _, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", "iperf", []string{"bash", "-c", cmdStr})
 	require.NoErrorf(t, err, "Error when running iperf3 client: %v", err)
 	bwSlice, srcPort, _ := getBandwidthAndPorts(stdout)
@@ -953,12 +947,12 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
 		t.Fatalf("Unit of the traffic bandwidth reported by iperf should be Mbits.")
 	}
 
-	checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps)
-	checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps)
+	checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow)
+	checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow)
 }
 
-func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) {
-	collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data)
+func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) {
+	collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, timeSince)
 	// Iterate over recordSlices and build some results to test with expected results
 	dataRecordsCount := 0
 	src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6)
@@ -1034,11 +1028,11 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s
 	assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput)
 }
 
-func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) {
+func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) {
 	// Check the source port along with source and destination IPs as there
 	// are flow records for control flows during the iperf with same IPs
 	// and destination port.
-	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true)
+	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, timeSince)
 	for _, record := range clickHouseRecords {
 		// Check if record has both Pod name of source and destination Pod.
@@ -1111,10 +1105,11 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
 	} else {
 		cmd = fmt.Sprintf("wget -O- [%s]:%d", dstIP, dstPort)
 	}
+	timeNow := time.Now()
 	stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
 	require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)
 
-	_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data)
+	_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, timeNow)
 	for _, record := range recordSlices {
 		if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
 			checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace)
@@ -1126,7 +1121,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
 		}
 	}
 
-	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false)
+	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, timeNow)
 	for _, record := range clickHouseRecords {
 		checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "")
 		checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal)
@@ -1139,44 +1134,52 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool) {
 	var cmdStr1, cmdStr2 string
-	dstIP1, dstIP2 := testFlow1.dstIP, testFlow2.dstIP
-	if useSvcIP {
-		dstIP1, dstIP2 = testFlow1.svcIP, testFlow2.svcIP
-	}
 	if !isIPv6 {
-		cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", dstIP1)
-		cmdStr2 = fmt.Sprintf("iperf3 -c %s -n 1", dstIP2)
+		if useSvcIP {
+			cmdStr1 = fmt.Sprintf("iperf3 -c %s -p %d -n 1", testFlow1.svcIP, iperfSvcPort)
+			cmdStr2 = fmt.Sprintf("iperf3 -c %s -p %d -n 1", testFlow2.svcIP, iperfSvcPort)
+		} else {
+			cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow1.dstIP)
+			cmdStr2 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow2.dstIP)
+		}
+
 	} else {
-		cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -n 1", dstIP1)
-		cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", dstIP2)
+		if useSvcIP {
+			cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -p %d -n 1", testFlow1.svcIP, iperfSvcPort)
+			cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -p %d -n 1", testFlow2.svcIP, iperfSvcPort)
+		} else {
+			cmdStr1 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow1.dstIP)
+			cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow2.dstIP)
+		}
 	}
+	timeNow := time.Now()
 	_, _, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr1})
 	assert.Error(t, err)
 	_, _, err = data.RunCommandFromPod(data.testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
 	assert.Error(t, err)
 
-	checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP)
-	checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP)
+	checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow)
+	checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow)
 }
 
-func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) {
-	_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data)
-	_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data)
+func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) {
+	_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, timeSince)
+	_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, timeSince)
 	recordSlices := append(recordSlices1, recordSlices2...)
 	src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6)
 	src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6)
 	// Iterate over recordSlices and build some results to test with expected results
 	for _, record := range recordSlices {
 		var srcPodName, dstPodName string
-		var checkDstSvcPorName bool
+		var checkDstSvc bool
 		if strings.Contains(record, src_flow1) && strings.Contains(record, dst_flow1) {
 			srcPodName = testFlow1.srcPodName
 			dstPodName = testFlow1.dstPodName
-			checkDstSvcPorName = testFlow1.checkDstPortName
+			checkDstSvc = testFlow1.checkDstSvc
 		} else if strings.Contains(record, src_flow2) && strings.Contains(record, dst_flow2) {
 			srcPodName = testFlow2.srcPodName
 			dstPodName = testFlow2.dstPodName
-			checkDstSvcPorName = testFlow2.checkDstPortName
+			checkDstSvc = testFlow2.checkDstSvc
 		}
 		if strings.Contains(record, src_flow1) && strings.Contains(record, dst_flow1) || strings.Contains(record, src_flow2) && strings.Contains(record, dst_flow2) {
 			ingressRejectStr := fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionReject)
@@ -1221,32 +1224,34 @@ func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1,
 					assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress drop rule")
 				}
 			}
-			if checkDstSvcPorName {
+			if checkDstSvc {
 				destinationServicePortName := data.testNamespace + "/" + dstPodName
 				assert.Contains(record, fmt.Sprintf("destinationServicePortName: %s", destinationServicePortName), "Record does not have correct destinationServicePortName")
+				assert.Contains(record, fmt.Sprintf("destinationServicePort: %d", iperfSvcPort), "Record does not have correct destinationServicePort")
 			} else {
 				assert.Contains(record, "destinationServicePortName: \n", "Record does not have correct destinationServicePortName")
+				assert.Contains(record, "destinationServicePort: 0 \n", "Record does not have correct destinationServicePort")
 			}
 		}
 	}
 }
 
-func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) {
-	clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false)
-	clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false)
+func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) {
+	clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, timeSince)
+	clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, timeSince)
 	recordSlices := append(clickHouseRecords1, clickHouseRecords2...)
 	// Iterate over recordSlices and build some results to test with expected results
 	for _, record := range recordSlices {
 		var srcPodName, dstPodName string
-		var checkDstSvcPorName bool
+		var checkDstSvc bool
 		if record.SourceIP == testFlow1.srcIP && (record.DestinationIP == testFlow1.dstIP || record.DestinationClusterIP == testFlow1.dstIP) {
 			srcPodName = testFlow1.srcPodName
 			dstPodName = testFlow1.dstPodName
-			checkDstSvcPorName = testFlow1.checkDstPortName
+			checkDstSvc = testFlow1.checkDstSvc
 		} else if record.SourceIP == testFlow2.srcIP && (record.DestinationIP == testFlow2.dstIP || record.DestinationClusterIP == testFlow2.dstIP) {
 			srcPodName = testFlow2.srcPodName
 			dstPodName = testFlow2.dstPodName
-			checkDstSvcPorName = testFlow2.checkDstPortName
+			checkDstSvc = testFlow2.checkDstSvc
 		}
 
 		if isIntraNode {
@@ -1256,11 +1261,13 @@ func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1,
 			checkPodAndNodeDataClickHouse(data, t, record, srcPodName, controlPlaneNodeName(), dstPodName, workerNodeName(1))
 			checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeInterNode)
 		}
-		if checkDstSvcPorName {
+		if checkDstSvc {
 			destinationServicePortName := data.testNamespace + "/" + dstPodName
 			assert.Contains(t, record.DestinationServicePortName, destinationServicePortName)
+			assert.Equal(t, iperfSvcPort, int(record.DestinationServicePort))
 		} else {
 			assert.Equal(t, "", record.DestinationServicePortName)
+			assert.Equal(t, 0, int(record.DestinationServicePort))
 		}
 		assert := assert.New(t)
 		if !isANP { // K8s Network Policies
@@ -1371,7 +1378,7 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64
 // received all the expected records for a given flow with source IP, destination IP
 // and source port. We send source port to ignore the control flows during the
 // iperf test.
-func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData) (string, []string) {
+func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, timeSince time.Time) (string, []string) {
 	var collectorOutput string
 	var recordSlices []string
 	// In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command
@@ -1380,12 +1387,13 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
 		var rc int
 		var err error
 		// `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed
-		rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace))
+		rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s --since-time %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace, timeSince.Format(time.RFC3339)))
 		if err != nil || rc != 0 {
 			return false, err
 		}
 		// Checking that all the data records which correspond to the iperf flow are received
 		recordSlices = getRecordsFromOutput(collectorOutput)
+		recordSlices = recordStartTimeFilter(t, recordSlices, timeSince)
 		src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6)
 		if checkAllRecords {
 			for _, record := range recordSlices {
@@ -1411,13 +1419,13 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
 // received all the expected records for a given flow with source IP, destination IP
 // and source port. We send source port to ignore the control flows during the iperf test.
 // Polling timeout is coded assuming IPFIX output has been checked first.
-func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool) []*ClickHouseFullRow {
+func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, timeSince time.Time) []*ClickHouseFullRow {
 	var flowRecords []*ClickHouseFullRow
 	var queryOutput string
-	query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP)
+	query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix())
 	if isDstService {
-		query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP)
+		query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix())
 	}
 	if len(srcPort) > 0 {
 		query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort)
@@ -1476,6 +1484,17 @@ func getRecordsFromOutput(output string) []string {
 	return recordSlices
 }
 
+func recordStartTimeFilter(t *testing.T, records []string, startTime time.Time) []string {
+	result := []string{}
+	for _, record := range records {
+		flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds"))
+		if flowStartTime >= startTime.Unix() {
+			result = append(result, record)
+		}
+	}
+	return result
+}
+
 func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
 	// Add K8s NetworkPolicy between two iperf Pods.
 	var err error
@@ -1707,22 +1726,22 @@ func createPerftestServices(data *TestData, isIPv6 bool) (svcB *corev1.Service,
 		svcIPFamily = corev1.IPv6Protocol
 	}
 
-	svcB, err = data.CreateService("perftest-b", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
+	svcB, err = data.CreateService("perftest-b", data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": "perftest-b"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
 	if err != nil {
 		return nil, nil, nil, nil, fmt.Errorf("error when creating perftest-b Service: %v", err)
 	}
 
-	svcC, err = data.CreateService("perftest-c", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
+	svcC, err = data.CreateService("perftest-c", data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": "perftest-c"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
 	if err != nil {
 		return nil, nil, nil, nil, fmt.Errorf("error when creating perftest-c Service: %v", err)
 	}
 
-	svcD, err = data.CreateService("perftest-d", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-d"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
+	svcD, err = data.CreateService("perftest-d", data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": "perftest-d"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
 	if err != nil {
 		return nil, nil, nil, nil, fmt.Errorf("error when creating perftest-d Service: %v", err)
 	}
 
-	svcE, err = data.CreateService("perftest-e", data.testNamespace, iperfPort, iperfPort, map[string]string{"antrea-e2e": "perftest-e"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
+	svcE, err = data.CreateService("perftest-e", data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": "perftest-e"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily)
data.CreateService("perftest-e", data.testNamespace, iperfSvcPort, iperfPort, map[string]string{"antrea-e2e": "perftest-e"}, false, false, corev1.ServiceTypeClusterIP, &svcIPFamily) if err != nil { return nil, nil, nil, nil, fmt.Errorf("error when creating perftest-e Service: %v", err) } @@ -1739,25 +1758,6 @@ func deletePerftestServices(t *testing.T, data *TestData) { } } -func deletePerftestPod(t *testing.T, data *TestData) { - for _, podName := range []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"} { - err := data.DeletePodAndWait(defaultTimeout, podName, data.testNamespace) - if err != nil { - t.Logf("Error when deleting %s Pod: %v", podName, err) - } - - } -} - -func recreatePerftestPod(t *testing.T, data *TestData) { - var err error - deletePerftestPod(t, data) - podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, err = createPerftestPods(data) - if err != nil { - t.Fatalf("Error when recreating perftest Pods: %v", err) - } -} - // getBandwidthAndPorts parses iperf commands output and returns bandwidth, // source port and destination port. Bandwidth is returned as a slice containing // two strings (bandwidth value and bandwidth unit).