Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlowAggregator] Add templateRefreshTimeout configuration #6699

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/charts/flow-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Kubernetes: `>= 1.19.0-0`
| flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. |
| flowCollector.observationDomainID | string | `""` | Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated from the persistent cluster UUID generated by Antrea. |
| flowCollector.recordFormat | string | `"IPFIX"` | Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON. |
| flowCollector.templateRefreshTimeout | string | `"600s"` | Template retransmission interval when using the udp protocol to export records. The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| flowLogger.compress | bool | `true` | Compress enables gzip compression on rotated files. |
| flowLogger.enable | bool | `false` | Determine whether to enable exporting flow records to a local log file. |
| flowLogger.filters | list | `[]` | Filters can be used to select which flow records to log to file. The provided filters are OR-ed to determine whether a specific flow should be logged. By default, all flows are logged. With the following filters, only flows which are denied because of a network policy will be logged: [{ingressNetworkPolicyRuleActions: ["Drop", "Reject"]}, {egressNetworkPolicyRuleActions: ["Drop", "Reject"]}] |
Expand Down
5 changes: 5 additions & 0 deletions build/charts/flow-aggregator/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ flowCollector:
# Supported formats are IPFIX and JSON.
recordFormat: {{ .Values.flowCollector.recordFormat | quote }}

# Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: {{ .Values.flowCollector.templateRefreshTimeout | quote }}

# clickHouse contains ClickHouse related configuration options.
clickHouse:
# Enable is the switch to enable exporting flow records to ClickHouse.
Expand Down
3 changes: 3 additions & 0 deletions build/charts/flow-aggregator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ flowCollector:
# -- Provide format for records sent to the configured flow collector.
# Supported formats are IPFIX and JSON.
recordFormat: "IPFIX"
# -- Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: "600s"
# clickHouse contains ClickHouse related configuration options.
clickHouse:
# -- Determine whether to enable exporting flow records to ClickHouse.
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ data:
# Supported formats are IPFIX and JSON.
recordFormat: "IPFIX"

# Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: "600s"

