Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flowType field for Flow Exporter #2000

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()

flowRecords := flowrecords.NewFlowRecords()
connStore := connections.NewConnectionStore(
Expand All @@ -376,7 +377,9 @@ func run(o *Options) error {
o.config.EnableTLSToFlowAggregator,
v4Enabled,
v6Enabled,
k8sClient)
k8sClient,
nodeRouteController,
isNetworkPolicyOnly)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/route"
"github.com/vmware-tanzu/antrea/pkg/agent/util"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
utilip "github.com/vmware-tanzu/antrea/pkg/util/ip"
)

const (
Expand Down Expand Up @@ -97,7 +98,8 @@ func NewNodeRouteController(
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc})}
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
Expand Down Expand Up @@ -615,3 +617,39 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) {
}
return ipAddr, nil
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a simple unit test for this method?

Copy link
Contributor Author

@dreamtalen dreamtalen Apr 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added a unit test for IPInPodSubnets function.
For Pod-To-External flows, I have tested manually by watching the records on the ipfix-collector side.

var ipCIDR *net.IPNet
var curNodeCIDRStr string
if ip.To4() != nil {
var podIPv4CIDRMaskSize int
if c.nodeConfig.PodIPv4CIDR != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we tend to use early returns when possible to reduce indentation levels:

if c.nodeConfig.PodIPv4CIDR == nil {
    return false
}
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ := c.nodeConfig.PodIPv4CIDR.Mask.Size()

same below

this can be addressed in a future PR

curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
} else {
return false
}
v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v4Mask),
Mask: v4Mask,
}

} else {
var podIPv6CIDRMaskSize int
if c.nodeConfig.PodIPv6CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
} else {
return false
}
v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v6Mask),
Mask: v6Mask,
}
}
ipCIDRStr := ipCIDR.String()
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
}
80 changes: 75 additions & 5 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/containernetworking/plugins/pkg/ip"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand All @@ -35,11 +36,13 @@ import (
)

var (
gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01")
_, podCIDR, _ = net.ParseCIDR("1.1.1.0/24")
podCIDRGateway = ip.NextIP(podCIDR.IP)
nodeIP1 = net.ParseIP("10.10.10.10")
nodeIP2 = net.ParseIP("10.10.10.11")
gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01")
_, podCIDR, _ = net.ParseCIDR("1.1.1.0/24")
_, podCIDR2, _ = net.ParseCIDR("1.1.2.0/24")
podCIDRGateway = ip.NextIP(podCIDR.IP)
podCIDR2Gateway = ip.NextIP(podCIDR2.IP)
nodeIP1 = net.ParseIP("10.10.10.10")
nodeIP2 = net.ParseIP("10.10.10.11")
)

type fakeController struct {
Expand Down Expand Up @@ -158,3 +161,70 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
case <-finishCh:
}
}

func TestIPInPodSubnets(t *testing.T) {
c, closeFn := newController(t)
defer closeFn()
defer c.queue.ShutDown()

stopCh := make(chan struct{})
defer close(stopCh)
c.informerFactory.Start(stopCh)
// Must wait for cache sync, otherwise resource creation events will be missing if the resources are created
// in-between list and watch call of an informer. This is because fake clientset doesn't support watching with
// resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created.
c.informerFactory.WaitForCacheSync(stopCh)
c.Controller.nodeConfig.PodIPv4CIDR = podCIDR

node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Spec: corev1.NodeSpec{
PodCIDR: podCIDR.String(),
PodCIDRs: []string{podCIDR.String()},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: nodeIP1.String(),
},
},
},
}
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Spec: corev1.NodeSpec{
PodCIDR: podCIDR2.String(),
PodCIDRs: []string{podCIDR2.String()},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: nodeIP2.String(),
},
},
},
}

c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1)
c.processNextWorkItem()

assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1")))
assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.2.1")))
assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("10.10.10.10")))
assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("8.8.8.8")))
}
46 changes: 39 additions & 7 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/ipfix"
"github.com/vmware-tanzu/antrea/pkg/util/env"
)
Expand Down Expand Up @@ -94,6 +96,8 @@ type flowExporter struct {
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
isNetworkPolicyOnly bool
}

func genObservationID() (uint32, error) {
Expand Down Expand Up @@ -123,7 +127,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex

func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords,
collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration,
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) {
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface,
nodeRouteController *noderoute.Controller, isNetworkPolicyOnly bool) (*flowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -133,6 +138,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
if err != nil {
return nil, err
}

return &flowExporter{
connStore: connStore,
flowRecords: records,
Expand All @@ -145,6 +151,8 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
isNetworkPolicyOnly: isNetworkPolicyOnly,
}, nil
}

Expand Down Expand Up @@ -516,12 +524,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "tcpState":
ie.Value = record.Conn.TCPState
case "flowType":
// TODO: assign flow type to support Pod-to-External flows
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
ie.Value = ipfixregistry.InterNode
} else {
ie.Value = ipfixregistry.IntraNode
}
ie.Value = exp.findFlowType(record)
}
}

