Skip to content

Commit

Permalink
Rename service related fields in Connection struct (#5704)
Browse files Browse the repository at this point in the history
we rename DestinationServiceAddress and DestinationServicePort in Connection struct as
they actually represent the value of OriginalDestinationAddress and OriginalDestinationPort.

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu authored Nov 15, 2023
1 parent d6766cc commit dce23d6
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 111 deletions.
16 changes: 8 additions & 8 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,16 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
// Generate deny connection and add to deny connection store
denyConn := flowexporter.Connection{}
denyConn.FlowKey = tuple
denyConn.DestinationServiceAddress = tuple.DestinationAddress
denyConn.DestinationServicePort = tuple.DestinationPort
denyConn.OriginalDestinationAddress = tuple.DestinationAddress
denyConn.OriginalDestinationPort = tuple.DestinationPort
denyConn.Mark = getCTMarkValue(matchers)
dstSvcAddress := getCTNwDstValue(matchers)
dstSvcPort := getCTTpDstValue(matchers)
if dstSvcAddress.IsValid() {
denyConn.DestinationServiceAddress = dstSvcAddress
nwDstValue := getCTNwDstValue(matchers)
dstPortValue := getCTTpDstValue(matchers)
if nwDstValue.IsValid() {
denyConn.OriginalDestinationAddress = nwDstValue
}
if dstSvcPort != 0 {
denyConn.DestinationServicePort = dstSvcPort
if dstPortValue != 0 {
denyConn.OriginalDestinationPort = dstPortValue
}

// No need to obtain connection info again if it already exists in denyConnectionStore.
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
return
}
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
clusterIP := conn.OriginalDestinationAddress.String()
svcPort := conn.OriginalDestinationPort
protocol, err := lookupServiceProtocol(conn.FlowKey.Protocol)
if err != nil {
klog.InfoS("Could not retrieve Service protocol", "error", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,18 @@ func getNewConn() *flowexporter.Connection {
}
flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomNum1), DestinationPort: uint16(randomNum2)}
return &flowexporter.Connection{
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
ReadyToDelete: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
ReversePackets: 5,
ReverseBytes: 50,
DestinationServiceAddress: svc,
DestinationServicePort: 30000,
TCPState: "SYN_SENT",
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
ReadyToDelete: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
ReversePackets: 5,
ReverseBytes: 50,
OriginalDestinationAddress: svc,
OriginalDestinationPort: 30000,
TCPState: "SYN_SENT",
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func testAddNewConn(mockPodStore *podstoretest.MockInterface, mockProxier *proxy
mockPodStore.EXPECT().GetPodByIPAndTime(conn.FlowKey.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

protocol, _ := lookupServiceProtocol(conn.FlowKey.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress.String(), conn.DestinationServicePort, protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", conn.OriginalDestinationAddress.String(), conn.OriginalDestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)

ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
Expand Down
22 changes: 11 additions & 11 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,17 @@ func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio
SourcePort: conn.TupleOrig.Proto.SourcePort,
DestinationPort: conn.TupleReply.Proto.SourcePort,
},
DestinationServiceAddress: conn.TupleOrig.IP.DestinationAddress,
DestinationServicePort: conn.TupleOrig.Proto.DestinationPort,
OriginalPackets: conn.CountersOrig.Packets,
OriginalBytes: conn.CountersOrig.Bytes,
ReversePackets: conn.CountersReply.Packets,
ReverseBytes: conn.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
OriginalDestinationAddress: conn.TupleOrig.IP.DestinationAddress,
OriginalDestinationPort: conn.TupleOrig.Proto.DestinationPort,
OriginalPackets: conn.CountersOrig.Packets,
OriginalBytes: conn.CountersOrig.Bytes,
ReversePackets: conn.CountersReply.Packets,
ReverseBytes: conn.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
}
if conn.ProtoInfo.TCP != nil {
newConn.TCPState = stateToString(conn.ProtoInfo.TCP.State)
Expand Down
98 changes: 49 additions & 49 deletions pkg/agent/flowexporter/connections/conntrack_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,18 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
SourcePort: uint16(41284),
DestinationPort: uint16(6443),
},
DestinationServiceAddress: netip.MustParseAddr("100.50.25.1"),
DestinationServicePort: uint16(443),
OriginalPackets: 343260,
OriginalBytes: 19340621,
ReversePackets: 381035,
ReverseBytes: 181176472,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "ESTABLISHED",
Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0},
OriginalDestinationAddress: netip.MustParseAddr("100.50.25.1"),
OriginalDestinationPort: uint16(443),
OriginalPackets: 343260,
OriginalBytes: 19340621,
ReversePackets: 381035,
ReverseBytes: 181176472,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "ESTABLISHED",
Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0},
}
mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/dump-conntrack", false, "-m", "-s").Return(ovsctlCmdOutput, nil)

Expand Down Expand Up @@ -229,24 +229,24 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) {
DestinationPort: conntrackFlowTupleReply.Proto.SourcePort,
}
expectedAntreaFlow := &flowexporter.Connection{
Timeout: netlinkFlow.Timeout,
StartTime: netlinkFlow.Timestamp.Start,
IsPresent: true,
Zone: 2,
StatusFlag: 0x4,
Mark: 0x1234,
FlowKey: tuple,
DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress,
DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort,
OriginalPackets: netlinkFlow.CountersOrig.Packets,
OriginalBytes: netlinkFlow.CountersOrig.Bytes,
ReversePackets: netlinkFlow.CountersReply.Packets,
ReverseBytes: netlinkFlow.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
Timeout: netlinkFlow.Timeout,
StartTime: netlinkFlow.Timestamp.Start,
IsPresent: true,
Zone: 2,
StatusFlag: 0x4,
Mark: 0x1234,
FlowKey: tuple,
OriginalDestinationAddress: conntrackFlowTuple.IP.DestinationAddress,
OriginalDestinationPort: conntrackFlowTuple.Proto.DestinationPort,
OriginalPackets: netlinkFlow.CountersOrig.Packets,
OriginalBytes: netlinkFlow.CountersOrig.Bytes,
ReversePackets: netlinkFlow.CountersReply.Packets,
ReverseBytes: netlinkFlow.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
}

