Skip to content

Commit

Permalink
Add "egressNodeName" in FlowExporter records (#6012)
Browse files Browse the repository at this point in the history
Signed-off-by: Kumar Atish <atish.iaf@gmail.com>
  • Loading branch information
Atish-iaf authored Feb 23, 2024
1 parent d7355e8 commit 04d2eba
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 45 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.3-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.9.0" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/ti-mo/conntrack v0.5.0
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vmware/go-ipfix v0.8.2
github.com/vmware/go-ipfix v0.9.0
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.19.0
golang.org/x/mod v0.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,8 @@ github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhg
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA=
github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0=
github.com/vmware/go-ipfix v0.9.0 h1:4/N5eFliqULEaCUQV0lafOpN/1bItPE9OTAPGhrIXus=
github.com/vmware/go-ipfix v0.9.0/go.mod h1:MYEdL6Uel2ufOZyVCKvIAaw9hwnewK8aPr7rnwRbxMY=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
"egressIP",
"appProtocolName",
"httpVals",
"egressNodeName",
}
AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand Down Expand Up @@ -595,6 +596,8 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error {
ie.SetStringValue(conn.AppProtocolName)
case "httpVals":
ie.SetStringValue(conn.HttpVals)
case "egressNodeName":
ie.SetStringValue(conn.EgressNodeName)
}
}
err := exp.ipfixSet.AddRecord(eL, templateID)
Expand Down Expand Up @@ -641,14 +644,15 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 {
}

func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) {
egressName, egressIP, _, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName)
egressName, egressIP, egressNodeName, err := exp.egressQuerier.GetEgress(conn.SourcePodNamespace, conn.SourcePodName)
if err != nil {
// Egress is not enabled or no Egress is applied to this Pod
return
}
conn.EgressName = egressName
conn.EgressIP = egressIP
klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "SourcePodNamespace", conn.SourcePodNamespace, "SourcePodName", conn.SourcePodName)
conn.EgressNodeName = egressNodeName
klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName))
}

func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error {
Expand Down
38 changes: 21 additions & 17 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,26 +806,29 @@ func TestFlowExporter_findFlowType(t *testing.T) {
func TestFlowExporter_fillEgressInfo(t *testing.T) {
ctrl := gomock.NewController(t)
testCases := []struct {
name string
sourcePodNamespace string
sourcePodName string
expectedEgressName string
expectedEgressIP string
expectedErr string
name string
sourcePodNamespace string
sourcePodName string
expectedEgressName string
expectedEgressIP string
expectedEgressNodeName string
expectedErr string
}{
{
name: "Both EgressName and EgressIP filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podA",
expectedEgressName: "test-egress",
expectedEgressIP: "172.18.0.1",
name: "EgressName, EgressIP and EgressNodeName filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podA",
expectedEgressName: "test-egress",
expectedEgressIP: "172.18.0.1",
expectedEgressNodeName: "test-egress-node",
},
{
name: "No Egress Information filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podC",
expectedEgressName: "",
expectedEgressIP: "",
name: "No Egress Information filled",
sourcePodNamespace: "namespaceA",
sourcePodName: "podC",
expectedEgressName: "",
expectedEgressIP: "",
expectedEgressNodeName: "",
},
}

Expand All @@ -841,13 +844,14 @@ func TestFlowExporter_fillEgressInfo(t *testing.T) {
SourcePodName: tc.sourcePodName,
}
if tc.expectedEgressName != "" {
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, "", nil)
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return(tc.expectedEgressName, tc.expectedEgressIP, tc.expectedEgressNodeName, nil)
} else {
egressQuerier.EXPECT().GetEgress(conn.SourcePodNamespace, conn.SourcePodName).Return("", "", "", fmt.Errorf("no Egress applied to Pod %s", conn.SourcePodName))
}
exp.fillEgressInfo(&conn)
assert.Equal(t, tc.expectedEgressName, conn.EgressName)
assert.Equal(t, tc.expectedEgressIP, conn.EgressIP)
assert.Equal(t, tc.expectedEgressNodeName, conn.EgressNodeName)
})
}
}
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Connection struct {
EgressIP string
AppProtocolName string
HttpVals string
EgressNodeName string
}

type ItemToExpire struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const (
egressName,
egressIP,
appProtocolName,
httpVals)
httpVals,
egressNodeName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?)`
?, ?, ?, ?, ?)`
)

// PrepareClickHouseConnection is used for unit testing
Expand Down Expand Up @@ -334,6 +335,7 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err
record.EgressIP,
record.AppProtocolName,
record.HttpVals,
record.EgressNodeName,
)

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func TestBatchCommitAll(t *testing.T) {
"test-egress",
"172.18.0.1",
"http",
"mockHttpString").
"mockHttpString",
"test-egress-node").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

