Skip to content

Commit

Permalink
Remove redundant log in fillPodInfo/fillServiceInfo and update deny c…
Browse files Browse the repository at this point in the history
…onnection store. (#5592)

1. Remove redundant logs in fillPodInfo/fillServiceInfo to avoid Flow
   Exporter from flooding logs.
2. Add Mark field for deny connections. We were missing this field
   previously, resulting in trying to fill service information for
   non-service IPs.
3. Update the DestinationServiceAddress for deny connections when we can
   find this information in PacketIn.
4. Update e2e/unit tests to verify that the Service-relateds field are
   filled correctly.

Fixes #5573

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu authored and antoninbas committed Jan 23, 2024
1 parent 2cad573 commit 7b3ef5c
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 160 deletions.
53 changes: 52 additions & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
if err != nil {
return fmt.Errorf("error in parsing packetIn: %v", err)
}
matchers := pktIn.GetMatches()

// Get 5-tuple information
tuple := flowexporter.Tuple{
Expand All @@ -125,14 +126,22 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
denyConn.FlowKey = tuple
denyConn.DestinationServiceAddress = tuple.DestinationAddress
denyConn.DestinationServicePort = tuple.DestinationPort
denyConn.Mark = getCTMarkValue(matchers)
dstSvcAddress := getCTNwDstValue(matchers)
dstSvcPort := getCTTpDstValue(matchers)
if dstSvcAddress != nil {
denyConn.DestinationServiceAddress = dstSvcAddress
}
if dstSvcPort != 0 {
denyConn.DestinationServicePort = dstSvcPort
}

// No need to obtain connection info again if it already exists in denyConnectionStore.
if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist {
c.denyConnStore.AddOrUpdateConn(conn, time.Now(), uint64(packet.IPLength))
return nil
}

matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table ID
tableID := getPacketInTableID(pktIn)
Expand Down Expand Up @@ -224,3 +233,45 @@ func getPacketInTableID(pktIn *ofctrl.PacketIn) uint8 {
}
return tableID
}

func getCTMarkValue(matchers *ofctrl.Matchers) uint32 {
ctMark := matchers.GetMatchByName("NXM_NX_CT_MARK")
if ctMark == nil {
return 0
}
ctMarkValue, ok := ctMark.GetValue().(uint32)
if !ok {
return 0
}
return ctMarkValue
}

func getCTNwDstValue(matchers *ofctrl.Matchers) net.IP {
nwDst := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if nwDst != nil {
nwDstValue, ok := nwDst.GetValue().(net.IP)
if ok {
return nwDstValue
}
}
nwDst = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST")
if nwDst != nil {
nwDstValue, ok := nwDst.GetValue().(net.IP)
if ok {
return nwDstValue
}
}
return nil
}

func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 {
port := matchers.GetMatchByName("NXM_NX_CT_TP_DST")
if port == nil {
return 0
}
portValue, ok := port.GetValue().(uint16)
if !ok {
return 0
}
return portValue
}
9 changes: 3 additions & 6 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {

srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime)
dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime)
if !srcFound && !dstFound {
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP)
}
if srcFound {
conn.SourcePodName = srcPod.Name
conn.SourcePodNamespace = srcPod.Namespace
Expand All @@ -125,10 +122,10 @@ func (cs *connectionStore) fillServiceInfo(conn *flowexporter.Connection, servic
// resolve destination Service information
if cs.antreaProxier != nil {
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
if exists {
conn.DestinationServicePortName = servicePortName.String()
} else {
klog.InfoS("Could not retrieve the Service info from antrea-agent-proxier", "serviceStr", serviceStr)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
} else {
cs.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/proxy"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/podstore"
Expand Down Expand Up @@ -96,9 +97,17 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr)
ds.fillServiceInfo(conn, serviceStr)
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
ds.fillServiceInfo(conn, serviceStr)
}
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
Expand Down
112 changes: 74 additions & 38 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
podstoretest "antrea.io/antrea/pkg/util/podstore/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
Expand All @@ -49,48 +50,83 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
Port: "255",
Protocol: v1.ProtocolTCP,
}
// flow for testing adding and updating
testFlow := flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
tc := []struct {
name string
// flow for testing adding and updating
testFlow flowexporter.Connection
isSvc bool
}{
{
name: "Flow not through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 0,
},
isSvc: false,
}, {
name: "Flow through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: openflow.ServiceCTMark.GetValue(),
},
isSvc: true,
},
}
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false)
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
// Reset the metrics.
metrics.TotalDenyConnections.Set(0)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
if c.isSvc {
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
}
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)
denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := testFlow
expConn.DestinationServicePortName = servicePortName.String()
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := c.testFlow
if c.isSvc {
expConn.DestinationServicePortName = servicePortName.String()
}
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
})
}
}

func checkDenyConnectionMetrics(t *testing.T, numConns int) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ func getDenyConnection(isIPv6 bool, protoID uint8) *flowexporter.Connection {
tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255}
}
conn := &flowexporter.Connection{
FlowKey: tuple,
FlowKey: tuple,
SourcePodName: "pod",
}
return conn
}
Expand Down
2 changes: 0 additions & 2 deletions test/e2e/bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
)

const iperfPort = 5201

// TestBandwidth is the top-level test which contains all subtests for
// Bandwidth related test cases so they can share setup, teardown.
func TestBandwidth(t *testing.T) {
Expand Down
Loading

0 comments on commit 7b3ef5c

Please sign in to comment.