Skip to content

Commit

Permalink
Improve antctl features in flow-aggregator
Browse files Browse the repository at this point in the history
1. Add “antctl set flow-aggregator” command for antctl in order to update clickhouse/flow-collector related parameters.
2. “antctl set flow-aggregator” will directly mutate the volume configMap of flow-aggregator
3. Mounted ConfigMap will be updated in 1min if there is a modification of volume configMap
4. Create watcher in flow-aggregator pod to watch mounted configMap  and let fa pod react to the changes
5. “antctl set flow-aggregator” can only be used in flow-aggregator pod currently.

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
Yun-Tang Hsu authored and yuntanghsu committed Jun 27, 2022
1 parent c6df2eb commit e9dad14
Show file tree
Hide file tree
Showing 24 changed files with 980 additions and 419 deletions.
17 changes: 15 additions & 2 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ rules:
- secrets
verbs:
- create
- apiGroups:
- ""
resourceNames:
- flow-aggregator-configmap-7572tg842t
resources:
- configmaps
verbs:
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down Expand Up @@ -301,6 +309,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CH_USERNAME
valueFrom:
secretKeyRef:
Expand All @@ -311,16 +323,17 @@ spec:
secretKeyRef:
key: password
name: clickhouse-secret
- name: FA_CONFIG_MAP_NAME
value: flow-aggregator-configmap-7572tg842t
image: projects.registry.vmware.com/antrea/flow-aggregator:latest
imagePullPolicy: IfNotPresent
name: flow-aggregator
ports:
- containerPort: 4739
volumeMounts:
- mountPath: /etc/flow-aggregator/flow-aggregator.conf
- mountPath: /etc/flow-aggregator
name: flow-aggregator-config
readOnly: true
subPath: flow-aggregator.conf
- mountPath: /var/log/antrea/flow-aggregator
name: host-var-log-antrea-flow-aggregator
nodeSelector:
Expand Down
13 changes: 11 additions & 2 deletions build/yamls/flow-aggregator/base/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create"]
- apiGroups: [ "" ]
resources: [ "configmaps" ]
resourceNames: [ "flow-aggregator-configmap" ]
verbs: [ "update" ]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down Expand Up @@ -145,6 +149,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CH_USERNAME
valueFrom:
secretKeyRef:
Expand All @@ -155,13 +163,14 @@ spec:
secretKeyRef:
name: clickhouse-secret
key: password
- name: FA_CONFIG_MAP_NAME
value: "$(FA_CONFIG_MAP_NAME)"
ports:
- containerPort: 4739
volumeMounts:
- mountPath: /etc/flow-aggregator/flow-aggregator.conf
- mountPath: /etc/flow-aggregator
name: flow-aggregator-config
readOnly: true
subPath: flow-aggregator.conf
- mountPath: /var/log/antrea/flow-aggregator
name: host-var-log-antrea-flow-aggregator
nodeSelector:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/flow-aggregator/base/kustomization.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ commonLabels:
namespace: flow-aggregator
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
vars:
- name: FA_CONFIG_MAP_NAME
objref:
kind: ConfigMap
name: flow-aggregator-configmap
apiVersion: v1
105 changes: 9 additions & 96 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,24 @@ package main

import (
"fmt"
"hash/fnv"
"os"
"sync"
"time"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/clusteridentity"
aggregator "antrea.io/antrea/pkg/flowaggregator"
"antrea.io/antrea/pkg/flowaggregator/apiserver"
"antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
"antrea.io/antrea/pkg/log"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/cipher"
)

const informerDefaultResync = 12 * time.Hour

// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
// user through the flow aggregator configuration. It will first try to generate one
// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
// will generate a random one. The cluster UUID should be available if Antrea is deployed to the
// cluster ahead of the flow aggregator, which is the expectation since when deploying flow
// aggregator as a Pod, networking needs to be configured by the CNI plugin.
func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"

clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
defaultAntreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
k8sClient,
)
var clusterUUID uuid.UUID
if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
klog.Warningf(
"Unable to retrieve cluster UUID after %v (does ConfigMap '%s/%s' exist?); will generate a random observation domain ID",
timeout, defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName,
)
clusterUUID = uuid.New()
}
h := fnv.New32()
h.Write(clusterUUID[:])
observationDomainID := h.Sum32()
return observationDomainID
}

