Skip to content

Commit

Permalink
Add 'update disable' for clickhouse and flow-collector
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@hsuy-a01.vmware.com>
  • Loading branch information
Yun-Tang Hsu committed Apr 15, 2022
1 parent df0f3e2 commit 9467396
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 17 deletions.
18 changes: 12 additions & 6 deletions pkg/antctl/antctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,40 +533,46 @@ var CommandList = &commandList{
name: "enable",
shorthand: "",
supportedValues: []string{"clickhouse"},
usage: "Choose Clickhouse for exporting flow records",
usage: "Enable Clickhouse for exporting flow records",
},
{
name: "database",
shorthand: "db",
shorthand: "b",
usage: "Update the name of database",
},
{
name: "databaseURL",
shorthand: "dbURL",
shorthand: "d",
usage: "Update the URL of the database",
},
{
name: "debug",
shorthand: "d",
shorthand: "",
supportedValues: []string{"true", "false"},
usage: "Enable or disable debug log",
},
{
name: "compress",
shorthand: "cp",
shorthand: "c",
supportedValues: []string{"true", "false"},
usage: "Enable or disable lz4 compression",
},
{
name: "commitInterval",
shorthand: "ci",
shorthand: "i",
usage: "Set the periodical interval between batch commit of flow records to DB",
},
{
name: "externalflowcollectoraddr",
shorthand: "e",
usage: "Update or enable flow-collector, provide the flow collector address as string with format <IP>:<port>[:<proto>].",
},
{
name: "disable",
shorthand: "",
supportedValues: []string{"clickhouse", "flow-collector"},
usage: "Disable clickhouse or flow-collector",
},
},
},
},
Expand Down
92 changes: 92 additions & 0 deletions pkg/flowaggregator/apiserver/handlers/update/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package update

import (
"antrea.io/antrea/pkg/flowaggregator/querier"
"antrea.io/antrea/pkg/util/flowexport"
"net"
"net/http"
"strings"
)

const (
defaultExternalFlowCollectorTransport = "tcp"
defaultExternalFlowCollectorPort = "4739"
)

// HandleFunc returns the function which can handle the /update API request.
func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
enableFlowCollector := r.URL.Query().Get("enable")
if enableFlowCollector != "" {
if enableFlowCollector == "clickhouse" {
faq.EnableDefaultClickHouse()
}
}
disableFlowCollector := r.URL.Query().Get("disable")
if disableFlowCollector != "" {
if disableFlowCollector == "clickhouse" {
if err := faq.DisableClickHouse(); err != nil {
http.Error(w, "Error when disable clickhouse: "+err.Error(), http.StatusNotFound)
}
} else if disableFlowCollector == "flow-collector" {
if err := faq.DisableFlowCollector(); err != nil {
http.Error(w, "Error when disable Flow-collector: "+err.Error(), http.StatusNotFound)
}
}
}
externalFlowCollectorAddr := r.URL.Query().Get("externalflowcollectoraddr")
if externalFlowCollectorAddr != "" {
host, port, proto, err := flowexport.ParseFlowCollectorAddr(externalFlowCollectorAddr, defaultExternalFlowCollectorPort, defaultExternalFlowCollectorTransport)
if err != nil {
http.Error(w, "Error when parsing externalFlowCollectorAddr: "+err.Error(), http.StatusNotFound)
}
faq.SetExternalFlowCollectorAddr(querier.ExternalFlowCollectorAddr{
Address: net.JoinHostPort(host, port),
Protocol: proto,
})
}
var sb strings.Builder
updateDataBase := r.URL.Query().Get("database")
if updateDataBase != "" {
sb.WriteString("&database=")
sb.WriteString(updateDataBase)
}
updateDataBaseURL := r.URL.Query().Get("databaseURL")
if updateDataBaseURL != "" {
sb.WriteString("&databaseURL=")
sb.WriteString(updateDataBaseURL)
}
updateDebug := r.URL.Query().Get("debug")
if updateDebug != "" {
sb.WriteString("&debug=")
sb.WriteString(updateDebug)
}
updateCompress := r.URL.Query().Get("compress")
if updateCompress != "" {
sb.WriteString("&compress=")
sb.WriteString(updateCompress)
}
updateCommitInterval := r.URL.Query().Get("commitInterval")
if updateCommitInterval != "" {
sb.WriteString("&commitInterval=")
sb.WriteString(updateCommitInterval)
}
if sb.Len() != 0 {
faq.UpdateClickHouse(sb.String())
}
}
}
16 changes: 14 additions & 2 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (ci *ClickHouseInput) getDataSourceName() (string, error) {
sb.WriteString("&compress=false")
}

sb.WriteString("&commitInterval=")
sb.WriteString(ci.CommitInterval.String())
//sb.WriteString("&commitInterval=")
//sb.WriteString(ci.CommitInterval.String())

return sb.String(), nil
}
Expand Down Expand Up @@ -224,6 +224,7 @@ func (ch *ClickHouseExportProcess) Start() {
}

func (ch *ClickHouseExportProcess) Stop() {
ch.stopChan <- true
close(ch.stopChan)
}