# clickHouse contains ClickHouse related configuration options.
clickHouse:
# Enable is the switch to enable exporting flow records to ClickHouse.
Expand Down
12 changes: 4 additions & 8 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,11 @@ func (exp *FlowExporter) initFlowExporter(ctx context.Context) error {
}
// TLS transport does not need any tempRefTimeout, so sending 0.
exp.exporterInput.TempRefTimeout = 0
} else if exp.exporterInput.CollectorProtocol == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
exp.exporterInput.TempRefTimeout = 0
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
exp.exporterInput.TempRefTimeout = 1800
}
// TempRefTimeout specifies how often the exporting process should send the template
// again. It is only relevant when using the UDP protocol. We use 0 to tell the go-ipfix
// library to use the default value, which should be 600s as per the IPFIX standards.
exp.exporterInput.TempRefTimeout = 0
expProcess, err := exporter.InitExportingProcess(exp.exporterInput)
if err != nil {
return fmt.Errorf("error when starting exporter: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,11 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
defer conn2.Close()

for _, tc := range []struct {
protocol string
address string
expectedTempRefTimeout uint32
protocol string
address string
}{
{conn1.LocalAddr().Network(), conn1.LocalAddr().String(), uint32(1800)},
{conn2.Addr().Network(), conn2.Addr().String(), uint32(0)},
{conn1.LocalAddr().Network(), conn1.LocalAddr().String()},
{conn2.Addr().Network(), conn2.Addr().String()},
} {
exp := &FlowExporter{
collectorAddr: tc.address,
Expand All @@ -391,7 +390,8 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
err = exp.initFlowExporter(context.Background())
require.NoError(t, err)
assert.Equal(t, tc.address, exp.exporterInput.CollectorAddress)
assert.Equal(t, tc.expectedTempRefTimeout, exp.exporterInput.TempRefTimeout)
// exporter should use the default value as per the go-ipfix library.
assert.Equal(t, uint32(0), exp.exporterInput.TempRefTimeout)
checkTotalReconnectionsMetric(t)
metrics.ReconnectionsToFlowCollector.Dec()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type FlowCollectorConfig struct {
// Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON.
// Defaults to "IPFIX"
RecordFormat string `yaml:"recordFormat,omitempty"`
// Template retransmission interval when using the udp protocol to export records.
// The value must be provided as a duration string. Defaults to 600s.
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
TemplateRefreshTimeout string `yaml:"templateRefreshTimeout,omitempty"`
}

type ClickHouseConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
DefaultInactiveFlowRecordTimeout = "90s"
DefaultAggregatorTransportProtocol = "TLS"
DefaultRecordFormat = "IPFIX"
DefaultTemplateRefreshTimeout = "600s"

DefaultClickHouseDatabase = "default"
DefaultClickHouseCommitInterval = "8s"
Expand Down Expand Up @@ -62,6 +63,9 @@ func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.FlowCollector.RecordFormat == "" {
flowAggregatorConf.FlowCollector.RecordFormat = DefaultRecordFormat
}
if flowAggregatorConf.FlowCollector.TemplateRefreshTimeout == "" {
flowAggregatorConf.FlowCollector.TemplateRefreshTimeout = DefaultTemplateRefreshTimeout
}
if flowAggregatorConf.ClickHouse.Database == "" {
flowAggregatorConf.ClickHouse.Database = DefaultClickHouseDatabase
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"hash/fnv"
"reflect"
"time"

"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
Expand Down Expand Up @@ -46,6 +47,7 @@ type IPFIXExporter struct {
exportingProcess ipfix.IPFIXExportingProcess
sendJSONRecord bool
observationDomainID uint32
templateRefreshTimeout time.Duration
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
Expand Down Expand Up @@ -95,6 +97,7 @@ func NewIPFIXExporter(
externalFlowCollectorProto: opt.ExternalFlowCollectorProto,
sendJSONRecord: sendJSONRecord,
observationDomainID: observationDomainID,
templateRefreshTimeout: opt.TemplateRefreshTimeout,
registry: registry,
set: ipfixentities.NewSet(false),
k8sClient: k8sClient,
Expand Down Expand Up @@ -146,7 +149,8 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) {
} else {
e.observationDomainID = genObservationDomainID(e.k8sClient)
}
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID)
e.templateRefreshTimeout = opt.TemplateRefreshTimeout
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout)

if e.exportingProcess != nil {
e.exportingProcess.CloseConnToCollector()
Expand Down Expand Up @@ -188,22 +192,21 @@ func (e *IPFIXExporter) initExportingProcess() error {
// externalFlowCollectorAddr and externalFlowCollectorProto instead of net.Addr input.
var expInput exporter.ExporterInput
if e.externalFlowCollectorProto == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
expInput = exporter.ExporterInput{
CollectorAddress: e.externalFlowCollectorAddr,
CollectorProtocol: e.externalFlowCollectorProto,
ObservationDomainID: e.observationDomainID,
TempRefTimeout: 0,
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
// TCP transport does not need any tempRefTimeout, so sending 0.
TempRefTimeout: 0,
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
}
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s. So we will send out template every 30 minutes.
expInput = exporter.ExporterInput{
CollectorAddress: e.externalFlowCollectorAddr,
CollectorProtocol: e.externalFlowCollectorProto,
ObservationDomainID: e.observationDomainID,
TempRefTimeout: 1800,
TempRefTimeout: uint32(e.templateRefreshTimeout.Seconds()),
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -144,18 +145,22 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) {

const newAddr = "newAddr"
const newProto = "newProto"
const newTemplateRefreshTimeout = 1200 * time.Second
config.FlowCollector.Address = fmt.Sprintf("%s:%s", newAddr, newProto)
config.FlowCollector.RecordFormat = "JSON"
config.FlowCollector.TemplateRefreshTimeout = newTemplateRefreshTimeout.String()

ipfixExporter.UpdateOptions(&options.Options{
Config: config,
ExternalFlowCollectorAddr: newAddr,
ExternalFlowCollectorProto: newProto,
TemplateRefreshTimeout: newTemplateRefreshTimeout,
})

assert.Equal(t, newAddr, ipfixExporter.externalFlowCollectorAddr)
assert.Equal(t, newProto, ipfixExporter.externalFlowCollectorProto)
assert.True(t, ipfixExporter.sendJSONRecord)
assert.Equal(t, newTemplateRefreshTimeout, ipfixExporter.templateRefreshTimeout)

require.NoError(t, ipfixExporter.AddRecord(mockRecord, false))
assert.Equal(t, 2, setCount, "Invalid number of flow sets sent by exporter")
Expand Down
6 changes: 3 additions & 3 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (fa *flowAggregator) InitCollectingProcess() error {
Address: collectorAddress,
Protocol: tcpTransport,
MaxBufferSize: 65535,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: true,
CACert: caCert,
ServerKey: serverKey,
Expand All @@ -241,15 +241,15 @@ func (fa *flowAggregator) InitCollectingProcess() error {
Address: collectorAddress,
Protocol: tcpTransport,
MaxBufferSize: 65535,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: false,
}
} else {
cpInput = collector.CollectorInput{
Address: collectorAddress,
Protocol: udpTransport,
MaxBufferSize: 1024,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: false,
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/flowaggregator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Options struct {
ExternalFlowCollectorAddr string
// IPFIX flow collector transport protocol
ExternalFlowCollectorProto string
// Template retransmission interval when using the UDP protocol to export records.
TemplateRefreshTimeout time.Duration
// clickHouseCommitInterval flow records batch commit interval to clickhouse in the flow aggregator
ClickHouseCommitInterval time.Duration
// Flow records batch upload interval from flow aggregator to S3 bucket
Expand Down Expand Up @@ -87,6 +89,14 @@ func LoadConfig(configBytes []byte) (*Options, error) {
if opt.Config.FlowCollector.RecordFormat != "IPFIX" && opt.Config.FlowCollector.RecordFormat != "JSON" {
return nil, fmt.Errorf("record format %s is not supported", opt.Config.FlowCollector.RecordFormat)
}

opt.TemplateRefreshTimeout, err = time.ParseDuration(opt.Config.FlowCollector.TemplateRefreshTimeout)
if err != nil {
return nil, fmt.Errorf("templateRefreshTimeout is not a valid duration: %w", err)
}
if opt.TemplateRefreshTimeout < 0 {
return nil, fmt.Errorf("templateRefreshTimeout cannot be a negative duration")
}
}
// Validate clickhouse specific parameters
if opt.Config.ClickHouse.Enable {
Expand Down
Loading