Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "egressNodeName" in flow records #6012

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.3-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.9.0" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/ti-mo/conntrack v0.5.0
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vmware/go-ipfix v0.8.2
github.com/vmware/go-ipfix v0.9.0
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.19.0
golang.org/x/mod v0.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,8 @@ github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhg
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA=
github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0=
github.com/vmware/go-ipfix v0.9.0 h1:4/N5eFliqULEaCUQV0lafOpN/1bItPE9OTAPGhrIXus=
github.com/vmware/go-ipfix v0.9.0/go.mod h1:MYEdL6Uel2ufOZyVCKvIAaw9hwnewK8aPr7rnwRbxMY=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
"egressIP",
"appProtocolName",
"httpVals",
"egressNodeName",
}
AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand Down Expand Up @@ -595,6 +596,8 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error {
ie.SetStringValue(conn.AppProtocolName)
case "httpVals":
ie.SetStringValue(conn.HttpVals)
case "egressNodeName":
ie.SetStringValue(conn.EgressNodeName)
}
}
err := exp.ipfixSet.AddRecord(eL, templateID)
Expand Down Expand Up @@ -641,14 +644,15 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 {
}

func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) {
egressName, egressIP, _, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName)
egressName, egressIP, egressNodeName, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName)
if err != nil {
// Egress is not enabled or no Egress is applied to this Pod
return
}
conn.EgressName = egressName
conn.EgressIP = egressIP
klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "SourcePodNamespace", conn.SourcePodNamespace, "SourcePodName", conn.SourcePodName)
conn.EgressNodeName = egressNodeName
klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName))
}

func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error {
Expand Down
38 changes: 21 additions & 17 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,26 +806,29 @@ func TestFlowExporter_findFlowType(t *testing.T) {
func TestFlowExporter_fillEgressInfo(t *testing.T) {
ctrl := gomock.NewController(t)
testCases := []struct {
name string
sourcePodNamespace string
sourcePodName string
expectedEgressName string
expectedEgressIP string
expectedErr string
name string
sourcePodNamespace string
sourcePodName string
expectedEgressName string
expectedEgressIP string
expectedEgressNodeName string
expectedErr string
}{
{
name: "Both EgressName and EgressIP filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podA",
expectedEgressName: "test-egress",
expectedEgressIP: "172.18.0.1",
name: "EgressName, EgressIP and EgressNodeName filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podA",
expectedEgressName: "test-egress",
expectedEgressIP: "172.18.0.1",
expectedEgressNodeName: "test-egress-node",
},
{
name: "No Egress Information filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podC",
expectedEgressName: "",
expectedEgressIP: "",
name: "No Egress Information filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podC",
expectedEgressName: "",
expectedEgressIP: "",
expectedEgressNodeName: "",
},
}

Expand All @@ -841,13 +844,14 @@ func TestFlowExporter_fillEgressInfo(t *testing.T) {
SourcePodName: tc.sourcePodName,
}
if tc.expectedEgressName != "" {
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, "", nil)
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, tc.expectedEgressNodeName, nil)
} else {
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return("", "", "", fmt.Errorf("no Egress applied to Pod %s", conn.SourcePodName))
}
exp.fillEgressInfo(&conn)
assert.Equal(t, tc.expectedEgressName, conn.EgressName)
assert.Equal(t, tc.expectedEgressIP, conn.EgressIP)
assert.Equal(t, tc.expectedEgressNodeName, conn.EgressNodeName)
})
}
}
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Connection struct {
EgressIP string
AppProtocolName string
HttpVals string
EgressNodeName string
}

type ItemToExpire struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const (
egressName,
egressIP,
appProtocolName,
httpVals)
httpVals,
egressNodeName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?)`
?, ?, ?, ?, ?)`
)

