Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve antctl features in flow-aggregator #3642

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ rules:
- secrets
verbs:
- create
- apiGroups:
- ""
resourceNames:
- flow-aggregator-configmap-7572tg842t
resources:
- configmaps
verbs:
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down Expand Up @@ -301,6 +309,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CH_USERNAME
valueFrom:
secretKeyRef:
Expand All @@ -311,16 +323,17 @@ spec:
secretKeyRef:
key: password
name: clickhouse-secret
- name: FA_CONFIG_MAP_NAME
value: flow-aggregator-configmap-7572tg842t
image: projects.registry.vmware.com/antrea/flow-aggregator:latest
imagePullPolicy: IfNotPresent
name: flow-aggregator
ports:
- containerPort: 4739
volumeMounts:
- mountPath: /etc/flow-aggregator/flow-aggregator.conf
- mountPath: /etc/flow-aggregator
name: flow-aggregator-config
readOnly: true
subPath: flow-aggregator.conf
- mountPath: /var/log/antrea/flow-aggregator
name: host-var-log-antrea-flow-aggregator
nodeSelector:
Expand Down
13 changes: 11 additions & 2 deletions build/yamls/flow-aggregator/base/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create"]
- apiGroups: [ "" ]
resources: [ "configmaps" ]
resourceNames: [ "flow-aggregator-configmap" ]
verbs: [ "update" ]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down Expand Up @@ -145,6 +149,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CH_USERNAME
valueFrom:
secretKeyRef:
Expand All @@ -155,13 +163,14 @@ spec:
secretKeyRef:
name: clickhouse-secret
key: password
- name: FA_CONFIG_MAP_NAME
value: "$(FA_CONFIG_MAP_NAME)"
ports:
- containerPort: 4739
volumeMounts:
- mountPath: /etc/flow-aggregator/flow-aggregator.conf
- mountPath: /etc/flow-aggregator
name: flow-aggregator-config
readOnly: true
subPath: flow-aggregator.conf
- mountPath: /var/log/antrea/flow-aggregator
name: host-var-log-antrea-flow-aggregator
nodeSelector:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/flow-aggregator/base/kustomization.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ commonLabels:
namespace: flow-aggregator
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
vars:
- name: FA_CONFIG_MAP_NAME
objref:
kind: ConfigMap
name: flow-aggregator-configmap
apiVersion: v1
105 changes: 9 additions & 96 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,24 @@ package main

import (
"fmt"
"hash/fnv"
"os"
"sync"
"time"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/clusteridentity"
aggregator "antrea.io/antrea/pkg/flowaggregator"
"antrea.io/antrea/pkg/flowaggregator/apiserver"
"antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
"antrea.io/antrea/pkg/log"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/cipher"
)

const informerDefaultResync = 12 * time.Hour

// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
// user through the flow aggregator configuration. It will first try to generate one
// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
// will generate a random one. The cluster UUID should be available if Antrea is deployed to the
// cluster ahead of the flow aggregator, which is the expectation since when deploying flow
// aggregator as a Pod, networking needs to be configured by the CNI plugin.
func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"

clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
defaultAntreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
k8sClient,
)
var clusterUUID uuid.UUID
if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
klog.Warningf(
"Unable to retrieve cluster UUID after %v (does ConfigMap '%s/%s' exist?); will generate a random observation domain ID",
timeout, defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName,
)
clusterUUID = uuid.New()
}
h := fnv.New32()
h.Write(clusterUUID[:])
observationDomainID := h.Sum32()
return observationDomainID
}

func run(o *Options) error {
func run(configFile string) error {
klog.Infof("Flow aggregator starting...")
// Set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
Expand All @@ -93,72 +50,28 @@ func run(o *Options) error {
informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync)
podInformer := informerFactory.Core().V1().Pods()

var observationDomainID uint32
if o.config.FlowCollector.ObservationDomainID != nil {
observationDomainID = *o.config.FlowCollector.ObservationDomainID
} else {
observationDomainID = genObservationDomainID(k8sClient)
}
klog.Infof("Flow aggregator Observation Domain ID: %d", observationDomainID)

var sendJSONRecord bool
if o.format == "JSON" {
sendJSONRecord = true
} else {
sendJSONRecord = false
}

flowAggregator := aggregator.NewFlowAggregator(
o.externalFlowCollectorAddr,
o.externalFlowCollectorProto,
o.activeFlowRecordTimeout,
o.inactiveFlowRecordTimeout,
o.aggregatorTransportProtocol,
o.flowAggregatorAddress,
o.includePodLabels,
flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
observationDomainID,
podInformer,
sendJSONRecord,
configFile,
)
err = flowAggregator.InitCollectingProcess()
if err != nil {
return fmt.Errorf("error when creating collecting process: %v", err)
}
err = flowAggregator.InitAggregationProcess()
if err != nil {
return fmt.Errorf("error when creating aggregation process: %v", err)
}

if o.config.ClickHouse.Enable {
chInput := clickhouseclient.ClickHouseInput{
Username: os.Getenv("CH_USERNAME"),
Password: os.Getenv("CH_PASSWORD"),
Database: o.config.ClickHouse.Database,
DatabaseURL: o.config.ClickHouse.DatabaseURL,
Debug: o.config.ClickHouse.Debug,
Compress: o.config.ClickHouse.Compress,
CommitInterval: o.clickHouseCommitInterval,
}
err = flowAggregator.InitDBExportProcess(chInput)
if err != nil {
return fmt.Errorf("error when creating db export process: %v", err)
}
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(1)
go flowAggregator.Run(stopCh, &wg)

cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites)
cipherSuites, err := cipher.GenerateCipherSuitesList(flowAggregator.APIServer.TLSCipherSuites)
if err != nil {
return fmt.Errorf("error generating Cipher Suite list: %v", err)
}
apiServer, err := apiserver.New(
flowAggregator,
o.config.APIServer.APIPort,
flowAggregator.APIServer.APIPort,
cipherSuites,
cipher.TLSVersionMap[o.config.APIServer.TLSMinVersion])
cipher.TLSVersionMap[flowAggregator.APIServer.TLSMinVersion])
if err != nil {
return fmt.Errorf("error when creating flow aggregator API server: %v", err)
}
Expand All @@ -167,7 +80,7 @@ func run(o *Options) error {
informerFactory.Start(stopCh)

<-stopCh
klog.Infof("Stopping flow aggregator")
klog.InfoS("Stopping flow aggregator")
wg.Wait()
return nil
}
Expand Down
14 changes: 5 additions & 9 deletions cmd/flow-aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,25 @@ func main() {
}

func newFlowAggregatorCommand() *cobra.Command {
opts := newOptions()

cmd := &cobra.Command{
Use: "flow-aggregator",
Long: "The Flow Aggregator.",
Run: func(cmd *cobra.Command, args []string) {
log.InitLogs(cmd.Flags())
defer log.FlushLogs()
if err := opts.complete(args); err != nil {
klog.Fatalf("Failed to complete args: %v", err)
configFile, err := cmd.Flags().GetString("config")
if err != nil {
klog.Fatalf("Error when finding the path of config: %v", err)
}
if err := opts.validate(args); err != nil {
klog.Fatalf("Failed to validate args: %v", err)
}
if err := run(opts); err != nil {
if err := run(configFile); err != nil {
klog.Fatalf("Error running flow aggregator: %v", err)
}
},
Version: version.GetFullVersionWithRuntimeInfo(),
}

flags := cmd.Flags()
opts.addFlags(flags)
flags.String("config", "", "The path to the configuration file")
log.AddFlags(flags)
return cmd
}
Loading