Expand All @@ -544,3 +547,32 @@ func (exp *flowExporter) sendDataSet() (int, error) {
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return sentBytes, nil
}

func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
// TODO: support Pod-To-External flows in network policy only mode.
if exp.isNetworkPolicyOnly {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably should rename these to FlowTypeInterNode, etc. in go-ipfix so there is more context in the name

Copy link
Member

@srikartati srikartati Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these were used in the context of adding flow types (the function is GetFlowType here), I thought a simple name might be sufficient. If it's confusing, we could change it.
@dreamtalen Could you please create an issue in go-ipfix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks.

}
return ipfixregistry.IntraNode
}
Comment on lines +551 to +558
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed with @srikartati offline, I feel like flow type resolution should belong in the Flow Aggregator
The Flow Aggregator could watch all Pods if needed

Copy link
Member

@srikartati srikartati Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @antoninbas. Yes, we have to explore this to support Pod-To-External flows in Nwtqork Policy only mode in the future.


if exp.nodeRouteController == nil {
klog.Warningf("Can't find flowType without nodeRouteController")
return 0
}
if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
Copy link
Member

@srikartati srikartati Apr 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work for network-policy only mode traffic as the installed node cache indexer is not maintained at all.
https://github.com/vmware-tanzu/antrea/blob/main/pkg/agent/controller/noderoute/node_route_controller.go#L278

I think we need to check the network subnet of local nodeAddr in nodeConfig to determine if the IP address is inside cluster or not. @jianjuns Could you please confirm if this check is correct or not for network policy only mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Srikar for pointing out it, added a TODO right now and wait for Jianjun's opinion.

Copy link
Member

@srikartati srikartati Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding these lines to address this.

My initial assessment on node network based identification is not correct and was mixed up with something else. There is no straightforward way to detect PodIPs at Antrea agent when operating in network policy only mode.
Therefore, we are supporting Pod-To-External flows for all traffic modes except for the special mode of network policy only mode. Pod-To-Pod flows and Pod-To-Service flows are supported in network policy only mode. There is no regression here.

if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
}
return ipfixregistry.IntraNode
} else {
return ipfixregistry.ToExternal
}
} else {
// We do not support External-To-Pod flows for now.
klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", record.Conn.TupleOrig.SourceAddress.String())
return 0
}
}
8 changes: 4 additions & 4 deletions pkg/util/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

const (
v4BitLen = 8 * net.IPv4len
v6BitLen = 8 * net.IPv6len
V4BitLen = 8 * net.IPv4len
V6BitLen = 8 * net.IPv6len
)

// This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs
Expand Down Expand Up @@ -71,9 +71,9 @@ func diffFromCIDR(allowCIDR, exceptCIDR *net.IPNet) []*net.IPNet {
exceptStartIP := exceptCIDR.IP.Mask(exceptCIDR.Mask)
var bits int
if allowStartIP.To4() != nil {
bits = v4BitLen
bits = V4BitLen
} else {
bits = v6BitLen
bits = V6BitLen
}

// New CIDRs should not contain the IPs in exceptCIDR. Manipulating the bits in start IP of
Expand Down
10 changes: 10 additions & 0 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand All @@ -40,6 +41,7 @@ DATA SET:
DATA RECORD-0:
flowStartSeconds: 1608338066
flowEndSeconds: 1608338072
flowEndReason: 2
sourceTransportPort: 43600
destinationTransportPort: 5201
protocolIdentifier: 6
Expand All @@ -65,6 +67,7 @@ DATA SET:
ingressNetworkPolicyNamespace: antrea-test
egressNetworkPolicyName: test-flow-aggregator-networkpolicy-egress
egressNetworkPolicyNamespace: antrea-test
flowType: 1
destinationClusterIPv4: 0.0.0.0
originalExporterIPv4Address: 10.10.0.1
originalObservationDomainId: 2134708971
Expand Down Expand Up @@ -242,8 +245,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
// Check if record has both Pod name of source and destination pod.
if isIntraNode {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName())
checkFlowType(t, record, ipfixregistry.IntraNode)
} else {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1))
checkFlowType(t, record, ipfixregistry.InterNode)
}

if checkService {
Expand Down Expand Up @@ -336,6 +341,11 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) {
}
}

// TODO: Add a test that checks the functionality of Pod-To-External flow.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this TODO seems out-of-place on this function, maybe it should have been part of the commit message

func checkFlowType(t *testing.T, record string, flowType uint8) {
assert.Containsf(t, record, fmt.Sprintf("%s: %d", "flowType", flowType), "Record does not have correct flowType")
}

func getRecordsFromOutput(output string) []string {
re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
output = re.ReplaceAllString(output, "")
Expand Down