From dce23d69db81f72a9d766470be28996a10f33e8e Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com> Date: Tue, 14 Nov 2023 19:31:39 -0800 Subject: [PATCH] Rename service related fields in Connection struct (#5704) we rename DestinationServiceAddress and DestinationServicePort in Connection struct as they actually represent the value of OriginalDestinationAddress and OriginalDestinationPort. Signed-off-by: Yun-Tang Hsu --- .../controller/networkpolicy/packetin.go | 16 +-- .../connections/conntrack_connections.go | 4 +- .../conntrack_connections_perf_test.go | 24 ++--- .../connections/conntrack_connections_test.go | 2 +- .../connections/conntrack_linux.go | 22 ++--- .../connections/conntrack_linux_test.go | 98 +++++++++---------- .../flowexporter/connections/conntrack_ovs.go | 6 +- .../connections/deny_connections.go | 2 +- .../connections/deny_connections_test.go | 36 +++---- pkg/agent/flowexporter/exporter/exporter.go | 6 +- .../exporter/exporter_perf_test.go | 2 +- pkg/agent/flowexporter/types.go | 4 +- 12 files changed, 111 insertions(+), 111 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index 6ba98b98ced..ac7de7f95ea 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -124,16 +124,16 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { // Generate deny connection and add to deny connection store denyConn := flowexporter.Connection{} denyConn.FlowKey = tuple - denyConn.DestinationServiceAddress = tuple.DestinationAddress - denyConn.DestinationServicePort = tuple.DestinationPort + denyConn.OriginalDestinationAddress = tuple.DestinationAddress + denyConn.OriginalDestinationPort = tuple.DestinationPort denyConn.Mark = getCTMarkValue(matchers) - dstSvcAddress := getCTNwDstValue(matchers) - dstSvcPort := getCTTpDstValue(matchers) - if dstSvcAddress.IsValid() { - denyConn.DestinationServiceAddress = dstSvcAddress + nwDstValue := getCTNwDstValue(matchers) + dstPortValue := getCTTpDstValue(matchers) + if nwDstValue.IsValid() { + denyConn.OriginalDestinationAddress = nwDstValue } - if dstSvcPort != 0 { - denyConn.DestinationServicePort = dstSvcPort + if dstPortValue != 0 { + denyConn.OriginalDestinationPort = dstPortValue } // No need to obtain connection info again if it already exists in denyConnectionStore. diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 76f2f7a7fae..1d4bf619e5d 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -260,8 +260,8 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio return } if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() { - clusterIP := conn.DestinationServiceAddress.String() - svcPort := conn.DestinationServicePort + clusterIP := conn.OriginalDestinationAddress.String() + svcPort := conn.OriginalDestinationPort protocol, err := lookupServiceProtocol(conn.FlowKey.Protocol) if err != nil { klog.InfoS("Could not retrieve Service protocol", "error", err) diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index 406afda91af..8bfc97a0d12 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -200,18 +200,18 @@ func getNewConn() *flowexporter.Connection { } flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomNum1), DestinationPort: uint16(randomNum2)} return &flowexporter.Connection{ - StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), - StopTime: time.Now(), - IsPresent: true, - ReadyToDelete: false, - FlowKey: flowKey, - OriginalPackets: 10, - OriginalBytes: 100, - ReversePackets: 5, - ReverseBytes: 50, - DestinationServiceAddress: svc, - DestinationServicePort: 30000, - TCPState: "SYN_SENT", + StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), + StopTime: time.Now(), + IsPresent: true, + ReadyToDelete: false, + FlowKey: flowKey, + OriginalPackets: 10, + OriginalBytes: 100, + ReversePackets: 5, + ReverseBytes: 50, + OriginalDestinationAddress: svc, + OriginalDestinationPort: 30000, + TCPState: "SYN_SENT", } } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index 33bd3279e85..3a63e5fa49f 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -242,7 +242,7 @@ func testAddNewConn(mockPodStore *podstoretest.MockInterface, mockProxier *proxy mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.DestinationAddress.String(), gomock.Any()).Return(pod1, true) protocol, _ := lookupServiceProtocol(conn.FlowKey.Protocol) - serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress.String(), conn.DestinationServicePort, protocol) + serviceStr := fmt.Sprintf("%s:%d/%s", conn.OriginalDestinationAddress.String(), conn.OriginalDestinationPort, protocol) mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4]) diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go index 56c1308000b..d9d19267b74 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -140,17 +140,17 @@ func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio SourcePort: conn.TupleOrig.Proto.SourcePort, DestinationPort: conn.TupleReply.Proto.SourcePort, }, - DestinationServiceAddress: conn.TupleOrig.IP.DestinationAddress, - DestinationServicePort: conn.TupleOrig.Proto.DestinationPort, - OriginalPackets: conn.CountersOrig.Packets, - OriginalBytes: conn.CountersOrig.Bytes, - ReversePackets: conn.CountersReply.Packets, - ReverseBytes: conn.CountersReply.Bytes, - SourcePodNamespace: "", - SourcePodName: "", - DestinationPodNamespace: "", - DestinationPodName: "", - TCPState: "", + OriginalDestinationAddress: conn.TupleOrig.IP.DestinationAddress, + OriginalDestinationPort: conn.TupleOrig.Proto.DestinationPort, + OriginalPackets: conn.CountersOrig.Packets, + OriginalBytes: conn.CountersOrig.Bytes, + ReversePackets: conn.CountersReply.Packets, + ReverseBytes: conn.CountersReply.Bytes, + SourcePodNamespace: "", + SourcePodName: "", + DestinationPodNamespace: "", + DestinationPodName: "", + TCPState: "", } if conn.ProtoInfo.TCP != nil { newConn.TCPState = stateToString(conn.ProtoInfo.TCP.State) diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index c8b945aeeb8..66ac537e778 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -159,18 +159,18 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { SourcePort: uint16(41284), DestinationPort: uint16(6443), }, - DestinationServiceAddress: netip.MustParseAddr("100.50.25.1"), - DestinationServicePort: uint16(443), - OriginalPackets: 343260, - OriginalBytes: 19340621, - ReversePackets: 381035, - ReverseBytes: 181176472, - SourcePodNamespace: "", - SourcePodName: "", - DestinationPodNamespace: "", - DestinationPodName: "", - TCPState: "ESTABLISHED", - Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0}, + OriginalDestinationAddress: netip.MustParseAddr("100.50.25.1"), + OriginalDestinationPort: uint16(443), + OriginalPackets: 343260, + OriginalBytes: 19340621, + ReversePackets: 381035, + ReverseBytes: 181176472, + SourcePodNamespace: "", + SourcePodName: "", + DestinationPodNamespace: "", + DestinationPodName: "", + TCPState: "ESTABLISHED", + Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0}, } mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/dump-conntrack", false, "-m", "-s").Return(ovsctlCmdOutput, nil) @@ -229,24 +229,24 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { DestinationPort: conntrackFlowTupleReply.Proto.SourcePort, } expectedAntreaFlow := &flowexporter.Connection{ - Timeout: netlinkFlow.Timeout, - StartTime: netlinkFlow.Timestamp.Start, - IsPresent: true, - Zone: 2, - StatusFlag: 0x4, - Mark: 0x1234, - FlowKey: tuple, - DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, - DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, - OriginalPackets: netlinkFlow.CountersOrig.Packets, - OriginalBytes: netlinkFlow.CountersOrig.Bytes, - ReversePackets: netlinkFlow.CountersReply.Packets, - ReverseBytes: netlinkFlow.CountersReply.Bytes, - SourcePodNamespace: "", - SourcePodName: "", - DestinationPodNamespace: "", - DestinationPodName: "", - TCPState: "", + Timeout: netlinkFlow.Timeout, + StartTime: netlinkFlow.Timestamp.Start, + IsPresent: true, + Zone: 2, + StatusFlag: 0x4, + Mark: 0x1234, + FlowKey: tuple, + OriginalDestinationAddress: conntrackFlowTuple.IP.DestinationAddress, + OriginalDestinationPort: conntrackFlowTuple.Proto.DestinationPort, + OriginalPackets: netlinkFlow.CountersOrig.Packets, + OriginalBytes: netlinkFlow.CountersOrig.Bytes, + ReversePackets: netlinkFlow.CountersReply.Packets, + ReverseBytes: netlinkFlow.CountersReply.Bytes, + SourcePodNamespace: "", + SourcePodName: "", + DestinationPodNamespace: "", + DestinationPodName: "", + TCPState: "", } antreaFlow := NetlinkFlowToAntreaConnection(netlinkFlow) @@ -265,25 +265,25 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { }, } expectedAntreaFlow = &flowexporter.Connection{ - Timeout: netlinkFlow.Timeout, - StartTime: netlinkFlow.Timestamp.Start, - StopTime: netlinkFlow.Timestamp.Stop, - IsPresent: true, - Zone: 2, - StatusFlag: 0x204, - Mark: 0x1234, - FlowKey: tuple, - DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, - DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, - OriginalPackets: netlinkFlow.CountersOrig.Packets, - OriginalBytes: netlinkFlow.CountersOrig.Bytes, - ReversePackets: netlinkFlow.CountersReply.Packets, - ReverseBytes: netlinkFlow.CountersReply.Bytes, - SourcePodNamespace: "", - SourcePodName: "", - DestinationPodNamespace: "", - DestinationPodName: "", - TCPState: "", + Timeout: netlinkFlow.Timeout, + StartTime: netlinkFlow.Timestamp.Start, + StopTime: netlinkFlow.Timestamp.Stop, + IsPresent: true, + Zone: 2, + StatusFlag: 0x204, + Mark: 0x1234, + FlowKey: tuple, + OriginalDestinationAddress: conntrackFlowTuple.IP.DestinationAddress, + OriginalDestinationPort: conntrackFlowTuple.Proto.DestinationPort, + OriginalPackets: netlinkFlow.CountersOrig.Packets, + OriginalBytes: netlinkFlow.CountersOrig.Bytes, + ReversePackets: netlinkFlow.CountersReply.Packets, + ReverseBytes: netlinkFlow.CountersReply.Bytes, + SourcePodNamespace: "", + SourcePodName: "", + DestinationPodNamespace: "", + DestinationPodName: "", + TCPState: "", } antreaFlow = NetlinkFlowToAntreaConnection(netlinkFlow) diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 2df30315d1c..efea78c31e6 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -165,9 +165,9 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter if !isReply { svcAddr, err := netip.ParseAddr(fields[len(fields)-1]) if err != nil { - return nil, fmt.Errorf("parsing destination service address failed: %w", err) + return nil, fmt.Errorf("parsing original destination address failed: %w", err) } - conn.DestinationServiceAddress = svcAddr + conn.OriginalDestinationAddress = svcAddr } case strings.Contains(fs, "sport"): fields := strings.Split(fs, "=") @@ -189,7 +189,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter return nil, fmt.Errorf("conversion of dport %s to int failed: %v", fields[len(fields)-1], err) } if !isReply { - conn.DestinationServicePort = uint16(val) + conn.OriginalDestinationPort = uint16(val) } case strings.Contains(fs, "packets"): fields := strings.Split(fs, "=") diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go index 6645c3a66a6..1136a124385 100644 --- a/pkg/agent/flowexporter/connections/deny_connections.go +++ b/pkg/agent/flowexporter/connections/deny_connections.go @@ -104,7 +104,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti return } protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol") - serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr) + serviceStr := fmt.Sprintf("%s:%d/%s", conn.OriginalDestinationAddress, conn.OriginalDestinationPort, protocolStr) if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() { ds.fillServiceInfo(conn, serviceStr) } diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index 45c5c18f0de..44e73297bd6 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -55,29 +55,29 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { { name: "Flow not through service", testFlow: flowexporter.Connection{ - StopTime: refTime.Add(-(time.Second * 20)), - StartTime: refTime.Add(-(time.Second * 20)), - FlowKey: tuple, - DestinationServiceAddress: tuple.DestinationAddress, - DestinationServicePort: tuple.DestinationPort, - OriginalBytes: uint64(60), - OriginalPackets: uint64(1), - IsActive: true, - Mark: 0, + StopTime: refTime.Add(-(time.Second * 20)), + StartTime: refTime.Add(-(time.Second * 20)), + FlowKey: tuple, + OriginalDestinationAddress: tuple.DestinationAddress, + OriginalDestinationPort: tuple.DestinationPort, + OriginalBytes: uint64(60), + OriginalPackets: uint64(1), + IsActive: true, + Mark: 0, }, isSvc: false, }, { name: "Flow through service", testFlow: flowexporter.Connection{ - StopTime: refTime.Add(-(time.Second * 20)), - StartTime: refTime.Add(-(time.Second * 20)), - FlowKey: tuple, - DestinationServiceAddress: tuple.DestinationAddress, - DestinationServicePort: tuple.DestinationPort, - OriginalBytes: uint64(60), - OriginalPackets: uint64(1), - IsActive: true, - Mark: openflow.ServiceCTMark.GetValue(), + StopTime: refTime.Add(-(time.Second * 20)), + StartTime: refTime.Add(-(time.Second * 20)), + FlowKey: tuple, + OriginalDestinationAddress: tuple.DestinationAddress, + OriginalDestinationPort: tuple.DestinationPort, + OriginalBytes: uint64(60), + OriginalPackets: uint64(1), + IsActive: true, + Mark: openflow.ServiceCTMark.GetValue(), }, isSvc: true, }, diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index df99ce56e44..f5a64d7118a 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -521,7 +521,7 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { } case "destinationClusterIPv4": if conn.DestinationServicePortName != "" { - ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice()) + ie.SetIPAddressValue(conn.OriginalDestinationAddress.AsSlice()) } else { // Sending dummy IP as IPFIX collector expects constant length of data for IP field. // We should probably think of better approach as this involves customization of IPFIX collector to ignore @@ -530,14 +530,14 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { } case "destinationClusterIPv6": if conn.DestinationServicePortName != "" { - ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice()) + ie.SetIPAddressValue(conn.OriginalDestinationAddress.AsSlice()) } else { // Same as destinationClusterIPv4. ie.SetIPAddressValue(net.ParseIP("::")) } case "destinationServicePort": if conn.DestinationServicePortName != "" { - ie.SetUnsigned16Value(conn.DestinationServicePort) + ie.SetUnsigned16Value(conn.OriginalDestinationPort) } else { ie.SetUnsigned16Value(uint16(0)) } diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go index ee7dd71a409..48df3418ba9 100644 --- a/pkg/agent/flowexporter/exporter/exporter_perf_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -273,7 +273,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue DestinationPodNamespace: "ns2", DestinationPodName: "pod2", DestinationServicePortName: "service", - DestinationServiceAddress: svc, + OriginalDestinationAddress: svc, TCPState: "SYN_SENT", } connKey := flowexporter.NewConnectionKey(conn) diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index b9aa6a031cd..825fc5a4d7b 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -63,8 +63,8 @@ type Connection struct { DestinationPodNamespace string DestinationPodName string DestinationServicePortName string - DestinationServiceAddress netip.Addr - DestinationServicePort uint16 + OriginalDestinationAddress netip.Addr + OriginalDestinationPort uint16 IngressNetworkPolicyName string IngressNetworkPolicyNamespace string IngressNetworkPolicyType uint8