Expand Down
1 change: 1 addition & 0 deletions pkg/flowaggregator/flowlogger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (fl *FlowLogger) WriteRecord(r *flowrecord.FlowRecord, prettyPrint bool) er
r.EgressIP,
r.AppProtocolName,
r.HttpVals,
r.EgressNodeName,
}

str := strings.Join(fields, ",")
Expand Down
4 changes: 2 additions & 2 deletions pkg/flowaggregator/flowlogger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func TestWriteRecord(t *testing.T) {
}{
{
prettyPrint: true,
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString",
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,TCP,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,Drop,K8sNetworkPolicy,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,Invalid,Invalid,test-egress,172.18.0.1,http,mockHttpString,test-egress-node",
},
{
prettyPrint: false,
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString",
expected: "1637706961,1637706973,10.10.0.79,10.10.0.80,44752,5201,6,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,test-egress,172.18.0.1,http,mockHttpString,test-egress-node",
},
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/flowaggregator/flowrecord/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type FlowRecord struct {
EgressIP string
AppProtocolName string
HttpVals string
EgressNodeName string
}

// GetFlowRecord converts ipfixentities.Record to FlowRecord
Expand Down Expand Up @@ -236,6 +237,9 @@ func GetFlowRecord(record ipfixentities.Record) *FlowRecord {
if httpVals, _, ok := record.GetInfoElementWithValue("httpVals"); ok {
r.HttpVals = httpVals.GetStringValue()
}
if egressNodeName, _, ok := record.GetInfoElementWithValue("egressNodeName"); ok {
r.EgressNodeName = egressNodeName.GetStringValue()
}
return r
}

Expand Down
1 change: 1 addition & 0 deletions pkg/flowaggregator/flowrecord/testing/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ func PrepareTestFlowRecord() *flowrecord.FlowRecord {
EgressIP: "172.18.0.1",
AppProtocolName: "http",
HttpVals: "mockHttpString",
EgressNodeName: "test-egress-node",
}
}
1 change: 1 addition & 0 deletions pkg/flowaggregator/infoelements/elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
"egressIP",
"appProtocolName",
"httpVals",
"egressNodeName",
}
AntreaInfoElementsIPv4 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
AntreaInfoElementsIPv6 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,6 @@ func writeRecord(w io.Writer, r *flowrecord.FlowRecord, clusterUUID string) {
io.WriteString(w, r.AppProtocolName)
io.WriteString(w, ",")
io.WriteString(w, r.HttpVals)
io.WriteString(w, ",")
io.WriteString(w, r.EgressNodeName)
}
4 changes: 2 additions & 2 deletions pkg/flowaggregator/s3uploader/s3uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (

var (
fakeClusterUUID = uuid.New().String()
recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString"
recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString"
recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node"
recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,'{\"antrea-e2e\":\"perftest-a\",\"app\":\"iperf\"}','{\"antrea-e2e\":\"perftest-b\",\"app\":\"iperf\"}',15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + "," + fmt.Sprintf("%d", time.Now().Unix()) + ",test-egress,172.18.0.1,http,mockHttpString,test-egress-node"
)

const seed = 1
Expand Down
4 changes: 4 additions & 0 deletions pkg/flowaggregator/testing/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func PrepareMockIpfixRecord(mockRecord *ipfixentitiestesting.MockRecord, isIPv4
httpValsElem.SetStringValue("mockHttpString")
mockRecord.EXPECT().GetInfoElementWithValue("httpVals").Return(httpValsElem, 0, true)

egressNodeNameElem := createElement("egressNodeName", ipfixregistry.AntreaEnterpriseID)
egressNodeNameElem.SetStringValue("test-egress-node")
mockRecord.EXPECT().GetInfoElementWithValue("egressNodeName").Return(egressNodeNameElem, 0, true)

if isIPv4 {
sourceIPv4Elem := createElement("sourceIPv4Address", ipfixregistry.IANAEnterpriseID)
sourceIPv4Elem.SetIPAddressValue(net.ParseIP("10.10.0.79"))
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/charts/flow-visibility/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ data:
egressName String,
egressIP String,
appProtocolName String,
httpVals String
httpVals String,
egressNodeName String
) engine=MergeTree
ORDER BY (timeInserted, flowEndSeconds)
TTL timeInserted + INTERVAL 1 HOUR
Expand Down
Loading

0 comments on commit 04d2eba

Please sign in to comment.