antreaFlow := NetlinkFlowToAntreaConnection(netlinkFlow)
Expand All @@ -265,25 +265,25 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) {
},
}
expectedAntreaFlow = &flowexporter.Connection{
Timeout: netlinkFlow.Timeout,
StartTime: netlinkFlow.Timestamp.Start,
StopTime: netlinkFlow.Timestamp.Stop,
IsPresent: true,
Zone: 2,
StatusFlag: 0x204,
Mark: 0x1234,
FlowKey: tuple,
DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress,
DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort,
OriginalPackets: netlinkFlow.CountersOrig.Packets,
OriginalBytes: netlinkFlow.CountersOrig.Bytes,
ReversePackets: netlinkFlow.CountersReply.Packets,
ReverseBytes: netlinkFlow.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
Timeout: netlinkFlow.Timeout,
StartTime: netlinkFlow.Timestamp.Start,
StopTime: netlinkFlow.Timestamp.Stop,
IsPresent: true,
Zone: 2,
StatusFlag: 0x204,
Mark: 0x1234,
FlowKey: tuple,
OriginalDestinationAddress: conntrackFlowTuple.IP.DestinationAddress,
OriginalDestinationPort: conntrackFlowTuple.Proto.DestinationPort,
OriginalPackets: netlinkFlow.CountersOrig.Packets,
OriginalBytes: netlinkFlow.CountersOrig.Bytes,
ReversePackets: netlinkFlow.CountersReply.Packets,
ReverseBytes: netlinkFlow.CountersReply.Bytes,
SourcePodNamespace: "",
SourcePodName: "",
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "",
}

antreaFlow = NetlinkFlowToAntreaConnection(netlinkFlow)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
if !isReply {
svcAddr, err := netip.ParseAddr(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("parsing destination service address failed: %w", err)
return nil, fmt.Errorf("parsing original destination address failed: %w", err)
}
conn.DestinationServiceAddress = svcAddr
conn.OriginalDestinationAddress = svcAddr
}
case strings.Contains(fs, "sport"):
fields := strings.Split(fs, "=")
Expand All @@ -189,7 +189,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
return nil, fmt.Errorf("conversion of dport %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.DestinationServicePort = uint16(val)
conn.OriginalDestinationPort = uint16(val)
}
case strings.Contains(fs, "packets"):
fields := strings.Split(fs, "=")
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
return
}
protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr)
serviceStr := fmt.Sprintf("%s:%d/%s", conn.OriginalDestinationAddress, conn.OriginalDestinationPort, protocolStr)
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
ds.fillServiceInfo(conn, serviceStr)
}
Expand Down
36 changes: 18 additions & 18 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,29 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
{
name: "Flow not through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 0,
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
OriginalDestinationAddress: tuple.DestinationAddress,
OriginalDestinationPort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 0,
},
isSvc: false,
}, {
name: "Flow through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: openflow.ServiceCTMark.GetValue(),
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
OriginalDestinationAddress: tuple.DestinationAddress,
OriginalDestinationPort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: openflow.ServiceCTMark.GetValue(),
},
isSvc: true,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error {
}
case "destinationClusterIPv4":
if conn.DestinationServicePortName != "" {
ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice())
ie.SetIPAddressValue(conn.OriginalDestinationAddress.AsSlice())
} else {
// Sending dummy IP as IPFIX collector expects constant length of data for IP field.
// We should probably think of better approach as this involves customization of IPFIX collector to ignore
Expand All @@ -530,14 +530,14 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error {
}
case "destinationClusterIPv6":
if conn.DestinationServicePortName != "" {
ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice())
ie.SetIPAddressValue(conn.OriginalDestinationAddress.AsSlice())
} else {
// Same as destinationClusterIPv4.
ie.SetIPAddressValue(net.ParseIP("::"))
}
case "destinationServicePort":
if conn.DestinationServicePortName != "" {
ie.SetUnsigned16Value(conn.DestinationServicePort)
ie.SetUnsigned16Value(conn.OriginalDestinationPort)
} else {
ie.SetUnsigned16Value(uint16(0))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue
DestinationPodNamespace: "ns2",
DestinationPodName: "pod2",
DestinationServicePortName: "service",
DestinationServiceAddress: svc,
OriginalDestinationAddress: svc,
TCPState: "SYN_SENT",
}
connKey := flowexporter.NewConnectionKey(conn)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type Connection struct {
DestinationPodNamespace string
DestinationPodName string
DestinationServicePortName string
DestinationServiceAddress netip.Addr
DestinationServicePort uint16
OriginalDestinationAddress netip.Addr
OriginalDestinationPort uint16
IngressNetworkPolicyName string
IngressNetworkPolicyNamespace string
IngressNetworkPolicyType uint8
Expand Down

0 comments on commit dce23d6

Please sign in to comment.