Skip to content

Commit

Permalink
Flow aggregator IPv6 support
Browse files Browse the repository at this point in the history
Add IPv6 support in flow aggregator implementation.
  • Loading branch information
Yongming Ding committed Feb 5, 2021
1 parent 8555d62 commit 1c8979d
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 87 deletions.
10 changes: 2 additions & 8 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,8 @@ func (exp *flowExporter) initFlowExporter(collectorAddr string, collectorProto s
if err != nil {
return err
}
// Currently, supporting only IPv4 for Flow Aggregator.
ip := hostIPs[0].To4()
if ip != nil {
// Update the collector address with resolved IP of flow aggregator
collectorAddr = net.JoinHostPort(ip.String(), defaultIPFIXPort)
} else {
return fmt.Errorf("resolved Flow Aggregator address %v is not supported", hostIPs[0])
}
// Update the collector address with resolved IP of flow aggregator
collectorAddr = net.JoinHostPort(hostIPs[0].String(), defaultIPFIXPort)
}

// TODO: This code can be further simplified by changing the go-ipfix API to accept
Expand Down
65 changes: 48 additions & 17 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

var (
ianaInfoElements = []string{
ianaInfoElementsCommon = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceTransportPort",
Expand All @@ -41,16 +41,16 @@ var (
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
"sourceIPv4Address",
"destinationIPv4Address",
}
ianaInfoElementsIPv4 = append(ianaInfoElementsCommon, []string{"sourceIPv4Address", "destinationIPv4Address"}...)
ianaInfoElementsIPv6 = append(ianaInfoElementsCommon, []string{"sourceIPv6Address", "destinationIPv6Address"}...)
ianaReverseInfoElements = []string{
"reversePacketTotalCount",
"reverseOctetTotalCount",
"reversePacketDeltaCount",
"reverseOctetDeltaCount",
}
antreaInfoElements = []string{
antreaInfoElementsCommon = []string{
"sourcePodName",
"sourcePodNamespace",
"sourceNodeName",
Expand All @@ -63,12 +63,14 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"destinationClusterIPv4",
}
aggregatorElements = []string{
"originalExporterIPv4Address",
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
aggregatorElementsCommon = []string{
"originalObservationDomainId",
}
aggregatorElementsIPv4 = append([]string{"originalExporterIPv4Address"}, aggregatorElementsCommon...)
aggregatorElementsIPv6 = append([]string{"originalExporterIPv6Address"}, aggregatorElementsCommon...)

nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -118,6 +120,7 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIPv4",
"destinationClusterIPv6",
"destinationServicePort",
"destinationServicePortName",
"ingressNetworkPolicyName",
Expand Down Expand Up @@ -147,7 +150,8 @@ type flowAggregator struct {
aggregationProcess ipfix.IPFIXAggregationProcess
exportInterval time.Duration
exportingProcess ipfix.IPFIXExportingProcess
templateID uint16
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
}

Expand All @@ -163,6 +167,7 @@ func NewFlowAggregator(externalFlowCollectorAddr string, externalFlowCollectorPr
exportInterval,
nil,
0,
0,
registry,
}
return fa
Expand Down Expand Up @@ -252,17 +257,29 @@ func (fa *flowAggregator) initExportingProcess() error {
return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err)
}
fa.exportingProcess = ep
fa.templateID = fa.exportingProcess.NewTemplateID()
templateSet := ipfix.NewSet(ipfixentities.Template, fa.templateID, false)

bytesSent, err := fa.sendTemplateSet(templateSet)
fa.templateIDv4 = fa.exportingProcess.NewTemplateID()
templateSet := ipfix.NewSet(ipfixentities.Template, fa.templateIDv4, false)
bytesSent, err := fa.sendTemplateSet(templateSet, false)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateID = 0
fa.templateIDv4 = 0
return fmt.Errorf("sending template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of template set", bytesSent)
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv4 template set", bytesSent)

fa.templateIDv6 = fa.exportingProcess.NewTemplateID()
templateSet = ipfix.NewSet(ipfixentities.Template, fa.templateIDv6, false)
bytesSent, err = fa.sendTemplateSet(templateSet, true)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateIDv6 = 0
return fmt.Errorf("sending template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv6 template set", bytesSent)

return nil
}

Expand Down Expand Up @@ -307,9 +324,13 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
klog.V(4).Info("Skip sending record that is not correlated.")
return nil
}
templateID := fa.templateIDv4
if net.ParseIP(key.SourceAddress).To4() == nil || net.ParseIP(key.DestinationAddress).To4() == nil {
templateID = fa.templateIDv6
}
// TODO: more records per data set will be supported when go-ipfix supports size check when adding records
dataSet := ipfix.NewSet(ipfixentities.Data, fa.templateID, false)
err := dataSet.AddRecord(record.Record.GetOrderedElementList(), fa.templateID)
dataSet := ipfix.NewSet(ipfixentities.Data, templateID, false)
err := dataSet.AddRecord(record.Record.GetOrderedElementList(), templateID)
if err != nil {
return fmt.Errorf("error when adding the record to the set: %v", err)
}
Expand All @@ -321,8 +342,18 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, error) {
func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet, isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
templateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand Down Expand Up @@ -371,7 +402,7 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, erro
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
err := templateSet.AddRecord(elements, fa.templateID)
err := templateSet.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
Expand Down
88 changes: 51 additions & 37 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
testTemplateID = uint16(256)
testTemplateIDv4 = uint16(256)
testTemplateIDv6 = uint16(257)
testExportInterval = 60 * time.Second
)

