Skip to content

Commit

Permalink
Add single stack IPv6 support in Flow aggregator and ELK flow collect…
Browse files Browse the repository at this point in the history
…or (#1819)

In this commit, we add single stack IPv6 support in flow aggregator
 and elk flow collector.
  • Loading branch information
Yongming Ding authored Mar 19, 2021
1 parent 1d67f6f commit aa112c7
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 151 deletions.
15 changes: 12 additions & 3 deletions build/yamls/elk-flow-collector/elk-flow-collector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ spec:
- name: bootstrap.memory_lock
value: "false"
- name: network.host
value: "0.0.0.0"
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: http.port
value: "9200"
- name: discovery.type
Expand Down Expand Up @@ -155,7 +157,9 @@ spec:
- name: action.destructive_requires_name
value: "true"
- name: SERVER_HOST
value: "0.0.0.0"
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: SERVER_PORT
value: "5601"
- name: ELASTICSEARCH_URL
Expand Down Expand Up @@ -201,6 +205,11 @@ spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash-oss:7.8.0
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: logstash-definition-volume
mountPath: /usr/share/logstash/definitions
Expand Down Expand Up @@ -231,4 +240,4 @@ spec:
name: logstash-configmap
items:
- key: logstash.conf
path: logstash.conf
path: logstash.conf
123 changes: 62 additions & 61 deletions build/yamls/elk-flow-collector/kibana.ndjson

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion build/yamls/elk-flow-collector/logstash/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ def filter(event)
event.remove("[ipfix][protocolIdentifier]")
event.set("[ipfix][protocolIdentifier]", "UDP")
end
if event.get("[ipfix][destinationIPv6Address]").nil?
event.set("[ipfix][destinationIP]", event.get("[ipfix][destinationIPv4Address]"))
else
event.set("[ipfix][destinationIP]", event.get("[ipfix][destinationIPv6Address]"))
end
if event.get("[ipfix][sourceIPv6Address]").nil?
event.set("[ipfix][sourceIP]", event.get("[ipfix][sourceIPv4Address]"))
else
event.set("[ipfix][sourceIP]", event.get("[ipfix][sourceIPv6Address]"))
end
if event.get("[ipfix][sourcePodName]") != ""
if event.get("[ipfix][destinationServicePortName]") != ""
flowkey = ""
Expand Down Expand Up @@ -51,7 +61,7 @@ def filter(event)
flowkey << ":"
flowkey << event.get("[ipfix][sourceTransportPort]").to_s
flowkey << "->"
flowkey << event.get("[ipfix][destinationIPv4Address]")
flowkey << event.get("[ipfix][destinationIP]")
flowkey << ":"
flowkey << event.get("[ipfix][destinationTransportPort]").to_s
flowkey << " "
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/elk-flow-collector/logstash/logstash.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

input {
udp {
host => "0.0.0.0"
host => "${POD_IP}"
port => "4739"
workers => "4"
queue_size => "2048"
Expand Down
4 changes: 2 additions & 2 deletions build/yamls/elk-flow-collector/logstash/logstash.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
http.host: "${POD_IP}"
path.config: /usr/share/logstash/pipeline
4 changes: 4 additions & 0 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ function deliver_antrea {
docker tag "${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3" "sonobuoy/systemd-logs:v0.3"
fi
DOCKER_REGISTRY=${DOCKER_REGISTRY} make
DOCKER_REGISTRY=${DOCKER_REGISTRY} make flow-aggregator-ubuntu

echo "====== Delivering Antrea to all the Nodes ======"
echo "=== Fill serviceCIDRv6 and serviceCIDR ==="
Expand All @@ -311,10 +312,13 @@ function deliver_antrea {

cp -f build/yamls/*.yml $WORKDIR
docker save -o antrea-ubuntu.tar projects.registry.vmware.com/antrea/antrea-ubuntu:latest
docker save -o flow-aggregator.tar projects.registry.vmware.com/antrea/flow-aggregator:latest

kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 != role {print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" antrea-ubuntu.tar jenkins@[${IP}]:${WORKDIR}/antrea-ubuntu.tar
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" flow-aggregator.tar jenkins@[${IP}]:${WORKDIR}/flow-aggregator.tar
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'antrea-ubuntu' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/antrea-ubuntu.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'flow-aggregator' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/flow-aggregator.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
if [[ "${DOCKER_REGISTRY}" != "" ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker pull ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 ; docker tag ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 sonobuoy/systemd-logs:v0.3"
fi
Expand Down
74 changes: 55 additions & 19 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package flowaggregator

import (
"fmt"
"net"
"time"

"github.com/vmware/go-ipfix/pkg/collector"
Expand All @@ -30,7 +31,7 @@ import (
)

var (
ianaInfoElements = []string{
ianaInfoElementsCommon = []string{
"flowStartSeconds",
"flowEndSeconds",
"flowEndReason",
Expand All @@ -41,16 +42,16 @@ var (
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
"sourceIPv4Address",
"destinationIPv4Address",
}
ianaInfoElementsIPv4 = append(ianaInfoElementsCommon, []string{"sourceIPv4Address", "destinationIPv4Address"}...)
ianaInfoElementsIPv6 = append(ianaInfoElementsCommon, []string{"sourceIPv6Address", "destinationIPv6Address"}...)
ianaReverseInfoElements = []string{
"reversePacketTotalCount",
"reverseOctetTotalCount",
"reversePacketDeltaCount",
"reverseOctetDeltaCount",
}
antreaInfoElements = []string{
antreaInfoElementsCommon = []string{
"sourcePodName",
"sourcePodNamespace",
"sourceNodeName",
Expand All @@ -63,12 +64,14 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"destinationClusterIPv4",
}
aggregatorElements = []string{
"originalExporterIPv4Address",
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
aggregatorElementsCommon = []string{
"originalObservationDomainId",
}
aggregatorElementsIPv4 = append([]string{"originalExporterIPv4Address"}, aggregatorElementsCommon...)
aggregatorElementsIPv6 = append([]string{"originalExporterIPv6Address"}, aggregatorElementsCommon...)

nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -118,6 +121,7 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIPv4",
"destinationClusterIPv6",
"destinationServicePort",
"destinationServicePortName",
"ingressNetworkPolicyName",
Expand Down Expand Up @@ -150,7 +154,8 @@ type flowAggregator struct {
aggregationProcess ipfix.IPFIXAggregationProcess
exportInterval time.Duration
exportingProcess ipfix.IPFIXExportingProcess
templateID uint16
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
set ipfix.IPFIXSet
flowAggregatorAddress string
Expand Down Expand Up @@ -178,6 +183,7 @@ func NewFlowAggregator(
exportInterval,
nil,
0,
0,
registry,
ipfix.NewSet(false),
flowAggregatorAddress,
Expand Down Expand Up @@ -281,20 +287,36 @@ func (fa *flowAggregator) initExportingProcess() error {
return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err)
}
fa.exportingProcess = ep
fa.templateID = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateID); err != nil {
return fmt.Errorf("error when preparing set: %v", err)
// Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster
fa.templateIDv4 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv4); err != nil {
return fmt.Errorf("error when preparing IPv4 template set: %v", err)
}
bytesSent, err := fa.sendTemplateSet(fa.set, false)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateIDv4 = 0
fa.set.ResetSet()
return fmt.Errorf("sending IPv4 template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv4 template set", bytesSent)

bytesSent, err := fa.sendTemplateSet(fa.set)
fa.set.ResetSet()
fa.templateIDv6 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv6); err != nil {
return fmt.Errorf("error when preparing IPv6 template set: %v", err)
}
bytesSent, err = fa.sendTemplateSet(fa.set, true)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateID = 0
fa.templateIDv6 = 0
fa.set.ResetSet()
return fmt.Errorf("sending template set failed, err: %v", err)
return fmt.Errorf("sending IPv6 template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of template set", bytesSent)
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv6 template set", bytesSent)

return nil
}

Expand Down Expand Up @@ -339,12 +361,16 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
klog.V(4).Info("Skip sending record that is not correlated.")
return nil
}
templateID := fa.templateIDv4
if net.ParseIP(key.SourceAddress).To4() == nil || net.ParseIP(key.DestinationAddress).To4() == nil {
templateID = fa.templateIDv6
}
// TODO: more records per data set will be supported when go-ipfix supports size check when adding records
fa.set.ResetSet()
if err := fa.set.PrepareSet(ipfixentities.Data, fa.templateID); err != nil {
if err := fa.set.PrepareSet(ipfixentities.Data, templateID); err != nil {
return fmt.Errorf("error when preparing set: %v", err)
}
err := fa.set.AddRecord(record.Record.GetOrderedElementList(), fa.templateID)
err := fa.set.AddRecord(record.Record.GetOrderedElementList(), templateID)
if err != nil {
return fmt.Errorf("error when adding the record to the set: %v", err)
}
Expand All @@ -356,8 +382,18 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, error) {
func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet, isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
templateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand Down Expand Up @@ -406,7 +442,7 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, erro
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
err := templateSet.AddRecord(elements, fa.templateID)
err := templateSet.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
Expand Down
83 changes: 48 additions & 35 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
testTemplateID = uint16(256)
testTemplateIDv4 = uint16(256)
testTemplateIDv6 = uint16(257)
testExportInterval = 60 * time.Second
testObservationDomainID = 0xabcd
)
Expand All @@ -52,47 +53,59 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
nil,
testExportInterval,
mockIPFIXExpProc,
testTemplateID,
testTemplateIDv4,
testTemplateIDv6,
mockIPFIXRegistry,
ipfixtest.NewMockIPFIXSet(ctrl),
"",
nil,
testObservationDomainID,
}

// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for _, isIPv6 := range []bool{false, true} {
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
testTemplateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
testTemplateID = fa.templateIDv6
}
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
_, err := fa.sendTemplateSet(mockTempSet, isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

_, err := fa.sendTemplateSet(mockTempSet)
assert.NoErrorf(t, err, "Error in sending template record: %v", err)
}
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func ParseFlowCollectorAddr(addr string, defaultPort string, defaultProtocol str
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, addr[1:idx])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestParseFlowCollectorAddr(t *testing.T) {
},
{
addr: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
expectedHost: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]",
expectedHost: "fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
expectedPort: "80",
expectedProto: "tcp",
expectedError: nil,
Expand Down
Loading

0 comments on commit aa112c7

Please sign in to comment.