Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5592: Remove redundant log in fillPodInfo/fillServiceInfo #4252: e2e framework for Antrea's native secondary network #5731: Enhance Records Filtering: Utilizing Labels In The E2E Test #5770: Improve flow-visibility e2e test #5881

Closed
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.1-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \
FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
Expand Down
36 changes: 18 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/ti-mo/conntrack v0.4.0
github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6
github.com/vmware/go-ipfix v0.7.0
go.uber.org/mock v0.3.0
golang.org/x/crypto v0.14.0
golang.org/x/mod v0.13.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
golang.org/x/time v0.3.0
golang.org/x/tools v0.14.0
github.com/ti-mo/conntrack v0.5.0
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vmware/go-ipfix v0.8.2
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.17.0
golang.org/x/mod v0.14.0
golang.org/x/net v0.19.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.16.1
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -156,7 +156,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v1.0.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/kr/fs v0.1.0 // indirect
Expand All @@ -167,8 +167,8 @@ require (
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mdlayher/genetlink v1.0.0 // indirect
github.com/mdlayher/netlink v1.4.0 // indirect
github.com/mdlayher/socket v0.2.1 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
github.com/mdlayher/socket v0.4.1 // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect
Expand All @@ -195,8 +195,8 @@ require (
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/ti-mo/netfilter v0.3.1 // indirect
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
github.com/ti-mo/netfilter v0.5.0 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
Expand All @@ -218,8 +218,8 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.zx2c4.com/wireguard v0.0.0-20210427022245-097af6e1351b // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
73 changes: 36 additions & 37 deletions go.sum

Large diffs are not rendered by default.

56 changes: 55 additions & 1 deletion pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"errors"
"fmt"
"net"
"net/netip"
"time"

"antrea.io/libOpenflow/openflow15"
Expand Down Expand Up @@ -107,6 +108,7 @@
if err != nil {
return fmt.Errorf("error in parsing packetIn: %v", err)
}
matchers := pktIn.GetMatches()

// Get 5-tuple information
tuple := flowexporter.Tuple{
Expand All @@ -125,14 +127,22 @@
denyConn.FlowKey = tuple
denyConn.DestinationServiceAddress = tuple.DestinationAddress
denyConn.DestinationServicePort = tuple.DestinationPort
denyConn.Mark = getCTMarkValue(matchers)
dstSvcAddress := getCTNwDstValue(matchers)
dstSvcPort := getCTTpDstValue(matchers)
if dstSvcAddress.IsValid() {
denyConn.DestinationServiceAddress = dstSvcAddress

Check failure on line 134 in pkg/agent/controller/networkpolicy/packetin.go

View workflow job for this annotation

GitHub Actions / Build Antrea Windows binaries

cannot use dstSvcAddress (variable of type netip.Addr) as "net".IP value in assignment

Check failure on line 134 in pkg/agent/controller/networkpolicy/packetin.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (ubuntu-latest)

cannot use dstSvcAddress (variable of type netip.Addr) as "net".IP value in assignment (typecheck)

Check failure on line 134 in pkg/agent/controller/networkpolicy/packetin.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (macos-latest)

cannot use dstSvcAddress (variable of type netip.Addr) as "net".IP value in assignment (typecheck)

Check failure on line 134 in pkg/agent/controller/networkpolicy/packetin.go

View workflow job for this annotation

GitHub Actions / Unit test (windows-2022)

cannot use dstSvcAddress (variable of type netip.Addr) as "net".IP value in assignment
}
if dstSvcPort != 0 {
denyConn.DestinationServicePort = dstSvcPort
}

// No need to obtain connection info again if it already exists in denyConnectionStore.
if conn, exist := c.denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&denyConn)); exist {
c.denyConnStore.AddOrUpdateConn(conn, time.Now(), uint64(packet.IPLength))
return nil
}

matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table ID
tableID := getPacketInTableID(pktIn)
Expand Down Expand Up @@ -224,3 +234,47 @@
}
return tableID
}

func getCTMarkValue(matchers *ofctrl.Matchers) uint32 {
ctMark := matchers.GetMatchByName("NXM_NX_CT_MARK")
if ctMark == nil {
return 0
}
ctMarkValue, ok := ctMark.GetValue().(uint32)
if !ok {
return 0
}
return ctMarkValue
}

func getCTNwDstValue(matchers *ofctrl.Matchers) netip.Addr {
nwDst := matchers.GetMatchByName("NXM_NX_CT_NW_DST")
if nwDst != nil {
if nwDstValue, ok := nwDst.GetValue().(net.IP); ok {
if ip, ok := netip.AddrFromSlice(nwDstValue.To4()); ok {
return ip
}
}
}
nwDst = matchers.GetMatchByName("NXM_NX_CT_IPV6_DST")
if nwDst != nil {
if nwDstValue, ok := nwDst.GetValue().(net.IP); ok {
if ip, ok := netip.AddrFromSlice(nwDstValue.To16()); ok {
return ip
}
}
}
return netip.Addr{}
}