// PrepareClickHouseConnection is used for unit testing
Expand Down Expand Up @@ -334,6 +335,7 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err
record.EgressIP,
record.AppProtocolName,
record.HttpVals,
record.EgressNodeName,
)

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func TestBatchCommitAll(t *testing.T) {
"test-egress",
"172.18.0.1",
"http",
"mockHttpString").
"mockHttpString",
"test-egress-node").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

Expand Down
1 change: 1 addition & 0 deletions pkg/flowaggregator/flowlogger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (fl *FlowLogger) WriteRecord(r *flowrecord.FlowRecord, prettyPrint bool) er
r.EgressIP,
r.AppProtocolName,
r.HttpVals,
r.EgressNodeName,
}

str := strings.Join(fields, ",")
Expand Down
4 changes: 2 additions & 2 deletions pkg/flowaggregator/flowlogger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func TestWriteRecord(t *testing.T) {
}{
{
prettyPrint: true,
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString",
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString,test-egress-node",
},
{
prettyPrint: false,
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString",
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString,test-egress-node",
},
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/flowaggregator/flowrecord/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type FlowRecord struct {
EgressIP string
AppProtocolName string
HttpVals string
EgressNodeName string
}

// GetFlowRecord converts ipfixentities.Record to FlowRecord
Expand Down Expand Up @@ -236,6 +237,9 @@ func GetFlowRecord(record ipfixentities.Record) *FlowRecord {
if httpVals, _, ok := record.GetInfoElementWithValue("httpVals"); ok {
r.HttpVals = httpVals.GetStringValue()
}
if egressNodeName, _, ok := record.GetInfoElementWithValue("egressNodeName"); ok {
r.EgressNodeName = egressNodeName.GetStringValue()
}
return r
}

Expand Down
1 change: 1 addition & 0 deletions pkg/flowaggregator/flowrecord/testing/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ func PrepareTestFlowRecord() *flowrecord.FlowRecord {
EgressIP: "172.18.0.1",
AppProtocolName: "http",
HttpVals: "mockHttpString",
EgressNodeName: "test-egress-node",
}
}
1 change: 1 addition & 0 deletions pkg/flowaggregator/infoelements/elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
"egressIP",
"appProtocolName",
"httpVals",
"egressNodeName",
}
AntreaInfoElementsIPv4 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
AntreaInfoElementsIPv6 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,6 @@ func writeRecord(w io.Writer, r *flowrecord.FlowRecord, clusterUUID string) {
io.WriteString(w, r.AppProtocolName)
io.WriteString(w, ",")
io.WriteString(w, r.HttpVals)
io.WriteString(w, ",")
io.WriteString(w, r.EgressNodeName)
}
4 changes: 2 additions & 2 deletions pkg/flowaggregator/s3uploader/s3uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (

var (
fakeClusterUUID = uuid.New().String()
recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString"
recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString"
recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node"
recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node"
)

const seed = 1
Expand Down
4 changes: 4 additions & 0 deletions pkg/flowaggregator/testing/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func PrepareMockIpfixRecord(mockRecord *ipfixentitiestesting.MockRecord, isIPv4
httpValsElem.SetStringValue("mockHttpString")
mockRecord.EXPECT().GetInfoElementWithValue("httpVals").Return(httpValsElem, 0, true)

egressNodeNameElem := createElement("egressNodeName", ipfixregistry.AntreaEnterpriseID)
egressNodeNameElem.SetStringValue("test-egress-node")
mockRecord.EXPECT().GetInfoElementWithValue("egressNodeName").Return(egressNodeNameElem, 0, true)

if isIPv4 {
sourceIPv4Elem := createElement("sourceIPv4Address", ipfixregistry.IANAEnterpriseID)
sourceIPv4Elem.SetIPAddressValue(net.ParseIP("10.10.0.79"))
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/charts/flow-visibility/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ data:
egressName String,
egressIP String,
appProtocolName String,
httpVals String
httpVals String,
egressNodeName String
) engine=MergeTree
ORDER BY (timeInserted, flowEndSeconds)
TTL timeInserted + INTERVAL 1 HOUR
Expand Down
Loading
Loading