Expand All @@ -51,46 +52,59 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
nil,
testExportInterval,
mockIPFIXExpProc,
testTemplateID,
testTemplateIDv4,
testTemplateIDv6,
mockIPFIXRegistry,
}

// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for _, isIPv6 := range []bool{false, true} {
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
testTemplateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
testTemplateID = fa.templateIDv6
}
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}

for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
var tempSet ipfixentities.Set
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
mockTempSet.EXPECT().GetSet().Return(tempSet)
for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
var tempSet ipfixentities.Set
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
mockTempSet.EXPECT().GetSet().Return(tempSet)

// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(tempSet).Return(0, nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(tempSet).Return(0, nil)

_, err := fa.sendTemplateSet(mockTempSet)
assert.NoErrorf(t, err, "Error in sending template record: %v", err)
_, err := fa.sendTemplateSet(mockTempSet, isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
}
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func ParseFlowCollectorAddr(addr string, defaultPort string, defaultProtocol str
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, addr[1:idx])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestParseFlowCollectorAddr(t *testing.T) {
},
{
addr: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
expectedHost: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]",
expectedHost: "fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
expectedPort: "80",
expectedProto: "tcp",
expectedError: nil,
Expand Down
39 changes: 20 additions & 19 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,16 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
// TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6
// Also use setupTest.
isIPv6 := false
if err := testData.setupLogDirectoryForTest(tb.Name()); err != nil {
tb.Errorf("Error creating logs directory '%s': %v", testData.logsDirForTestCase, err)
return nil, err, isIPv6
}
tb.Logf("Creating '%s' K8s Namespace", testNamespace)
if err := ensureAntreaRunning(tb, testData); err != nil {
return nil, err, isIPv6
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, error) {
var isIPv6 bool
if clusterInfo.podV6NetworkCIDR == "" {
isIPv6 = false
} else {
isIPv6 = true
}
if err := testData.createTestNamespace(); err != nil {
return nil, err, isIPv6
testData, err := setupTest(tb)
if err != nil {
return testData, isIPv6, err
}

// Create pod using ipfix collector image
Expand All @@ -151,14 +147,19 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, err, isIPv6
return nil, isIPv6, err
}
var ipStr string
if isIPv6 {
ipStr = ipfixCollectorIP.ipv6.String()
} else {
ipStr = ipfixCollectorIP.ipv4.String()
}
ipStr := ipfixCollectorIP.ipv4.String()
ipfixCollectorAddr := fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}

faClusterIPAddr := ""
Expand All @@ -169,14 +170,14 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, err, isIPv6
return testData, isIPv6, err
}
return testData, nil, isIPv6
return testData, isIPv6, nil
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
Expand Down
5 changes: 1 addition & 4 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ const (
)

func TestFlowAggregator(t *testing.T) {
// TODO: remove this limitation after flow aggregator supports IPv6
skipIfIPv6Cluster(t)
skipIfNotIPv4Cluster(t)
data, err, isIPv6 := setupTestWithIPFIXCollector(t)
data, isIPv6, err := setupTestWithIPFIXCollector(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
Expand Down

0 comments on commit 1c8979d

Please sign in to comment.