Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundant log in fillPodInfo/fillServiceInfo and update deny connection store. #5592

Merged
merged 6 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package networkpolicy
import (
"errors"
"fmt"
"net"
"net/netip"
"time"

Expand Down Expand Up @@ -107,6 +108,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
if err != nil {
return fmt.Errorf("error in parsing packetIn: %v", err)
}
matchers := pktIn.GetMatches()

// Get 5-tuple information
sourceAddr, _ := netip.AddrFromSlice(packet.SourceIP)
Expand All @@ -124,14 +126,22 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
denyConn.FlowKey = tuple
denyConn.DestinationServiceAddress = tuple.DestinationAddress
denyConn.DestinationServicePort = tuple.DestinationPort
denyConn.Mark = getCTMarkValue(matchers)
dstSvcAddress := getCTNwDstValue(matchers)
dstSvcPort := getCTTpDstValue(matchers)
if dstSvcAddress.IsValid() {
denyConn.DestinationServiceAddress = dstSvcAddress
}
if dstSvcPort != 0 {
denyConn.DestinationServicePort = dstSvcPort
}

// No need to obtain connection info again if it already exists in denyConnectionStore.
if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist {
c.denyConnStore.AddOrUpdateConn(conn, time.Now(), uint64(packet.IPLength))
return nil
}

matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table ID
tableID := getPacketInTableID(pktIn)
Expand Down Expand Up @@ -223,3 +233,47 @@ func getPacketInTableID(pktIn *ofctrl.PacketIn) uint8 {
}
return tableID
}

func getCTMarkValue(matchers *ofctrl.Matchers) uint32 {
ctMark := matchers.GetMatchByName("NXM_NX_CT_MARK")
if ctMark == nil {
return 0
}
ctMarkValue, ok := ctMark.GetValue().(uint32)
if !ok {
return 0
}
return ctMarkValue
}

func getCTNwDstValue(matchers *ofctrl.Matchers) netip.Addr {
nwDst := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if nwDst != nil {
if nwDstValue, ok := nwDst.GetValue().(net.IP); ok {
if ip, ok := netip.AddrFromSlice(nwDstValue.To4()); ok {
return ip
}
}
}
nwDst = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST")
if nwDst != nil {
if nwDstValue, ok := nwDst.GetValue().(net.IP); ok {
if ip, ok := netip.AddrFromSlice(nwDstValue.To16()); ok {
return ip
}
}
}
return netip.Addr{}
}

func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 {
port := matchers.GetMatchByName("NXM_NX_CT_TP_DST")
if port == nil {
return 0
}
portValue, ok := port.GetValue().(uint16)
if !ok {
return 0
}
return portValue
}
9 changes: 3 additions & 6 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {

srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime)
dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime)
if !srcFound && !dstFound {
klog.InfoS("Cannot map any of the connection IPs to a local Pod", "srcIP", srcIP, "dstIP", dstIP)
}
if srcFound {
conn.SourcePodName = srcPod.Name
conn.SourcePodNamespace = srcPod.Namespace
Expand All @@ -125,10 +122,10 @@ func (cs *connectionStore) fillServiceInfo(conn *flowexporter.Connection, servic
// resolve destination Service information
if cs.antreaProxier != nil {
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
if exists {
conn.DestinationServicePortName = servicePortName.String()
} else {
klog.InfoS("Could not retrieve the Service info from antrea-agent-proxier", "serviceStr", serviceStr)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
} else {
cs.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/proxy"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/podstore"
Expand Down Expand Up @@ -96,9 +97,17 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr)
ds.fillServiceInfo(conn, serviceStr)
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
ds.fillServiceInfo(conn, serviceStr)
}
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
Expand Down
113 changes: 75 additions & 38 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/types"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
podstoretest "antrea.io/antrea/pkg/util/podstore/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
Expand All @@ -44,46 +46,81 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
Port: "255",
Protocol: v1.ProtocolTCP,
}
// flow for testing adding and updating
testFlow := flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
tc := []struct {
name string
// flow for testing adding and updating
testFlow flowexporter.Connection
isSvc bool
}{
{
name: "Flow not through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 0,
},
isSvc: false,
}, {
name: "Flow through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: openflow.ServiceCTMark.GetValue(),
},
isSvc: true,
},
}
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false)
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
// Reset the metrics.
metrics.TotalDenyConnections.Set(0)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
if c.isSvc {
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
}
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)
denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := testFlow
expConn.DestinationServicePortName = servicePortName.String()
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.True(t, ok, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := c.testFlow
if c.isSvc {
expConn.DestinationServicePortName = servicePortName.String()
}
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
})
}
}
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ func getDenyConnection(isIPv6 bool, protoID uint8) *flowexporter.Connection {
tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255}
}
conn := &flowexporter.Connection{
FlowKey: tuple,
FlowKey: tuple,
SourcePodName: "pod",
}
return conn
}
Expand Down
2 changes: 0 additions & 2 deletions test/e2e/bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
)

const iperfPort = 5201

// TestBandwidth is the top-level test which contains all subtests for
// Bandwidth related test cases so they can share setup, teardown.
func TestBandwidth(t *testing.T) {
Expand Down
Loading
Loading