From ea9bdcab5d85d73a20e8b801a5c09ef0e1f9328b Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Thu, 25 Mar 2021 17:12:17 -0700 Subject: [PATCH] Add flowType field for Flow Exporter In this PR, we implemented the logic of flowType value assignment. We distinguished Pod-To-Pod flows and Pod-To-External flows using the podCIDRs of all nodes in the k8s cluster. --- .../elk-flow-collector/logstash/ipfix.yml | 3 ++ ci/kind/test-e2e-kind.sh | 2 +- cmd/antrea-agent/agent.go | 3 +- docs/network-flow-visibility.md | 1 + go.mod | 3 +- go.sum | 4 +- .../noderoute/node_route_controller.go | 42 ++++++++++++++++++- pkg/agent/flowexporter/exporter/exporter.go | 33 ++++++++++++++- pkg/flowaggregator/flowaggregator.go | 2 + test/e2e/flowaggregator_test.go | 9 ++++ test/e2e/framework.go | 2 +- 11 files changed, 95 insertions(+), 9 deletions(-) diff --git a/build/yamls/elk-flow-collector/logstash/ipfix.yml b/build/yamls/elk-flow-collector/logstash/ipfix.yml index eb3c93d57d2..ea0ac1e1bb8 100644 --- a/build/yamls/elk-flow-collector/logstash/ipfix.yml +++ b/build/yamls/elk-flow-collector/logstash/ipfix.yml @@ -4118,3 +4118,6 @@ 135: - :uint64 - :reverseOctetDeltaCountFromDestinationNode + 137: + - :uint8 + - :flowType diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 101dbcb7075..64fe67274f9 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -101,7 +101,7 @@ if $np; then manifest_args="$manifest_args --np --tun vxlan" fi -COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.3") +COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.7") for image in "${COMMON_IMAGES_LIST[@]}"; do docker pull $image done diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 209de4b0d4e..b8914db55af 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -360,7 +360,8 @@ func run(o *Options) error { o.config.EnableTLSToFlowAggregator, v4Enabled, v6Enabled, - k8sClient) + k8sClient, + nodeRouteController) if err != nil { return fmt.Errorf("error when creating IPFIX flow exporter: %v", err) } diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index e43b6ab3425..66e9ce3afa8 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -159,6 +159,7 @@ the flow. All the IEs used by the Antrea Flow Exporter are listed below: | ingressNetworkPolicyNamespace| 56506 | 111 | string | | egressNetworkPolicyName | 56506 | 112 | string | | egressNetworkPolicyNamespace | 56506 | 113 | string | +| flowType | 56506 | 137 | unsigned8 | ### Supported capabilities diff --git a/go.mod b/go.mod index 86e5210c24c..23b08d6634e 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd - github.com/kr/text v0.2.0 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/common v0.4.1 @@ -42,7 +41,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.4.5 + github.com/vmware/go-ipfix v0.4.7 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 golang.org/x/mod v0.4.0 diff --git a/go.sum b/go.sum index b1ec43a9692..e2e8b1f51b4 100644 --- a/go.sum +++ b/go.sum @@ -407,8 +407,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.4.5 h1:EwG2bQXKT72IzMOsCcbvP1Po2PncLoSoPuYrHf3YrsI= -github.com/vmware/go-ipfix v0.4.5/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o= +github.com/vmware/go-ipfix v0.4.7 h1:zyKpsifh19Mdmo6FE3C2GooAWiiThX2JV+X4VdzqeZY= +github.com/vmware/go-ipfix v0.4.7/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 4d318b6b4b0..12bc770006e 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -52,6 +52,11 @@ const ( ovsExternalIDNodeName = "node-name" nodeRouteInfoPodCIDRIndexName = "podCIDR" + + PodIPv4CIDRMaskSize = 24 + IPv4BitLen = 32 + PodIPv6CIDRMaskSize = 64 + IPv6BitLen = 128 ) // Controller is responsible for setting up necessary IP routes and Openflow entries for inter-node traffic. @@ -71,6 +76,7 @@ type Controller struct { // The key is the host name of the Node, the value is the nodeRouteInfo of the Node. // A node will be in the map after its flows and routes are installed successfully. installedNodes cache.Indexer + PodCIDRIPsMap map[string]int } // NewNodeRouteController instantiates a new Controller object which will process Node events @@ -97,7 +103,9 @@ func NewNodeRouteController( nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"), - installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc})} + installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), + PodCIDRIPsMap: make(map[string]int), + } nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { @@ -112,6 +120,12 @@ func NewNodeRouteController( }, nodeResyncPeriod, ) + if nodeConfig.PodIPv4CIDR != nil { + controller.PodCIDRIPsMap[nodeConfig.PodIPv4CIDR.IP.String()] = 1 + } + if nodeConfig.PodIPv6CIDR != nil { + controller.PodCIDRIPsMap[nodeConfig.PodIPv6CIDR.IP.String()] = 1 + } return controller } @@ -381,6 +395,15 @@ func (c *Controller) deleteNodeRoute(nodeName string) error { if err := c.routeClient.DeleteRoutes(podCIDR); err != nil { return fmt.Errorf("failed to delete the route to Node %s: %v", nodeName, err) } + podCIDRStr := podCIDR.IP.String() + if _, ok := c.PodCIDRIPsMap[podCIDRStr]; ok { + c.PodCIDRIPsMap[podCIDRStr] -= 1 + if c.PodCIDRIPsMap[podCIDRStr] == 0 { + delete(c.PodCIDRIPsMap, podCIDRStr) + } + } else { + klog.Warningf("PodCIDR IP: %s doesn't exist in PodCIDRIPsMap", podCIDRStr) + } } if err := c.ofClient.UninstallNodeFlows(nodeName); err != nil { return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err) @@ -452,6 +475,12 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { peerGatewayIP := ip.NextIP(peerPodCIDRAddr) peerConfig[peerPodCIDR] = peerGatewayIP podCIDRs = append(podCIDRs, peerPodCIDR) + podCIDRStr := peerPodCIDR.IP.String() + if _, ok := c.PodCIDRIPsMap[podCIDRStr]; ok { + c.PodCIDRIPsMap[podCIDRStr] += 1 + } else { + c.PodCIDRIPsMap[podCIDRStr] = 1 + } } peerNodeIP, err := GetNodeAddr(node) @@ -615,3 +644,14 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) { } return ipAddr, nil } + +func (c *Controller) IPInPodSubnets(ip net.IP) bool { + var ipStr string + if ip.To4() != nil { + ipStr = ip.Mask(net.CIDRMask(PodIPv4CIDRMaskSize, IPv4BitLen)).String() + } else { + ipStr = ip.Mask(net.CIDRMask(PodIPv6CIDRMaskSize, IPv6BitLen)).String() + } + _, ok := c.PodCIDRIPsMap[ipStr] + return ok +} diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index e9789a8e1c1..ce7a15e7d4e 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -27,9 +27,11 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog" + "github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/ipfix" "github.com/vmware-tanzu/antrea/pkg/util/env" ) @@ -69,6 +71,7 @@ var ( "ingressNetworkPolicyNamespace", "egressNetworkPolicyName", "egressNetworkPolicyNamespace", + "flowType", } AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) @@ -97,6 +100,8 @@ type flowExporter struct { idleFlowTimeout time.Duration enableTLSToFlowAggregator bool k8sClient kubernetes.Interface + nodeRouteController *noderoute.Controller + podSubnets []*net.IPNet } func genObservationID() (uint32, error) { @@ -126,7 +131,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords, collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration, - enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) { + enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface, + nodeRouteController *noderoute.Controller) (*flowExporter, error) { // Initialize IPFIX registry registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() @@ -136,6 +142,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords if err != nil { return nil, err } + return &flowExporter{ connStore: connStore, flowRecords: records, @@ -148,6 +155,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords ipfixSet: ipfix.NewSet(false), enableTLSToFlowAggregator: enableTLSToFlowAggregator, k8sClient: k8sClient, + nodeRouteController: nodeRouteController, }, nil } @@ -517,6 +525,8 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error { ie.Value = record.Conn.EgressNetworkPolicyName case "egressNetworkPolicyNamespace": ie.Value = record.Conn.EgressNetworkPolicyNamespace + case "flowType": + ie.Value = exp.findFlowType(record) } } @@ -539,3 +549,24 @@ func (exp *flowExporter) sendDataSet() (int, error) { klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes) return sentBytes, nil } + +func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 { + if exp.nodeRouteController == nil { + klog.Warningf("Can't find flowType without nodeRouteController") + return 0 + } + if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) { + if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) { + if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { + return ipfixregistry.InterNode + } + return ipfixregistry.IntraNode + } else { + return ipfixregistry.ToExternal + } + } else { + // We do not support External-To-Pod flows for now. + klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", record.Conn.TupleOrig.SourceAddress.String()) + return 0 + } +} diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 929dd9a957d..f62b10a4f13 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -64,6 +64,7 @@ var ( "ingressNetworkPolicyNamespace", "egressNetworkPolicyName", "egressNetworkPolicyNamespace", + "flowType", } antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) @@ -75,6 +76,7 @@ var ( nonStatsElementList = []string{ "flowEndSeconds", + "flowEndReason", } statsElementList = []string{ "octetDeltaCount", diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index decc0f316c8..95af3997225 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" @@ -242,8 +243,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri // Check if record has both Pod name of source and destination pod. if isIntraNode { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName()) + checkFlowType(t, record, ipfixregistry.IntraNode) } else { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1)) + checkFlowType(t, record, ipfixregistry.InterNode) } if checkService { @@ -336,6 +339,12 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) { } } +func checkFlowType(t *testing.T, record string, flowType uint8) { + if !strings.Contains(record, fmt.Sprintf("%s: %d", "flowType", flowType)) { + t.Errorf("Record does not have correct flowType") + } +} + func getRecordsFromOutput(output string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "") diff --git a/test/e2e/framework.go b/test/e2e/framework.go index a7dbd7cc3d3..64966590866 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -98,7 +98,7 @@ const ( busyboxImage = "projects.registry.vmware.com/library/busybox" nginxImage = "projects.registry.vmware.com/antrea/nginx" perftoolImage = "projects.registry.vmware.com/antrea/perftool" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.3" + ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.7" ipfixCollectorPort = "4739" nginxLBService = "nginx-loadbalancer"