Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Jun 11, 2022
1 parent 6be16b5 commit 1317491
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/flow-aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newFlowAggregatorCommand() *cobra.Command {
defer log.FlushLogs()
configFile, err := cmd.Flags().GetString("config")
if err != nil {
klog.Fatalf("error when finding the path of config: %v", err)
klog.Fatalf("Error when finding the path of config: %v", err)
}
if err := run(configFile); err != nil {
klog.Fatalf("Error running flow aggregator: %v", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@

package config

type AggregatorTransportProtocol string

const (
AggregatorTransportProtocolTCP AggregatorTransportProtocol = "TCP"
AggregatorTransportProtocolTLS AggregatorTransportProtocol = "TLS"
AggregatorTransportProtocolUDP AggregatorTransportProtocol = "UDP"
)

type FlowAggregatorConfig struct {
// Provide the active flow record timeout as a duration string. This determines
// how often the flow aggregator exports the active flow records to the flow
Expand Down
8 changes: 0 additions & 8 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ import (
"antrea.io/antrea/pkg/apis"
)

type AggregatorTransportProtocol string

const (
AggregatorTransportProtocolTCP AggregatorTransportProtocol = "TCP"
AggregatorTransportProtocolTLS AggregatorTransportProtocol = "TLS"
AggregatorTransportProtocolUDP AggregatorTransportProtocol = "UDP"
)

const (
DefaultExternalFlowCollectorTransport = "tcp"
DefaultExternalFlowCollectorPort = "4739"
Expand Down
14 changes: 7 additions & 7 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type ClickHouseExportProcess struct {
// commitInterval is the interval between batch commits
commitInterval time.Duration
// stopChan is the channel to receive stop message
stopChan chan bool
stopCh chan bool
// commitTicker is the channel to transmit the time with a period
commitTicker *time.Ticker
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func NewClickHouseClient(input ClickHouseInput) (*ClickHouseExportProcess, error
mutex: sync.RWMutex{},
queueSize: maxQueueSize,
commitInterval: input.CommitInterval,
stopChan: make(chan bool),
stopCh: make(chan bool),
commitTicker: time.NewTicker(input.CommitInterval),
}
return chClient, nil
Expand All @@ -219,12 +219,11 @@ func (ch *ClickHouseExportProcess) CacheSet(set ipfixentities.Set) {

func (ch *ClickHouseExportProcess) Start() {
go ch.flowRecordPeriodicCommit()
<-ch.stopChan
<-ch.stopCh
}

func (ch *ClickHouseExportProcess) Stop() {
ch.stopChan <- true
close(ch.stopChan)
close(ch.stopCh)
}

func (ch *ClickHouseExportProcess) getClickHouseFlowRow(record ipfixentities.Record) *ClickHouseFlowRow {
Expand Down Expand Up @@ -384,10 +383,11 @@ func (ch *ClickHouseExportProcess) flowRecordPeriodicCommit() {
committedRec := 0
for {
select {
case <-ch.stopChan:
case <-ch.stopCh:
klog.Info("Stop clickHouse exporting process")
committed, err := ch.batchCommitAll()
if err != nil {
klog.ErrorS(err, "error when do last batchCommitAll")
klog.ErrorS(err, "Error when doing last batchCommitAll")
} else {
committedRec += committed
klog.V(4).InfoS("Total number of records committed to DB", "count", committedRec)
Expand Down
52 changes: 25 additions & 27 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"hash/fnv"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -253,13 +254,13 @@ func NewFlowAggregator(
if err != nil {
return nil, fmt.Errorf("error when creating file watcher for configuration file: %v", err)
}
if err = configWatcher.Add(configFile); err != nil {
return nil, fmt.Errorf("error when starting file watch on configuration file: %v", err)
if err = configWatcher.Add(filepath.Dir(configFile)); err != nil {
return nil, fmt.Errorf("error when starting file watch on configuration dir: %v", err)
}

data, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, fmt.Errorf("cannot read FlowAggregator configFile: %v", err)
return nil, fmt.Errorf("cannot read FlowAggregator configuration file: %v", err)
}
opt, err := util.LoadConfig(data)
if err != nil {
Expand Down Expand Up @@ -526,13 +527,12 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
go fa.dbExportProcess.Start()
defer fa.dbExportProcess.Stop()
}
go fa.flowRecordExpiryCheck(stopCh)
go fa.flowExportLoop(stopCh)
go fa.watchConfiguration(stopCh)
<-stopCh
close(fa.updateCh)
}

func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) {
expireTimer := time.NewTimer(fa.activeFlowRecordTimeout)
logTicker := time.NewTicker(time.Minute)
for {
Expand All @@ -547,7 +547,7 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
if fa.externalFlowCollectorAddr != "" && fa.exportingProcess == nil {
err := fa.initExportingProcess()
if err != nil {
klog.ErrorS(err, "Error when initializing exporting process, will retry in %s", fa.activeFlowRecordTimeout)
klog.ErrorS(err, "Error when initializing exporting process", "wait time for retry", fa.activeFlowRecordTimeout)
// Initializing exporting process fails, will retry in next cycle.
expireTimer.Reset(fa.activeFlowRecordTimeout)
continue
Expand Down Expand Up @@ -635,7 +635,7 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
klog.Info("Disabling Clickhouse")
fa.dbExportProcess.Stop()
fa.dbExportProcess = nil
klog.Info("Clickhouse Disabled")
klog.Info("Clickhouse disabled")
}
}
}
Expand Down Expand Up @@ -669,7 +669,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
if err != nil {
return err
}
klog.V(4).InfoS("Data set sent successfully: %d Bytes sent", sentBytes)
klog.V(4).InfoS("Data set sent successfully", "bytes sent", sentBytes)
}
if fa.dbExportProcess != nil {
fa.dbExportProcess.CacheSet(fa.set)
Expand Down Expand Up @@ -910,45 +910,42 @@ func (fa *flowAggregator) watchConfiguration(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
close(fa.updateCh)
return
case event, ok := <-fa.configWatcher.Events:
if err := fa.handleWatcherEvent(event, ok); err != nil {
// if the watcher cannot add mounted configMap or the configMap is not readable, we kill the flow-aggregator pod (serious error)
klog.InfoS("Event happened", "event", event.String())
if !ok {
// If configWatcher event channel is closed, we kill the flow-aggregator Pod to restore
// the channel.
klog.Fatal("ConfigWatcher event channel closed")
}
if err := fa.handleWatcherEvent(); err != nil {
// If the watcher cannot add mounted configuration file or the configuration file is not readable,
// we kill the flow-aggregator Pod (serious error)
klog.Fatalf("Cannot watch or read configMap: %v", err)
}
case err, _ := <-fa.configWatcher.Errors:
case err := <-fa.configWatcher.Errors:
if err != nil {
// if the error happens to watcher, we kill the flow-aggregator pod.
// If the error happens to watcher, we kill the flow-aggregator Pod.
// watcher might be shut-down or broken in this situation.
klog.Fatalf("configWatcher err: %v", err)
}
}
}
}

func (fa *flowAggregator) handleWatcherEvent(event fsnotify.Event, ok bool) error {
if !ok {
return fmt.Errorf("configWatcher event channel closed")
}
// When mounted configmap is updating, watcher event "CHMOD" and "REMOVE" will happen.
// Need to add configFile back to watcher in order to watch the updated configMap.
if event.Op != fsnotify.Remove {
return nil
}
if err := fa.configWatcher.Add(fa.configFile); err != nil {
return fmt.Errorf("error when adding configFile back to watcher: %v", err)
}
func (fa *flowAggregator) handleWatcherEvent() error {
data, err := ioutil.ReadFile(fa.configFile)
if err != nil {
return fmt.Errorf("cannot read FlowAggregator configFile: %v", err)
return fmt.Errorf("cannot read FlowAggregator configuration file: %v", err)
}
opt, err := util.LoadConfig(data)
if err != nil {
klog.Error(err)
return nil
}
if bytes.Equal(data, fa.configData) {
klog.InfoS("Flow-aggregator configuration hasn't changed")
klog.InfoS("Flow-aggregator configuration doesn't changed")
return nil
}
fa.configData = data
Expand All @@ -957,6 +954,7 @@ func (fa *flowAggregator) handleWatcherEvent(event fsnotify.Event, ok bool) erro
}

func (fa *flowAggregator) updateFlowAggregator(opt *util.Options) {
klog.Info("Updating Flow Aggregator")
if opt.Config.FlowCollector.Enable {
query := querier.ExternalFlowCollectorAddr{
Address: opt.ExternalFlowCollectorAddr,
Expand Down
51 changes: 46 additions & 5 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ package flowaggregator
import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
ipfixintermediate "github.com/vmware/go-ipfix/pkg/intermediate"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"gopkg.in/yaml.v2"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

Expand Down Expand Up @@ -283,7 +287,7 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi
return ieWithValue
}

func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
func TestFlowAggregator_watchConfiguration(t *testing.T) {
dbEP := &clickhouseclient.ClickHouseExportProcess{}
testcases := []struct {
FlowCollectorEnable bool
Expand All @@ -302,6 +306,7 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {

for i, tc := range testcases {
t.Run(fmt.Sprintf("subtest: %d", i), func(t *testing.T) {
stopCh := make(chan struct{})
opt := util.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
FlowCollector: flowaggregatorconfig.FlowCollectorConfig{
Expand All @@ -313,18 +318,54 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
},
},
}
// create watcher
fileName := fmt.Sprintf("test_%d.config", i)
configWatcher, err := fsnotify.NewWatcher()
assert.NoError(t, err)
flowAggregator := &flowAggregator{
// use a larger buffer to prevent the buffered channel from blocking
updateCh: make(chan updateMsg, 100),
externalFlowCollectorAddr: tc.externalFlowCollectorAddr,
dbExportProcess: tc.dbExportProcess,
configFile: fileName,
configWatcher: configWatcher,
}
flowAggregator.updateFlowAggregator(&opt)
// receiving from a buffered channel will be blocked if the channel is not closed when using range channel
dir := filepath.Dir(fileName)
// create default file
f, err := os.Create(flowAggregator.configFile)
assert.NoError(t, err)
// add default file to watcher
err = flowAggregator.configWatcher.Add(dir)
assert.NoError(t, err)
// chmod and delete file, then create a new one (mimic replacement)
err = f.Chmod(0777)
assert.NoError(t, err)
err = f.Close()
assert.NoError(t, err)
err = os.Remove(flowAggregator.configFile)
assert.NoError(t, err)
f, err = os.Create(flowAggregator.configFile)
assert.NoError(t, err)
b, err := yaml.Marshal(opt.Config)
assert.NoError(t, err)
_, err = f.Write(b)
assert.NoError(t, err)
err = f.Close()
assert.NoError(t, err)
go flowAggregator.watchConfiguration(stopCh)

for _, message := range tc.message {
msg := <-flowAggregator.updateCh
assert.Equal(t, message, msg.param)
select {
case msg := <-flowAggregator.updateCh:
assert.Equal(t, message, msg.param)
case <-time.After(1 * time.Minute):
assert.NoError(t, fmt.Errorf("timeout"))
}
}
close(stopCh)
// wait for watchConfiguration to return to prevent "no such file" error.
time.Sleep(5 * time.Second)
os.Remove(flowAggregator.configFile)
})
}
}

0 comments on commit 1317491

Please sign in to comment.