func run(o *Options) error {
func run(configFile string) error {
klog.Infof("Flow aggregator starting...")
// Set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
Expand All @@ -93,72 +50,28 @@ func run(o *Options) error {
informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync)
podInformer := informerFactory.Core().V1().Pods()

var observationDomainID uint32
if o.config.FlowCollector.ObservationDomainID != nil {
observationDomainID = *o.config.FlowCollector.ObservationDomainID
} else {
observationDomainID = genObservationDomainID(k8sClient)
}
klog.Infof("Flow aggregator Observation Domain ID: %d", observationDomainID)

var sendJSONRecord bool
if o.format == "JSON" {
sendJSONRecord = true
} else {
sendJSONRecord = false
}

flowAggregator := aggregator.NewFlowAggregator(
o.externalFlowCollectorAddr,
o.externalFlowCollectorProto,
o.activeFlowRecordTimeout,
o.inactiveFlowRecordTimeout,
o.aggregatorTransportProtocol,
o.flowAggregatorAddress,
o.includePodLabels,
flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
observationDomainID,
podInformer,
sendJSONRecord,
configFile,
)
err = flowAggregator.InitCollectingProcess()
if err != nil {
return fmt.Errorf("error when creating collecting process: %v", err)
}
err = flowAggregator.InitAggregationProcess()
if err != nil {
return fmt.Errorf("error when creating aggregation process: %v", err)
}

if o.config.ClickHouse.Enable {
chInput := clickhouseclient.ClickHouseInput{
Username: os.Getenv("CH_USERNAME"),
Password: os.Getenv("CH_PASSWORD"),
Database: o.config.ClickHouse.Database,
DatabaseURL: o.config.ClickHouse.DatabaseURL,
Debug: o.config.ClickHouse.Debug,
Compress: o.config.ClickHouse.Compress,
CommitInterval: o.clickHouseCommitInterval,
}
err = flowAggregator.InitDBExportProcess(chInput)
if err != nil {
return fmt.Errorf("error when creating db export process: %v", err)
}
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(1)
go flowAggregator.Run(stopCh, &wg)

cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites)
cipherSuites, err := cipher.GenerateCipherSuitesList(flowAggregator.APIServer.TLSCipherSuites)
if err != nil {
return fmt.Errorf("error generating Cipher Suite list: %v", err)
}
apiServer, err := apiserver.New(
flowAggregator,
o.config.APIServer.APIPort,
flowAggregator.APIServer.APIPort,
cipherSuites,
cipher.TLSVersionMap[o.config.APIServer.TLSMinVersion])
cipher.TLSVersionMap[flowAggregator.APIServer.TLSMinVersion])
if err != nil {
return fmt.Errorf("error when creating flow aggregator API server: %v", err)
}
Expand All @@ -167,7 +80,7 @@ func run(o *Options) error {
informerFactory.Start(stopCh)

<-stopCh
klog.Infof("Stopping flow aggregator")
klog.InfoS("Stopping flow aggregator")
wg.Wait()
return nil
}
Expand Down
14 changes: 5 additions & 9 deletions cmd/flow-aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,25 @@ func main() {
}

func newFlowAggregatorCommand() *cobra.Command {
opts := newOptions()

cmd := &cobra.Command{
Use: "flow-aggregator",
Long: "The Flow Aggregator.",
Run: func(cmd *cobra.Command, args []string) {
log.InitLogs(cmd.Flags())
defer log.FlushLogs()
if err := opts.complete(args); err != nil {
klog.Fatalf("Failed to complete args: %v", err)
configFile, err := cmd.Flags().GetString("config")
if err != nil {
klog.Fatalf("Error when finding the path of config: %v", err)
}
if err := opts.validate(args); err != nil {
klog.Fatalf("Failed to validate args: %v", err)
}
if err := run(opts); err != nil {
if err := run(configFile); err != nil {
klog.Fatalf("Error running flow aggregator: %v", err)
}
},
Version: version.GetFullVersionWithRuntimeInfo(),
}

flags := cmd.Flags()
opts.addFlags(flags)
flags.String("config", "", "The path to the configuration file")
log.AddFlags(flags)
return cmd
}
Loading

0 comments on commit e9dad14

Please sign in to comment.