func getCTTpDstValue(matchers *ofctrl.Matchers) uint16 {
port := matchers.GetMatchByName("NXM_NX_CT_TP_DST")
if port == nil {
return 0
}
portValue, ok := port.GetValue().(uint16)
if !ok {
return 0
}
return portValue
}
9 changes: 3 additions & 6 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {

srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime)
dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime)
if !srcFound && !dstFound {
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP)
}
if srcFound {
conn.SourcePodName = srcPod.Name
conn.SourcePodNamespace = srcPod.Namespace
Expand All @@ -125,10 +122,10 @@ func (cs *connectionStore) fillServiceInfo(conn *flowexporter.Connection, servic
// resolve destination Service information
if cs.antreaProxier != nil {
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
if exists {
conn.DestinationServicePortName = servicePortName.String()
} else {
klog.InfoS("Could not retrieve the Service info from antrea-agent-proxier", "serviceStr", serviceStr)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
klog.V(4).InfoS("Antrea flow updated", "connection", existingConn)
} else {
cs.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
clusterIP := conn.DestinationServiceAddress.String()
svcPort := conn.DestinationServicePort
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/priorityqueue"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/proxy"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/podstore"
Expand Down Expand Up @@ -96,9 +97,17 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
if conn.SourcePodName == "" && conn.DestinationPodName == "" {
// We don't add connections to connection map or expirePriorityQueue if we can't find the pod
// information for both srcPod and dstPod
klog.V(5).InfoS("Skip this connection as we cannot map any of the connection IPs to a local Pod", "srcIP", conn.FlowKey.SourceAddress.String(), "dstIP", conn.FlowKey.DestinationAddress.String())
return
}
protocolStr := ip.IPProtocolNumberToString(conn.FlowKey.Protocol, "UnknownProtocol")
serviceStr := fmt.Sprintf("%s:%d/%s", conn.DestinationServiceAddress, conn.DestinationServicePort, protocolStr)
ds.fillServiceInfo(conn, serviceStr)
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() {
ds.fillServiceInfo(conn, serviceStr)
}
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
Expand Down
112 changes: 74 additions & 38 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
podstoretest "antrea.io/antrea/pkg/util/podstore/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
Expand All @@ -49,48 +50,83 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
Port: "255",
Protocol: v1.ProtocolTCP,
}
// flow for testing adding and updating
testFlow := flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
tc := []struct {
name string
// flow for testing adding and updating
testFlow flowexporter.Connection
isSvc bool
}{
{
name: "Flow not through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: 0,
},
isSvc: false,
}, {
name: "Flow through service",
testFlow: flowexporter.Connection{
StopTime: refTime.Add(-(time.Second * 20)),
StartTime: refTime.Add(-(time.Second * 20)),
FlowKey: tuple,
DestinationServiceAddress: tuple.DestinationAddress,
DestinationServicePort: tuple.DestinationPort,
OriginalBytes: uint64(60),
OriginalPackets: uint64(1),
IsActive: true,
Mark: openflow.ServiceCTMark.GetValue(),
},
isSvc: true,
},
}
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(nil, false)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(nil, false)
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
// Reset the metrics.
metrics.TotalDenyConnections.Set(0)
mockPodStore := podstoretest.NewMockInterface(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
protocol, _ := lookupServiceProtocol(tuple.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", tuple.DestinationAddress.String(), tuple.DestinationPort, protocol)
if c.isSvc {
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
}
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.SourceAddress.String(), gomock.Any()).Return(pod1, true)
mockPodStore.EXPECT().GetPodByIPAndTime(tuple.DestinationAddress.String(), gomock.Any()).Return(pod1, true)

denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)
denyConnStore := NewDenyConnectionStore(mockPodStore, mockProxier, testFlowExporterOptions)

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := testFlow
expConn.DestinationServicePortName = servicePortName.String()
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 20)), uint64(60))
expConn := c.testFlow
if c.isSvc {
expConn.DestinationServicePortName = servicePortName.String()
}
actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
denyConnStore.AddOrUpdateConn(&c.testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
expConn.OriginalBytes = uint64(120)
expConn.OriginalPackets = uint64(2)
expConn.StopTime = refTime.Add(-(time.Second * 10))
actualConn, ok = denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&c.testFlow))
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
})
}
}

func checkDenyConnectionMetrics(t *testing.T, numConns int) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) {
func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
var expireTime1, expireTime2 time.Duration
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// We export records from denyConnStore first, then conntrackConnStore. We enforce the ordering to handle a
// special case: for an inter-node connection with egress drop network policy, both conntrackConnStore and
// denyConnStore from the same Node will send out records to Flow Aggregator. If the record from conntrackConnStore
// arrives FA first, FA will not be able to capture the deny network policy metadata, and it will keep waiting
// for a record from destination Node to finish flow correlation until timeout. Later on we probably should
// consider doing a record deduplication between conntrackConnStore and denyConnStore before exporting records.
exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter time out among two connection stores to do the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range exp.expiredConns {
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ func getDenyConnection(isIPv6 bool, protoID uint8) *flowexporter.Connection {
tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255}
}
conn := &flowexporter.Connection{
FlowKey: tuple,
FlowKey: tuple,
SourcePodName: "pod",
}
return conn
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/antctl/transform/common/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func Int64ToString(val int64) string {
return strconv.Itoa(int(val))
}

func BoolToString(val bool) string {
return strconv.FormatBool(val)
}

func GenerateTableElementWithSummary(list []string, maxColumnLength int) string {
element := ""
sort.Strings(list)
Expand Down
Loading
Loading