From 7f49f4f5e1bd3c42ece5b2372675fa656180181e Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Thu, 14 Apr 2022 18:41:47 -0700 Subject: [PATCH] Add update ttl for antctl Signed-off-by: Yun-Tang Hsu --- pkg/antctl/antctl.go | 37 +++++++++++++------ .../apiserver/handlers/update/handler.go | 11 +++++- .../clickhouseclient/clickhouseclient.go | 16 ++++++++ .../clickhouseclient/clickhouseclient_test.go | 2 +- pkg/flowaggregator/flowaggregator.go | 21 +++++++---- pkg/flowaggregator/querier/querier.go | 1 + 6 files changed, 66 insertions(+), 22 deletions(-) diff --git a/pkg/antctl/antctl.go b/pkg/antctl/antctl.go index f9cba3cf9b8..b8dd9ef0d20 100644 --- a/pkg/antctl/antctl.go +++ b/pkg/antctl/antctl.go @@ -512,17 +512,27 @@ var CommandList = &commandList{ { use: "update", short: "Update parameters used in flow aggregator", - long: "Update parameters used in flow aggregator. It includes logTicker.", - example: ` Update logTicker - $ antctl updateparam -l 2m - Update podLabels - $ antctl updateparam -p true - Update externalFlowCollectorAddr - $ antctl updateparam -e - Update activeFlowRecordTimeout - $ antctl activeflowrecordtimeout -a 120s - Update inactiveFlowRecordTimeout - $ antctl inactiveflowrecordtimeout -i 240s`, + example: ` + Enable clickhouse + $ antctl update --enable clickhouse + Update database + $ antctl update --database name + Update databaseURL + $ antctl update -d Http://xxxxx + Update debug + $ antctl update --debug true + Update compress + $ antctl update --compress true + Update commitInterval + $ antctl update -i 10s + Update externalflowcollectoraddr + $ antctl update -e :[:] + Update disable clickhouse/flow-collecotr + $ antctl update --disable clickhouse/flow-collecotr + Update ttl + $ antctl update --ttl 100 + +`, commandGroup: flat, flowAggregatorEndpoint: &endpoint{ nonResourceEndpoint: &nonResourceEndpoint{ @@ -573,6 +583,11 @@ var CommandList = &commandList{ supportedValues: []string{"clickhouse", "flow-collector"}, usage: "Disable clickhouse or flow-collector", }, + { + name: "ttl", + shorthand: "", + usage: "Modify table ttl (unit second) for clickhouse", + }, }, }, }, diff --git a/pkg/flowaggregator/apiserver/handlers/update/handler.go b/pkg/flowaggregator/apiserver/handlers/update/handler.go index dccc844f9fc..56dfb447f60 100644 --- a/pkg/flowaggregator/apiserver/handlers/update/handler.go +++ b/pkg/flowaggregator/apiserver/handlers/update/handler.go @@ -15,11 +15,12 @@ package update import ( - "antrea.io/antrea/pkg/flowaggregator/querier" - "antrea.io/antrea/pkg/util/flowexport" "net" "net/http" "strings" + + "antrea.io/antrea/pkg/flowaggregator/querier" + "antrea.io/antrea/pkg/util/flowexport" ) const ( @@ -41,10 +42,12 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { if disableFlowCollector == "clickhouse" { if err := faq.DisableClickHouse(); err != nil { http.Error(w, "Error when disable clickhouse: "+err.Error(), http.StatusNotFound) + return } } else if disableFlowCollector == "flow-collector" { if err := faq.DisableFlowCollector(); err != nil { http.Error(w, "Error when disable Flow-collector: "+err.Error(), http.StatusNotFound) + return } } } @@ -88,5 +91,9 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { if sb.Len() != 0 { faq.UpdateClickHouse(sb.String()) } + ttl := r.URL.Query().Get("ttl") + if ttl != "" { + faq.UpdateTTL(ttl) + } } } diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 9efea081c67..ad7757b6fbf 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -17,6 +17,7 @@ package clickhouseclient import ( "database/sql" "fmt" + "strconv" "strings" "sync" "time" @@ -79,6 +80,7 @@ const ( reverseThroughputFromDestinationNode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + //ttlQuery = `ALTER TABLE flows MODIFY TTL timeInserted + INTERVAL ? SECOND` ) type ClickHouseExportProcess struct { @@ -563,3 +565,17 @@ func (ch *ClickHouseExportProcess) UpdateCH(input *ClickHouseExportProcess, dsn func (ch *ClickHouseExportProcess) GetCommitInterval() time.Duration { return ch.commitInterval } + +func (ch *ClickHouseExportProcess) UpdateTTL(ttl string) error { + + if _, err := strconv.ParseInt(ttl, 10, 64); err != nil { + klog.ErrorS(err, "Error when parsing ttl") + return err + } + query := "ALTER TABLE flows MODIFY TTL timeInserted + INTERVAL (?) SECOND" + if _, err := ch.db.Exec(query, ttl); err != nil { + klog.ErrorS(err, "Error when alter table ttl") + return err + } + return nil +} diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index 150ec493d97..fe70aaaab37 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -48,7 +48,7 @@ func TestGetDataSourceName(t *testing.T) { CommitInterval: 1 * time.Second, } *chInput.Compress = true - dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true&commitInterval=1s" + dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true" chInputInvalid := ClickHouseInput{} diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index bac1f7dfbf3..6a84458546a 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -18,7 +18,6 @@ import ( "bytes" "encoding/json" "fmt" - log "github.com/sirupsen/logrus" "os" "strconv" "strings" @@ -202,8 +201,7 @@ const ( type updateParam int const ( - updateIncludePodLabels updateParam = iota - updateExternalFlowCollectorAddr + updateExternalFlowCollectorAddr updateParam = iota updateClickHouseParam enableDefaultClickHouse disableClickHouse @@ -556,6 +554,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor if err := fa.set.AddRecord(record.Record.GetOrderedElementList(), templateID); err != nil { return err } + // if there is ipfix connection issue when we enable both ipfix and clickhouse, clickhouse won't be able to do cacheset and the record status won't be changed neither. Is it expected? if fa.exportingProcess != nil { sentBytes, err := fa.exportingProcess.SendSet(fa.set) if err != nil { @@ -797,11 +796,6 @@ func (fa *flowAggregator) createInfoElementForTemplateSet(ieName string, enterpr return ie, nil } -func (fa *flowAggregator) SetFlowCollector() { - log.Print(fa.dbExportProcess.GetDsnMap()) - // todo -} - func (fa *flowAggregator) UpdateClickHouse(input string) { if fa.dbExportProcess == nil { klog.Errorf("ClickHouse hasn't been enabled yet") @@ -899,3 +893,14 @@ func (fa *flowAggregator) DisableFlowCollector() error { } return nil } + +func (fa *flowAggregator) UpdateTTL(ttl string) { + if fa.dbExportProcess == nil { + klog.Errorf("ClickHouse hasn't been enabled yet") + return + } + if err := fa.dbExportProcess.UpdateTTL(ttl); err != nil { + klog.Errorf("Error when updating ttl") + return + } +} diff --git a/pkg/flowaggregator/querier/querier.go b/pkg/flowaggregator/querier/querier.go index 3fa9347d026..44175883ff0 100644 --- a/pkg/flowaggregator/querier/querier.go +++ b/pkg/flowaggregator/querier/querier.go @@ -33,6 +33,7 @@ type FlowAggregatorQuerier interface { SetExternalFlowCollectorAddr(externalFlowCollectorAddr ExternalFlowCollectorAddr) DisableFlowCollector() error DisableClickHouse() error + UpdateTTL(ttl string) } type ExternalFlowCollectorAddr struct {