Add flowType field for Flow Exporter #2000
Conversation
Thanks @dreamtalen for working on this.
Retrieving podCIDRs from Nodes to maintain the Pod subnets is the only possible solution. However, I am not sure this is completely foolproof. Please see the comment.
Discussed with @antoninbas offline: one more option is to use the NodeRouteController, which is part of AntreaController and is responsible for installing routes for antrea-gw on every Node (route client). It has podCIDR info for all Nodes.
OK with the nodeSpec solution if it serves the purpose. Otherwise, the NodeRouteController should be explored.
```go
if i == subnet {
	return true
```
Why not use `i.Contains(subnet)` to skip the subnet? `i` is not a great variable name in this context IMO.
Thanks, the `net.IPNet.Contains` function can only report whether the network includes a single IP. I added some logic to check whether the network includes the new network.
```go
func fetchPodSubnets(k8sClient kubernetes.Interface) ([]*net.IPNet, error) {
	podSubnets := []*net.IPNet{}
	nodeList, err := k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
```
As this is done as part of the initialization of Flow Exporter, can we be sure that all the nodes with Antrea agents are up?
We discussed offline, and the current solution is: if the source/destination IP is not known, we do the query and update the podCIDRs first, then update the flow type.
please integrate with the NodeRouteController, it already has this information
Updated the PR integrating with the NodeRouteController.
Force-pushed from f94943c to 45c400e.
Codecov Report

```
@@            Coverage Diff             @@
##             main    #2000      +/-   ##
==========================================
+ Coverage   60.87%   62.03%   +1.16%
==========================================
  Files         268      269       +1
  Lines       20236    20486     +250
==========================================
+ Hits        12319    12709     +390
+ Misses       6633     6460     -173
- Partials     1284     1317      +33
```
Force-pushed from 45c400e to 8400dbc.
```go
if exp.nodeRouteController != nil {
	exp.podSubnets = exp.nodeRouteController.GetPodSubnetsFromAllNodes()
	klog.V(4).Infof("Updated Pod subnets: %v", exp.podSubnets)
}
```
Do we have to do this every time we create the flow record?
```go
@@ -551,3 +553,34 @@ func (exp *flowExporter) sendDataSet() (int, error) {
	klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
	return sentBytes, nil
}

func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
	if record.Conn.Mark == openflow.ServiceCTMark || exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) && exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
```
I think this has to be modified. Do you mean the following to represent Pod-To-Pod flows and Pod-To-Service flows?

```go
exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) && (record.Conn.Mark == openflow.ServiceCTMark || exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress))
```
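The difference matters because `&&` binds tighter than `||` in Go: the original condition parses as "Service mark, OR (source AND destination in Pod subnets)", so a Service-marked flow would be accepted even when the source is not a Pod. A minimal standalone sketch with hypothetical boolean parameters in place of the real record fields:

```go
package main

import "fmt"

// classifyOriginal mirrors the original condition: svcMark || (srcInPods && dstInPods).
// Since && binds tighter than ||, a Service-marked flow is accepted even when
// the source is not in any Pod subnet.
func classifyOriginal(svcMark, srcInPods, dstInPods bool) bool {
	return svcMark || srcInPods && dstInPods
}

// classifySuggested mirrors the review suggestion: the source must be a Pod,
// and the destination must be either a Service or a Pod.
func classifySuggested(svcMark, srcInPods, dstInPods bool) bool {
	return srcInPods && (svcMark || dstInPods)
}

func main() {
	// A Service-marked flow from an external source: the two forms disagree.
	fmt.Println(classifyOriginal(true, false, false))  // prints true
	fmt.Println(classifySuggested(true, false, false)) // prints false
}
```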
```go
	klog.V(4).Infof("Updated Pod subnets: %v", exp.podSubnets)
}
if !exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
	return ipfixregistry.FromExternal
```
We do not support External-To-Pod flows (NodePort Services) for now; they will be ignored in the connection flow dump. I prefer removing this to avoid confusion.
```go
for _, subnet := range exp.podSubnets {
	if subnet.Contains(ip) {
		return true
	}
}
```
Currently, we have a fixed mask (same size) for all podCIDRs. Based on that, we could optimize further.
OK to keep this for now and consider the optimization later.
pkg/agent/flowexporter/utils.go (outdated diff)

```go
@@ -27,3 +31,16 @@ func NewConnectionKey(conn *Connection) ConnectionKey {
		strconv.FormatUint(uint64(conn.TupleOrig.Protocol), 10),
	}
}

func IsConnectionDying(conn *Connection) bool {
	// "TIME_WAIT" state indicates local endpoint has closed the connection
```
Better to add '.' at the end of the sentence.
```go
@@ -125,7 +125,7 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe

// flowStringToAntreaConnection parses the flow string and converts to Antrea connection.
// Example of flow string:
// "tcp,orig=(src=10.10.1.2,dst=10.10.1.3,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=10.10.1.3,dst=10.10.1.2,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,mark=32,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397"
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
```
I do not mean we must do it in this PR, but in general I feel such OVS-level parsing should be put into pkg/ovs/.
Noted. We are planning to decouple the conntrack polling and exporting process, where we can move the OVS-specific parsing and handling to pkg/ovs and decouple the conntrack part. #1278
```go
@@ -155,6 +155,10 @@ func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio
	SourcePodName: "",
```
This file is probably worth a separate pkg under pkg/agent/util/conntrack. Again, not saying we must do it in this PR.
```go
} else {
	// Update Pod subnets to distinguish Pod-To-External flows.
	if exp.nodeRouteController != nil {
		exp.podSubnets = exp.nodeRouteController.GetPodSubnetsFromAllNodes()
```
Do we call it for every flow? Then we should optimize here, probably by maintaining a single map in the NodeRouteController and looking it up there, or even requiring a podCIDR parameter to add the flow type.
We call it if a flow record is not a Pod-To-Pod or Pod-To-Service record. Since new Nodes may be added to the cluster at any time, the current solution is: if we fail to determine that the current record is a Pod-To-Pod or Pod-To-Service record, we fetch the latest podCIDRs from the NodeRouteController and then determine whether it is a Pod-To-External flow.
But when does this happen, i.e. failing to determine that the current record is a Pod-To-Pod or Pod-To-Service record?
To be brief: if the destination IP of a record is not in any of the Pod subnets we currently know about, and it is not a Pod-To-Service record.
Does it mean every flow to external will end up here?
Yes
Then we should optimize, in my mind.
Agreed. @dreamtalen Please optimize this as discussed offline.
Thanks, updated the PR with the following optimizations:
- Maintained a PodCIDRIPsMap in the NodeRouteController to avoid looking up the Pod subnets in the Flow Exporter when sending a record.
- Took advantage of the fixed mask size among podCIDRs, using a map to accelerate checking whether an IP is in the Pod subnets.
Force-pushed from 069b8ed to ea9bdca.
```go
@@ -97,7 +103,9 @@ func NewNodeRouteController(
	nodeLister: nodeInformer.Lister(),
	nodeListerSynced: nodeInformer.Informer().HasSynced,
	queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
	installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
	PodCIDRIPsMap: make(map[string]int),
```
installedNodes (a cache.Indexer) already has this mapping functionality of searching by podCIDR through nodeRouteInfoPodCIDRIndexFunc. We do not need a separate map. Please take a look.
Thanks, I updated the code to use nodeRouteInfoPodCIDRIndexFunc.
Force-pushed from ea9bdca to 645423b.
Overall the logic LGTM; I just had a couple of comments.
Wondering if you got a chance to test this manually to make sure the Flow Aggregator exports Pod-To-External flows.
Not sure if adding an e2e test case for Pod-To-External flows is trivial. If it's not, we can probably take it up in a different PR; at least we should test manually.
```go
	}
}
ipCIDRStr := ipCIDR.String()
nodesHaveSamePodCIDR, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
```
nodeInCluster is a better name. I feel the context for nodesHaveSamePodCIDR is different.
Thanks, addressed.
```go
PodIPv4CIDRMaskSize = 24
IPv4BitLen          = 32
PodIPv6CIDRMaskSize = 64
IPv6BitLen          = 128
```
These could be gathered at runtime from the local podCIDRs. Hardcoded values may work fine for now; just want to confirm with @antoninbas.
It's fine to assume fixed-size CIDRs across the cluster for now (because this is how Node IPAM works), but it is not fine to assume that the size is 24 (for IPv4) or 64 (for IPv6). The cluster admin can easily change this by providing a different value with `--node-cidr-mask-size` for kube-controller-manager.
Thanks, Antonin.
Thanks, I added the logic of gathering PodIPv4/v6CIDRMaskSize from the local podCIDRs; if a local podCIDR doesn't exist, 24 (for IPv4) or 64 (for IPv6) is used as the default value instead.
```go
@@ -615,3 +621,30 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) {
	}
	return ipAddr, nil
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
```
Could we add a simple unit test for this method?
Sure, added a unit test for the IPInPodSubnets function.
For Pod-To-External flows, I tested manually by watching the records on the ipfix-collector side.
Force-pushed from 645423b to e9d9b39.
Thanks for adding the unit test and sharing the result of the manual test. Please add a TODO in the e2e tests to add a test that checks the functionality of Pod-To-External flows.
LGTM except for the corner case of network-policy-only mode.
```go
	klog.Warningf("Can't find flowType without nodeRouteController")
	return 0
}
if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
```
This will not work for network-policy-only mode traffic, as the installedNodes cache indexer is not maintained at all:
https://github.com/vmware-tanzu/antrea/blob/main/pkg/agent/controller/noderoute/node_route_controller.go#L278
I think we need to check the network subnet of the local nodeAddr in nodeConfig to determine whether the IP address is inside the cluster or not. @jianjuns Could you please confirm whether this check is correct for network-policy-only mode?
Thanks Srikar for pointing it out; I added a TODO for now and will wait for Jianjun's opinion.
Thanks for adding these lines to address this.
My initial assessment based on node-network identification was not correct and was mixed up with something else. There is no straightforward way to detect Pod IPs at the Antrea Agent when operating in network-policy-only mode.
Therefore, we support Pod-To-External flows for all traffic modes except the special network-policy-only mode. Pod-To-Pod and Pod-To-Service flows are supported in network-policy-only mode, so there is no regression here.
```go
} else {
	podIPv4CIDRMaskSize = defaultPodIPv4CIDRMaskSize
}
```
Can you provide some clarity (and maybe add a comment) as to why this is needed? Shouldn't we just return false at this point?
Same for the v6 case below.
I agree. I thought it might be OK since it is the default setting in kube-controller-manager.
However, thinking more about it, it is better to not resolve the flow type and print a warning. This situation also arises for network-policy-only mode:
https://github.com/vmware-tanzu/antrea/blob/main/pkg/agent/config/node_config.go#L113
Any thoughts @dreamtalen?
I agree too, addressed.
test/e2e/flowaggregator_test.go (outdated diff)

```go
if !strings.Contains(record, fmt.Sprintf("%s: %d", "flowType", flowType)) {
	t.Errorf("Record does not have correct flowType")
}
```
nit: there is an `assert.Contains` assertion
Thanks, addressed.
Force-pushed from 3ecd2b0 to 6d359c2.
LGTM except for one nit.
```go
@@ -52,6 +52,9 @@ const (
	ovsExternalIDNodeName = "node-name"

	nodeRouteInfoPodCIDRIndexName = "podCIDR"

	IPv4BitLen = 32
```
There are constants for these lengths in the net package: https://golang.org/pkg/net/
Thanks, I only found byte-length constants, so I changed the code to:

```go
IPv4BitLen = net.IPv4len * 8
IPv6BitLen = net.IPv6len * 8
```
Thanks for checking. Realized that this is available here: https://github.com/vmware-tanzu/antrea/blob/main/pkg/util/ip/ip.go#L26
Thanks, I capitalized these constants and updated the code.
Force-pushed from 6d359c2 to 55b6c7e.
Force-pushed from 55b6c7e to c52b20d.
LGTM
Please edit the description by adding "Fixes #XXX" to link the issue properly.
Thanks, done.
```go
var curNodeCIDRStr string
if ip.To4() != nil {
	var podIPv4CIDRMaskSize int
	if c.nodeConfig.PodIPv4CIDR != nil {
```
We tend to use early returns when possible to reduce indentation levels:

```go
if c.nodeConfig.PodIPv4CIDR == nil {
	return false
}
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ := c.nodeConfig.PodIPv4CIDR.Mask.Size()
```

Same below; this can be addressed in a future PR.
```go
func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
	// TODO: support Pod-To-External flows in network policy only mode.
	if exp.isNetworkPolicyOnly {
		if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
			return ipfixregistry.InterNode
		}
		return ipfixregistry.IntraNode
	}
```
As discussed with @srikartati offline, I feel like flow type resolution should belong in the Flow Aggregator.
The Flow Aggregator could watch all Pods if needed.
Thanks @antoninbas. Yes, we have to explore this to support Pod-To-External flows in Network Policy only mode in the future.
```go
// TODO: support Pod-To-External flows in network policy only mode.
if exp.isNetworkPolicyOnly {
	if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
		return ipfixregistry.InterNode
```
You should probably rename these to FlowTypeInterNode, etc., in go-ipfix so there is more context in the name.
As these were used in the context of adding flow types (the function is GetFlowType here), I thought a simple name might be sufficient. If it's confusing, we could change it.
@dreamtalen Could you please create an issue in go-ipfix?
Sure, thanks.
```go
@@ -336,6 +341,11 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) {
	}
}

// TODO: Add a test that checks the functionality of Pod-To-External flow.
```
This TODO seems out of place on this function; maybe it should have been part of the commit message.
/test-all
/test-ipv6-only-e2e
I see the Flow Aggregator tests passing on both single-stack v6 and dual-stack clusters; there was a traceflow test failure in the previous run. Merging this.
Fixes #1925
In this PR, we implemented the logic of flowType value assignment.
We distinguished Pod-To-Pod flows and Pod-To-External flows using the podCIDRs of all nodes in the k8s cluster.