diff --git a/pkg/antctl/antctl.go b/pkg/antctl/antctl.go index cfbc19d526c..f9cba3cf9b8 100644 --- a/pkg/antctl/antctl.go +++ b/pkg/antctl/antctl.go @@ -533,33 +533,33 @@ var CommandList = &commandList{ name: "enable", shorthand: "", supportedValues: []string{"clickhouse"}, - usage: "Choose Clickhouse for exporting flow records", + usage: "Enable Clickhouse for exporting flow records", }, { name: "database", - shorthand: "db", + shorthand: "b", usage: "Update the name of database", }, { name: "databaseURL", - shorthand: "dbURL", + shorthand: "d", usage: "Update the URL of the database", }, { name: "debug", - shorthand: "d", + shorthand: "", supportedValues: []string{"true", "false"}, usage: "Enable or disable debug log", }, { name: "compress", - shorthand: "cp", + shorthand: "c", supportedValues: []string{"true", "false"}, usage: "Enable or disable lz4 compression", }, { name: "commitInterval", - shorthand: "ci", + shorthand: "i", usage: "Set the periodical interval between batch commit of flow records to DB", }, { @@ -567,6 +567,12 @@ var CommandList = &commandList{ shorthand: "e", usage: "Update or enable flow-collector, provide the flow collector address as string with format :[:].", }, + { + name: "disable", + shorthand: "", + supportedValues: []string{"clickhouse", "flow-collector"}, + usage: "Disable clickhouse or flow-collector", + }, }, }, }, diff --git a/pkg/flowaggregator/apiserver/handlers/update/handler.go b/pkg/flowaggregator/apiserver/handlers/update/handler.go new file mode 100644 index 00000000000..dccc844f9fc --- /dev/null +++ b/pkg/flowaggregator/apiserver/handlers/update/handler.go @@ -0,0 +1,92 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package update + +import ( + "antrea.io/antrea/pkg/flowaggregator/querier" + "antrea.io/antrea/pkg/util/flowexport" + "net" + "net/http" + "strings" +) + +const ( + defaultExternalFlowCollectorTransport = "tcp" + defaultExternalFlowCollectorPort = "4739" +) + +// HandleFunc returns the function which can handle the /update API request. +func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + enableFlowCollector := r.URL.Query().Get("enable") + if enableFlowCollector != "" { + if enableFlowCollector == "clickhouse" { + faq.EnableDefaultClickHouse() + } + } + disableFlowCollector := r.URL.Query().Get("disable") + if disableFlowCollector != "" { + if disableFlowCollector == "clickhouse" { + if err := faq.DisableClickHouse(); err != nil { + http.Error(w, "Error when disable clickhouse: "+err.Error(), http.StatusNotFound) + } + } else if disableFlowCollector == "flow-collector" { + if err := faq.DisableFlowCollector(); err != nil { + http.Error(w, "Error when disable Flow-collector: "+err.Error(), http.StatusNotFound) + } + } + } + externalFlowCollectorAddr := r.URL.Query().Get("externalflowcollectoraddr") + if externalFlowCollectorAddr != "" { + host, port, proto, err := flowexport.ParseFlowCollectorAddr(externalFlowCollectorAddr, defaultExternalFlowCollectorPort, defaultExternalFlowCollectorTransport) + if err != nil { + http.Error(w, "Error when parsing externalFlowCollectorAddr: "+err.Error(), http.StatusNotFound) + } + faq.SetExternalFlowCollectorAddr(querier.ExternalFlowCollectorAddr{ + Address: net.JoinHostPort(host, port), + Protocol: proto, + }) + } + var sb strings.Builder + updateDataBase := r.URL.Query().Get("database") + if updateDataBase != "" { + sb.WriteString("&database=") + sb.WriteString(updateDataBase) + } + updateDataBaseURL := r.URL.Query().Get("databaseURL") + if updateDataBaseURL != "" { + sb.WriteString("&databaseURL=") + sb.WriteString(updateDataBaseURL) + } + updateDebug := r.URL.Query().Get("debug") + if updateDebug != "" { + sb.WriteString("&debug=") + sb.WriteString(updateDebug) + } + updateCompress := r.URL.Query().Get("compress") + if updateCompress != "" { + sb.WriteString("&compress=") + sb.WriteString(updateCompress) + } + updateCommitInterval := r.URL.Query().Get("commitInterval") + if updateCommitInterval != "" { + sb.WriteString("&commitInterval=") + sb.WriteString(updateCommitInterval) + } + if sb.Len() != 0 { + faq.UpdateClickHouse(sb.String()) + } + } +} diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 8331bd48533..9efea081c67 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -130,8 +130,8 @@ func (ci *ClickHouseInput) getDataSourceName() (string, error) { sb.WriteString("&compress=false") } - sb.WriteString("&commitInterval=") - sb.WriteString(ci.CommitInterval.String()) + //sb.WriteString("&commitInterval=") + //sb.WriteString(ci.CommitInterval.String()) return sb.String(), nil } @@ -224,6 +224,7 @@ func (ch *ClickHouseExportProcess) Start() { } func (ch *ClickHouseExportProcess) Stop() { + ch.stopChan <- true close(ch.stopChan) } @@ -386,6 +387,13 @@ func (ch *ClickHouseExportProcess) flowRecordPeriodicCommit() { for { select { case <-ch.stopChan: + committed, err := ch.batchCommitAll() + if err != nil { + klog.Errorf("error when do last batchCommitAll: %v", err) + } else { + committedRec += committed + klog.V(4).InfoS("Total number of records committed to DB", "count", committedRec) + } commitTicker.Stop() logTicker.Stop() return @@ -551,3 +559,7 @@ func (ch *ClickHouseExportProcess) UpdateCH(input *ClickHouseExportProcess, dsn input.dsn = dsn input.db = connect } + +func (ch *ClickHouseExportProcess) GetCommitInterval() time.Duration { + return ch.commitInterval +} diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index fe70aaaab37..150ec493d97 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -48,7 +48,7 @@ func TestGetDataSourceName(t *testing.T) { CommitInterval: 1 * time.Second, } *chInput.Compress = true - dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true" + dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true&commitInterval=1s" chInputInvalid := ClickHouseInput{} diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index cd04038b566..bac1f7dfbf3 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -206,6 +206,8 @@ const ( updateExternalFlowCollectorAddr updateClickHouseParam enableDefaultClickHouse + disableClickHouse + disableFlowCollector ) type updateMsg struct { @@ -436,8 +438,8 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer fa.dbExportProcess.Stop() } go fa.flowRecordExpiryCheck(stopCh) - <-stopCh + close(fa.updateCh) } func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) { @@ -493,7 +495,6 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) { fa.exportingProcess.CloseConnToCollector() fa.exportingProcess = nil } - expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue()) case enableDefaultClickHouse: chInput := msg.value.(clickhouseclient.ClickHouseInput) err := fa.InitDBExportProcess(chInput) @@ -502,16 +503,31 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) { continue } klog.InfoS("Default clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String()) + go fa.dbExportProcess.Start() + defer fa.dbExportProcess.Stop() case updateClickHouseParam: chInput := msg.value.(clickhouseclient.ClickHouseInput) - log.Print(chInput) dsn, connect, err := clickhouseclient.CheckConnection(chInput) if err != nil { klog.Errorf("error when updating clickhouse: %v", err) continue } fa.dbExportProcess.UpdateCH(fa.dbExportProcess, dsn, connect) - klog.InfoS("Default clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String()) + klog.InfoS("New clickhouse param is:", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval.String()) + case disableFlowCollector: + fa.externalFlowCollectorAddr = "" + fa.externalFlowCollectorProto = "" + if fa.exportingProcess != nil { + fa.exportingProcess.CloseConnToCollector() + fa.exportingProcess = nil + } + klog.InfoS("Disable flow-collector") + case disableClickHouse: + if fa.dbExportProcess != nil { + fa.dbExportProcess.Stop() + fa.dbExportProcess = nil + } + klog.InfoS("Disable clickhouse") } } @@ -787,11 +803,14 @@ func (fa *flowAggregator) SetFlowCollector() { } func (fa *flowAggregator) UpdateClickHouse(input string) { - + if fa.dbExportProcess == nil { + klog.Errorf("ClickHouse hasn't been enabled yet") + return + } inputPrevious := fa.dbExportProcess.GetDsnMap() debug, _ := strconv.ParseBool(inputPrevious["debug"]) compress, _ := strconv.ParseBool(inputPrevious["compress"]) - commitInterval, _ := time.ParseDuration(inputPrevious["commitInterval"]) + commitInterval := fa.dbExportProcess.GetCommitInterval() chInput := clickhouseclient.ClickHouseInput{ Username: inputPrevious["username"], Password: inputPrevious["password"], @@ -818,7 +837,7 @@ func (fa *flowAggregator) UpdateClickHouse(input string) { commitInterval, _ := time.ParseDuration(req[1]) chInput.CommitInterval = commitInterval if chInput.CommitInterval < minClickHouseCommitInterval { - fmt.Errorf("commitInterval %s is too small: shortest supported interval is %s", + klog.Errorf("commitInterval %s is too small: shortest supported interval is %s", chInput.CommitInterval, minClickHouseCommitInterval) return } @@ -832,7 +851,7 @@ func (fa *flowAggregator) UpdateClickHouse(input string) { func (fa *flowAggregator) EnableDefaultClickHouse() { if fa.dbExportProcess != nil { - fmt.Errorf("ClickHouse has been enabled") + klog.Errorf("ClickHouse has been enabled") return } compress := true @@ -858,3 +877,25 @@ func (fa *flowAggregator) SetExternalFlowCollectorAddr(externalFlowCollectorAddr value: externalFlowCollectorAddr, } } + +func (fa *flowAggregator) DisableClickHouse() error { + if fa.externalFlowCollectorAddr == "" { + klog.Errorf("Cannot disable both clickhouse and flow-collector") + return fmt.Errorf("cannot disable both clickhouse and flow-collector") + } + fa.updateCh <- updateMsg{ + param: disableClickHouse, + } + return nil +} + +func (fa *flowAggregator) DisableFlowCollector() error { + if fa.dbExportProcess == nil { + klog.Errorf("Cannot disable both clickhouse and flow-collector") + return fmt.Errorf("cannot disable both clickhouse and flow-collector") + } + fa.updateCh <- updateMsg{ + param: disableFlowCollector, + } + return nil +} diff --git a/pkg/flowaggregator/querier/querier.go b/pkg/flowaggregator/querier/querier.go index 27818a8dd3a..3fa9347d026 100644 --- a/pkg/flowaggregator/querier/querier.go +++ b/pkg/flowaggregator/querier/querier.go @@ -31,6 +31,8 @@ type FlowAggregatorQuerier interface { UpdateClickHouse(input string) EnableDefaultClickHouse() SetExternalFlowCollectorAddr(externalFlowCollectorAddr ExternalFlowCollectorAddr) + DisableFlowCollector() error + DisableClickHouse() error } type ExternalFlowCollectorAddr struct {