Expand Down Expand Up @@ -386,6 +387,13 @@ func (ch *ClickHouseExportProcess) flowRecordPeriodicCommit() {
for {
select {
case <-ch.stopChan:
committed, err := ch.batchCommitAll()
if err != nil {
klog.Errorf("error when do last batchCommitAll: %v", err)
} else {
committedRec += committed
klog.V(4).InfoS("Total number of records committed to DB", "count", committedRec)
}
commitTicker.Stop()
logTicker.Stop()
return
Expand Down Expand Up @@ -551,3 +559,7 @@ func (ch *ClickHouseExportProcess) UpdateCH(input *ClickHouseExportProcess, dsn
input.dsn = dsn
input.db = connect
}

func (ch *ClickHouseExportProcess) GetCommitInterval() time.Duration {
return ch.commitInterval
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestGetDataSourceName(t *testing.T) {
CommitInterval: 1 * time.Second,
}
*chInput.Compress = true
dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true"
dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true&commitInterval=1s"

chInputInvalid := ClickHouseInput{}

Expand Down
57 changes: 49 additions & 8 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
updateExternalFlowCollectorAddr
updateClickHouseParam
enableDefaultClickHouse
disableClickHouse
disableFlowCollector
)

type updateMsg struct {
Expand Down Expand Up @@ -436,8 +438,8 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer fa.dbExportProcess.Stop()
}
go fa.flowRecordExpiryCheck(stopCh)

<-stopCh
close(fa.updateCh)
}

func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -493,7 +495,6 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
}
expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue())
case enableDefaultClickHouse:
chInput := msg.value.(clickhouseclient.ClickHouseInput)
err := fa.InitDBExportProcess(chInput)
Expand All @@ -502,16 +503,31 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {
continue
}
klog.InfoS("Default clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String())
go fa.dbExportProcess.Start()
defer fa.dbExportProcess.Stop()
case updateClickHouseParam:
chInput := msg.value.(clickhouseclient.ClickHouseInput)
log.Print(chInput)
dsn, connect, err := clickhouseclient.CheckConnection(chInput)
if err != nil {
klog.Errorf("error when updating clickhouse: %v", err)
continue
}
fa.dbExportProcess.UpdateCH(fa.dbExportProcess, dsn, connect)
klog.InfoS("Default clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String())
klog.InfoS("New clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String())
case disableFlowCollector:
fa.externalFlowCollectorAddr = ""
fa.externalFlowCollectorProto = ""
if fa.exportingProcess != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
}
klog.InfoS("Disable flow-collector")
case disableClickHouse:
if fa.dbExportProcess != nil {
fa.dbExportProcess.Stop()
fa.dbExportProcess = nil
}
klog.InfoS("Disable clickhouse")
}
}

Expand Down Expand Up @@ -787,11 +803,14 @@ func (fa *flowAggregator) SetFlowCollector() {
}

func (fa *flowAggregator) UpdateClickHouse(input string) {

if fa.dbExportProcess == nil {
klog.Errorf("ClickHouse hasn't been enabled yet")
return
}
inputPrevious := fa.dbExportProcess.GetDsnMap()
debug, _ := strconv.ParseBool(inputPrevious["debug"])
compress, _ := strconv.ParseBool(inputPrevious["compress"])
commitInterval, _ := time.ParseDuration(inputPrevious["commitInterval"])
commitInterval := fa.dbExportProcess.GetCommitInterval()
chInput := clickhouseclient.ClickHouseInput{
Username: inputPrevious["username"],
Password: inputPrevious["password"],
Expand All @@ -818,7 +837,7 @@ func (fa *flowAggregator) UpdateClickHouse(input string) {
commitInterval, _ := time.ParseDuration(req[1])
chInput.CommitInterval = commitInterval
if chInput.CommitInterval < minClickHouseCommitInterval {
fmt.Errorf("commitInterval %s is too small: shortest supported interval is %s",
klog.Errorf("commitInterval %s is too small: shortest supported interval is %s",
chInput.CommitInterval, minClickHouseCommitInterval)
return
}
Expand All @@ -832,7 +851,7 @@ func (fa *flowAggregator) UpdateClickHouse(input string) {

func (fa *flowAggregator) EnableDefaultClickHouse() {
if fa.dbExportProcess != nil {
fmt.Errorf("ClickHouse has been enabled")
klog.Errorf("ClickHouse has been enabled")
return
}
compress := true
Expand All @@ -858,3 +877,25 @@ func (fa *flowAggregator) SetExternalFlowCollectorAddr(externalFlowCollectorAddr
value: externalFlowCollectorAddr,
}
}

func (fa *flowAggregator) DisableClickHouse() error {
if fa.externalFlowCollectorAddr == "" {
klog.Errorf("Cannot disable both clickhouse and flow-collector")
return fmt.Errorf("cannot disable both clickhouse and flow-collector")
}
fa.updateCh <- updateMsg{
param: disableClickHouse,
}
return nil
}

func (fa *flowAggregator) DisableFlowCollector() error {
if fa.dbExportProcess == nil {
klog.Errorf("Cannot disable both clickhouse and flow-collector")
return fmt.Errorf("cannot disable both clickhouse and flow-collector")
}
fa.updateCh <- updateMsg{
param: disableFlowCollector,
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/flowaggregator/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type FlowAggregatorQuerier interface {
UpdateClickHouse(input string)
EnableDefaultClickHouse()
SetExternalFlowCollectorAddr(externalFlowCollectorAddr ExternalFlowCollectorAddr)
DisableFlowCollector() error
DisableClickHouse() error
}

type ExternalFlowCollectorAddr struct {
Expand Down

0 comments on commit 9467396

Please sign in to comment.