Skip to content

Commit

Permalink
Add single stack IPv6 support in Flow aggregator and ELK flow collector
Browse files Browse the repository at this point in the history
In this commit, we add single stack IPv6 support in flow aggregator
 and elk flow collector.
For dual stack, there are some issues in service name retreival at
flow exporter, will add support in a new PR.
  • Loading branch information
Yongming Ding committed Mar 17, 2021
1 parent a9adf5a commit 338e110
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 155 deletions.
15 changes: 12 additions & 3 deletions build/yamls/elk-flow-collector/elk-flow-collector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ spec:
- name: bootstrap.memory_lock
value: "false"
- name: network.host
value: "0.0.0.0"
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: http.port
value: "9200"
- name: discovery.type
Expand Down Expand Up @@ -155,7 +157,9 @@ spec:
- name: action.destructive_requires_name
value: "true"
- name: SERVER_HOST
value: "0.0.0.0"
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: SERVER_PORT
value: "5601"
- name: ELASTICSEARCH_URL
Expand Down Expand Up @@ -201,6 +205,11 @@ spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash-oss:7.8.0
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: logstash-definition-volume
mountPath: /usr/share/logstash/definitions
Expand Down Expand Up @@ -231,4 +240,4 @@ spec:
name: logstash-configmap
items:
- key: logstash.conf
path: logstash.conf
path: logstash.conf
123 changes: 62 additions & 61 deletions build/yamls/elk-flow-collector/kibana.ndjson

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion build/yamls/elk-flow-collector/logstash/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ def filter(event)
event.remove("[ipfix][protocolIdentifier]")
event.set("[ipfix][protocolIdentifier]", "UDP")
end
if event.get("[ipfix][destinationIPv6Address]").nil?
event.set("[ipfix][destinationIP]", event.get("[ipfix][destinationIPv4Address]"))
else
event.set("[ipfix][destinationIP]", event.get("[ipfix][destinationIPv6Address]"))
end
if event.get("[ipfix][sourceIPv6Address]").nil?
event.set("[ipfix][sourceIP]", event.get("[ipfix][sourceIPv4Address]"))
else
event.set("[ipfix][sourceIP]", event.get("[ipfix][sourceIPv6Address]"))
end
if event.get("[ipfix][sourcePodName]") != ""
if event.get("[ipfix][destinationServicePortName]") != ""
flowkey = ""
Expand Down Expand Up @@ -51,7 +61,7 @@ def filter(event)
flowkey << ":"
flowkey << event.get("[ipfix][sourceTransportPort]").to_s
flowkey << "->"
flowkey << event.get("[ipfix][destinationIPv4Address]")
flowkey << event.get("[ipfix][destinationIP]")
flowkey << ":"
flowkey << event.get("[ipfix][destinationTransportPort]").to_s
flowkey << " "
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/elk-flow-collector/logstash/logstash.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

