Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Oct 31, 2023
1 parent 31dbaad commit bbddf99
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 118 deletions.
36 changes: 29 additions & 7 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,14 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
denyConn.DestinationServiceAddress = tuple.DestinationAddress
denyConn.DestinationServicePort = tuple.DestinationPort
denyConn.Mark = getCTMarkValue(matchers)
dstSvcAddress := getSvcAddress(matchers)
dstSvcAddress := getCTNwDstValue(matchers)
dstSvcPort := getCTTpDstValue(matchers)
if dstSvcAddress != nil {
denyConn.DestinationServiceAddress = *dstSvcAddress
}
if dstSvcPort != 0 {
denyConn.DestinationServicePort = dstSvcPort
}

// No need to obtain connection info again if it already exists in denyConnectionStore.
if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist {
Expand Down Expand Up @@ -242,14 +246,32 @@ func getCTMarkValue(matchers *ofctrl.Matchers) uint32 {
return ctMarkValue
}

func getSvcAddress(matchers *ofctrl.Matchers) *net.IP {
func getCTNwDstValue(matchers *ofctrl.Matchers) *net.IP {
svcIp := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if svcIp == nil {
return nil
if svcIp != nil {
svcIpValue, ok := svcIp.GetValue().(net.IP)
if ok {
return &svcIpValue
}
}
svcIp = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST")
if svcIp != nil {
svcIpValue, ok := svcIp.GetValue().(net.IP)
if ok {
return &svcIpValue
}
}
return nil
}

func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 {
port := matchers.GetMatchByName("NXM_NX_CT_TP_DST")
if port == nil {
return 0
}
svcIpValue, ok := svcIp.GetValue().(net.IP)
portValue, ok := port.GetValue().(uint16)
if !ok {
return nil
return 0
}
return &svcIpValue
return portValue
}
5 changes: 5 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
} else {
cs.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
return
}
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
return
}
protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr)
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
podstoretest "antrea.io/antrea/pkg/util/podstore/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 19,
Mark: openflow.ServiceCTMark.GetValue(),
},
isSvc: true,
},
Expand All @@ -96,8 +97,8 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
if c.isSvc {
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
}
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)

Expand Down
1 change: 1 addition & 0 deletions test/e2e/bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

const iperfPort = 5201
const iperfSvcPort = 9999

// TestBandwidth is the top-level test which contains all subtests for
// Bandwidth related test cases so they can share setup, teardown.
Expand Down
Loading

0 comments on commit bbddf99

Please sign in to comment.