Skip to content

Commit

Permalink
Add update ttl for antctl
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@hsuy-a01.vmware.com>
  • Loading branch information
Yun-Tang Hsu committed Apr 16, 2022
1 parent 9467396 commit 7f49f4f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 22 deletions.
37 changes: 26 additions & 11 deletions pkg/antctl/antctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,27 @@ var CommandList = &commandList{
{
use: "update",
short: "Update parameters used in flow aggregator",
long: "Update parameters used in flow aggregator. It includes logTicker.",
example: ` Update logTicker
$ antctl updateparam -l 2m
Update podLabels
$ antctl updateparam -p true
Update externalFlowCollectorAddr
$ antctl updateparam -e
Update activeFlowRecordTimeout
$ antctl activeflowrecordtimeout -a 120s
Update inactiveFlowRecordTimeout
$ antctl inactiveflowrecordtimeout -i 240s`,
example: `
Enable clickhouse
$ antctl update --enable clickhouse
Update database
$ antctl update --database name
Update databaseURL
$ antctl update -d Http://xxxxx
Update debug
$ antctl update --debug true
Update compress
$ antctl update --compress true
Update commitInterval
$ antctl update -i 10s
Update externalflowcollectoraddr
$ antctl update -e <IP>:<port>[:<proto>]
Update disable clickhouse/flow-collecotr
$ antctl update --disable clickhouse/flow-collecotr
Update ttl
$ antctl update --ttl 100
`,
commandGroup: flat,
flowAggregatorEndpoint: &endpoint{
nonResourceEndpoint: &nonResourceEndpoint{
Expand Down Expand Up @@ -573,6 +583,11 @@ var CommandList = &commandList{
supportedValues: []string{"clickhouse", "flow-collector"},
usage: "Disable clickhouse or flow-collector",
},
{
name: "ttl",
shorthand: "",
usage: "Modify table ttl (unit second) for clickhouse",
},
},
},
},
Expand Down
11 changes: 9 additions & 2 deletions pkg/flowaggregator/apiserver/handlers/update/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package update

import (
"antrea.io/antrea/pkg/flowaggregator/querier"
"antrea.io/antrea/pkg/util/flowexport"
"net"
"net/http"
"strings"

"antrea.io/antrea/pkg/flowaggregator/querier"
"antrea.io/antrea/pkg/util/flowexport"
)

const (
Expand All @@ -41,10 +42,12 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
if disableFlowCollector == "clickhouse" {
if err := faq.DisableClickHouse(); err != nil {
http.Error(w, "Error when disable clickhouse: "+err.Error(), http.StatusNotFound)
return
}
} else if disableFlowCollector == "flow-collector" {
if err := faq.DisableFlowCollector(); err != nil {
http.Error(w, "Error when disable Flow-collector: "+err.Error(), http.StatusNotFound)
return
}
}
}
Expand Down Expand Up @@ -88,5 +91,9 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
if sb.Len() != 0 {
faq.UpdateClickHouse(sb.String())
}
ttl := r.URL.Query().Get("ttl")
if ttl != "" {
faq.UpdateTTL(ttl)
}
}
}
16 changes: 16 additions & 0 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package clickhouseclient
import (
"database/sql"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -79,6 +80,7 @@ const (
reverseThroughputFromDestinationNode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
//ttlQuery = `ALTER TABLE flows MODIFY TTL timeInserted + INTERVAL ? SECOND`
)

type ClickHouseExportProcess struct {
Expand Down Expand Up @@ -563,3 +565,17 @@ func (ch *ClickHouseExportProcess) UpdateCH(input *ClickHouseExportProcess, dsn
func (ch *ClickHouseExportProcess) GetCommitInterval() time.Duration {
return ch.commitInterval
}

func (ch *ClickHouseExportProcess) UpdateTTL(ttl string) error {

if _, err := strconv.ParseInt(ttl, 10, 64); err != nil {
klog.ErrorS(err, "Error when parsing ttl")
return err
}
query := "ALTER TABLE flows MODIFY TTL timeInserted + INTERVAL (?) SECOND"
if _, err := ch.db.Exec(query, ttl); err != nil {
klog.ErrorS(err, "Error when alter table ttl")
return err
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestGetDataSourceName(t *testing.T) {
CommitInterval: 1 * time.Second,
}
*chInput.Compress = true
dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true&commitInterval=1s"
dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true"

chInputInvalid := ClickHouseInput{}

Expand Down
21 changes: 13 additions & 8 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -202,8 +201,7 @@ const (
type updateParam int

const (
updateIncludePodLabels updateParam = iota
updateExternalFlowCollectorAddr
updateExternalFlowCollectorAddr updateParam = iota
updateClickHouseParam
enableDefaultClickHouse
disableClickHouse
Expand Down Expand Up @@ -556,6 +554,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
if err := fa.set.AddRecord(record.Record.GetOrderedElementList(), templateID); err != nil {
return err
}
// if there is ipfix connection issue when we enable both ipfix and clickhouse, clickhouse won't be able to do cacheset and the record status won't be changed neither. Is it expected?
if fa.exportingProcess != nil {
sentBytes, err := fa.exportingProcess.SendSet(fa.set)
if err != nil {
Expand Down Expand Up @@ -797,11 +796,6 @@ func (fa *flowAggregator) createInfoElementForTemplateSet(ieName string, enterpr
return ie, nil
}

func (fa *flowAggregator) SetFlowCollector() {
log.Print(fa.dbExportProcess.GetDsnMap())
// todo
}

func (fa *flowAggregator) UpdateClickHouse(input string) {
if fa.dbExportProcess == nil {
klog.Errorf("ClickHouse hasn't been enabled yet")
Expand Down Expand Up @@ -899,3 +893,14 @@ func (fa *flowAggregator) DisableFlowCollector() error {
}
return nil
}

func (fa *flowAggregator) UpdateTTL(ttl string) {
if fa.dbExportProcess == nil {
klog.Errorf("ClickHouse hasn't been enabled yet")
return
}
if err := fa.dbExportProcess.UpdateTTL(ttl); err != nil {
klog.Errorf("Error when updating ttl")
return
}
}
1 change: 1 addition & 0 deletions pkg/flowaggregator/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FlowAggregatorQuerier interface {
SetExternalFlowCollectorAddr(externalFlowCollectorAddr ExternalFlowCollectorAddr)
DisableFlowCollector() error
DisableClickHouse() error
UpdateTTL(ttl string)
}

type ExternalFlowCollectorAddr struct {
Expand Down

0 comments on commit 7f49f4f

Please sign in to comment.