input {
udp {
host => "0.0.0.0"
host => "${POD_IP}"
port => "4739"
workers => "4"
queue_size => "2048"
Expand Down
4 changes: 2 additions & 2 deletions build/yamls/elk-flow-collector/logstash/logstash.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
http.host: "${POD_IP}"
path.config: /usr/share/logstash/pipeline
4 changes: 4 additions & 0 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ function deliver_antrea {
docker tag "${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3" "sonobuoy/systemd-logs:v0.3"
fi
DOCKER_REGISTRY=${DOCKER_REGISTRY} make
DOCKER_REGISTRY=${DOCKER_REGISTRY} make flow-aggregator-ubuntu

echo "====== Delivering Antrea to all the Nodes ======"
echo "=== Fill serviceCIDRv6 and serviceCIDR ==="
Expand All @@ -311,10 +312,13 @@ function deliver_antrea {

cp -f build/yamls/*.yml $WORKDIR
docker save -o antrea-ubuntu.tar projects.registry.vmware.com/antrea/antrea-ubuntu:latest
docker save -o flow-aggregator.tar projects.registry.vmware.com/antrea/flow-aggregator:latest

kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 != role {print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" antrea-ubuntu.tar jenkins@[${IP}]:${WORKDIR}/antrea-ubuntu.tar
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" flow-aggregator.tar jenkins@[${IP}]:${WORKDIR}/flow-aggregator.tar
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'antrea-ubuntu' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/antrea-ubuntu.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker images | grep 'flow-aggregator' | awk '{print \$3}' | xargs -r docker rmi ; docker load -i ${WORKDIR}/flow-aggregator.tar ; docker images | grep '<none>' | awk '{print \$3}' | xargs -r docker rmi" || true
if [[ "${DOCKER_REGISTRY}" != "" ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "docker pull ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 ; docker tag ${DOCKER_REGISTRY}/antrea/sonobuoy-systemd-logs:v0.3 sonobuoy/systemd-logs:v0.3"
fi
Expand Down
74 changes: 55 additions & 19 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package flowaggregator

import (
"fmt"
"net"
"time"

"github.com/vmware/go-ipfix/pkg/collector"
Expand All @@ -30,7 +31,7 @@ import (
)

var (
ianaInfoElements = []string{
ianaInfoElementsCommon = []string{
"flowStartSeconds",
"flowEndSeconds",
"flowEndReason",
Expand All @@ -41,16 +42,16 @@ var (
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
"sourceIPv4Address",
"destinationIPv4Address",
}
ianaInfoElementsIPv4 = append(ianaInfoElementsCommon, []string{"sourceIPv4Address", "destinationIPv4Address"}...)
ianaInfoElementsIPv6 = append(ianaInfoElementsCommon, []string{"sourceIPv6Address", "destinationIPv6Address"}...)
ianaReverseInfoElements = []string{
"reversePacketTotalCount",
"reverseOctetTotalCount",
"reversePacketDeltaCount",
"reverseOctetDeltaCount",
}
antreaInfoElements = []string{
antreaInfoElementsCommon = []string{
"sourcePodName",
"sourcePodNamespace",
"sourceNodeName",
Expand All @@ -63,12 +64,14 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"destinationClusterIPv4",
}
aggregatorElements = []string{
"originalExporterIPv4Address",
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
aggregatorElementsCommon = []string{
"originalObservationDomainId",
}
aggregatorElementsIPv4 = append([]string{"originalExporterIPv4Address"}, aggregatorElementsCommon...)
aggregatorElementsIPv6 = append([]string{"originalExporterIPv6Address"}, aggregatorElementsCommon...)

nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -118,6 +121,7 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIPv4",
"destinationClusterIPv6",
"destinationServicePort",
"destinationServicePortName",
"ingressNetworkPolicyName",
Expand Down Expand Up @@ -150,7 +154,8 @@ type flowAggregator struct {
aggregationProcess ipfix.IPFIXAggregationProcess
exportInterval time.Duration
exportingProcess ipfix.IPFIXExportingProcess
templateID uint16
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
set ipfix.IPFIXSet
flowAggregatorAddress string
Expand Down Expand Up @@ -178,6 +183,7 @@ func NewFlowAggregator(
exportInterval,
nil,
0,
0,
registry,
ipfix.NewSet(false),
flowAggregatorAddress,
Expand Down Expand Up @@ -281,20 +287,36 @@ func (fa *flowAggregator) initExportingProcess() error {
return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err)
}
fa.exportingProcess = ep
fa.templateID = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateID); err != nil {
return fmt.Errorf("error when preparing set: %v", err)
// Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster
fa.templateIDv4 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv4); err != nil {
return fmt.Errorf("error when preparing IPv4 template set: %v", err)
}
bytesSent, err := fa.sendTemplateSet(fa.set, false)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateIDv4 = 0
fa.set.ResetSet()
return fmt.Errorf("sending IPv4 template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv4 template set", bytesSent)

bytesSent, err := fa.sendTemplateSet(fa.set)
fa.set.ResetSet()
fa.templateIDv6 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv6); err != nil {
return fmt.Errorf("error when preparing IPv6 template set: %v", err)
}
bytesSent, err = fa.sendTemplateSet(fa.set, true)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.templateID = 0
fa.templateIDv6 = 0
fa.set.ResetSet()
return fmt.Errorf("sending template set failed, err: %v", err)
return fmt.Errorf("sending IPv6 template set failed, err: %v", err)
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of template set", bytesSent)
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv6 template set", bytesSent)

return nil
}

Expand Down Expand Up @@ -339,12 +361,16 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
klog.V(4).Info("Skip sending record that is not correlated.")
return nil
}
templateID := fa.templateIDv4
if net.ParseIP(key.SourceAddress).To4() == nil || net.ParseIP(key.DestinationAddress).To4() == nil {
templateID = fa.templateIDv6
}
// TODO: more records per data set will be supported when go-ipfix supports size check when adding records
fa.set.ResetSet()
if err := fa.set.PrepareSet(ipfixentities.Data, fa.templateID); err != nil {
if err := fa.set.PrepareSet(ipfixentities.Data, templateID); err != nil {
return fmt.Errorf("error when preparing set: %v", err)
}
err := fa.set.AddRecord(record.Record.GetOrderedElementList(), fa.templateID)
err := fa.set.AddRecord(record.Record.GetOrderedElementList(), templateID)
if err != nil {
return fmt.Errorf("error when adding the record to the set: %v", err)
}
Expand All @@ -356,8 +382,18 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, error) {
func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet, isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
templateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand Down Expand Up @@ -406,7 +442,7 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet) (int, erro
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
err := templateSet.AddRecord(elements, fa.templateID)
err := templateSet.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
Expand Down
83 changes: 48 additions & 35 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
testTemplateID = uint16(256)
testTemplateIDv4 = uint16(256)
testTemplateIDv6 = uint16(257)
testExportInterval = 60 * time.Second
testObservationDomainID = 0xabcd
)
Expand All @@ -52,47 +53,59 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
nil,
testExportInterval,
mockIPFIXExpProc,
testTemplateID,
testTemplateIDv4,
testTemplateIDv6,
mockIPFIXRegistry,
ipfixtest.NewMockIPFIXSet(ctrl),
"",
nil,
testObservationDomainID,
}

// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for _, isIPv6 := range []bool{false, true} {
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
testTemplateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
testTemplateID = fa.templateIDv6
}
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range ianaReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].Element, nil)
}
for i, ie := range antreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
}
for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
_, err := fa.sendTemplateSet(mockTempSet, isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

_, err := fa.sendTemplateSet(mockTempSet)
assert.NoErrorf(t, err, "Error in sending template record: %v", err)
}
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func ParseFlowCollectorAddr(addr string, defaultPort string, defaultProtocol str
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, addr[1:idx])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/flowexport/flowexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestParseFlowCollectorAddr(t *testing.T) {
},
{
addr: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
expectedHost: "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]",
expectedHost: "fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
expectedPort: "80",
expectedProto: "tcp",
expectedError: nil,
Expand Down
Loading

0 comments on commit 338e110

Please sign in to comment.