Skip to content

Commit

Permalink
Theia CLI and e2e test modification
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Nov 17, 2022
1 parent 8f85797 commit 61ed95e
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 577 deletions.
6 changes: 6 additions & 0 deletions build/charts/theia/templates/theia-cli/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ rules:
- list
- create
- delete
- apiGroups:
- stats.theia.antrea.io
resources:
- clickhouse
verbs:
- get
{{- end }}
1 change: 1 addition & 0 deletions build/charts/theia/templates/theia-manager/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ metadata:
spec:
ports:
- port: {{ .Values.theiaManager.apiServer.apiPort }}
name: tcp
protocol: TCP
targetPort: theia-api-http
selector:
Expand Down
2 changes: 1 addition & 1 deletion ci/jenkins/test-vmc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ function deliver_antrea {

${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1 > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility.yml
${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana --spark-operator --theia-manager > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-with-spark.yml
${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-ch-only.yml
${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana --theia-manager > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-ch-only.yml

# policy/v1beta1 is deprecated in v1.21+, unavailable in v1.25+, while policy/v1 is available in v1.21+
sed -i -e "s|apiVersion: policy/v1|apiVersion: policy/v1beta1|g" ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility.yml
Expand Down
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ TESTBED_CMD=$(dirname $0)"/kind-setup.sh"
YML_DIR=$(dirname $0)"/../../build/yamls"
FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1"
FLOW_VISIBILITY_WITH_SPARK_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --spark-operator --theia-manager"
FLOW_VISIBILITY_CH_ONLY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana"
FLOW_VISIBILITY_CH_ONLY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --theia-manager"
CH_OPERATOR_YML=$(dirname $0)"/../../build/charts/theia/crds/clickhouse-operator-install-bundle.yaml"

make theia-linux
Expand Down
215 changes: 18 additions & 197 deletions pkg/theia/commands/clickhouse_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package commands

import (
"database/sql"
"fmt"
"net/url"
"strings"

"github.com/spf13/cobra"
Expand All @@ -30,119 +28,14 @@ type chOptions struct {
stackTraces bool
}

type diskInfo struct {
shard string
name string
path string
freeSpace string
totalSpace string
usedPercentage string
}

type tableInfo struct {
shard string
database string
tableName string
totalRows sql.NullString
totalBytes sql.NullString
totalCols string
}

type insertRate struct {
shard string
rowsPerSec string
bytesPerSec string
}

type stackTraces struct {
shard string
traceFunctions string
count string
}

const (
diskQuery int = iota
tableInfoQuery
// average writing rate for all tables per second
insertRateQuery
stackTracesQuery
)

var queryMap = map[int]string{
diskQuery: `
SELECT
shardNum() as Shard,
name as DatabaseName,
path as Path,
formatReadableSize(free_space) as Free,
formatReadableSize(total_space) as Total,
TRUNCATE((1 - free_space/total_space) * 100, 2) as Used_Percentage
FROM cluster('{cluster}', system.disks);`,
tableInfoQuery: `
SELECT
Shard,
DatabaseName,
TableName,
TotalRows,
TotalBytes,
TotalCols
FROM (
SELECT
shardNum() as Shard,
database AS DatabaseName,
name AS TableName,
total_rows AS TotalRows,
formatReadableSize(total_bytes) AS TotalBytes
FROM cluster('{cluster}', system.tables) WHERE database = 'default'
) as t1
INNER JOIN (
SELECT
shardNum() as Shard,
table_catalog as DatabaseName,
table_name as TableName,
COUNT(*) as TotalCols
FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS)
WHERE table_catalog == 'default'
GROUP BY table_name, table_catalog, Shard
) as t2
ON t1.DatabaseName = t2.DatabaseName and t1.TableName = t2.TableName and t1.Shard = t2.Shard`,
// average writing rate for all tables per second
insertRateQuery: `
SELECT
sd.Shard,
sd.RowsPerSecond,
sd.BytesPerSecond
FROM (
SELECT
shardNum() as Shard,
(intDiv(toUInt32(date_trunc('minute', toDateTime(event_time))), 2) * 2) * 1000 as t,
TRUNCATE(avg(ProfileEvent_InsertedRows),0) as RowsPerSecond,
formatReadableSize(avg(ProfileEvent_InsertedBytes)) as BytesPerSecond,
ROW_NUMBER() OVER(PARTITION BY shardNum() ORDER BY t DESC) rowNumber
FROM cluster('{cluster}', system.metric_log)
GROUP BY t, shardNum()
ORDER BY t DESC, shardNum()
) sd
WHERE sd.rowNumber=1`,
stackTracesQuery: `
SELECT
shardNum() as Shard,
arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\\n') AS trace_function,
count()
FROM cluster('{cluster}', system.stack_trace)
GROUP BY trace_function, Shard
ORDER BY count()
DESC SETTINGS allow_introspection_functions=1`,
}

var options *chOptions

var clickHouseStatusCmd = &cobra.Command{
Use: "status",
Short: "Get diagnostic infos of ClickHouse database",
Example: example,
Args: cobra.NoArgs,
RunE: getClickHouseStatus,
RunE: getStatus,
}

var example = strings.Trim(`
Expand All @@ -160,116 +53,44 @@ func init() {
clickHouseStatusCmd.Flags().BoolVar(&options.stackTraces, "stackTraces", false, "check stacktrace of clickhouse")
}

func getClickHouseStatus(cmd *cobra.Command, args []string) error {
func getStatus(cmd *cobra.Command, args []string) error {
if !options.diskInfo && !options.tableInfo && !options.insertRate && !options.stackTraces {
return fmt.Errorf("no metric related flag is specified")
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
return err
}
clientset, err := CreateK8sClient(kubeconfig)
if err != nil {
return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err)
}

endpoint, err := cmd.Flags().GetString("clickhouse-endpoint")
if err != nil {
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}
if err := CheckClickHousePod(clientset); err != nil {
return err
}
// Connect to ClickHouse and get the result
connect, pf, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
theiaClient, pf, err := SetupTheiaClientAndConnection(cmd, useClusterIP)
if err != nil {
return err
return fmt.Errorf("couldn't setup Theia manager client, %v", err)
}
if pf != nil {
defer pf.Stop()
}
var names []string
if options.diskInfo {
data, err := getDataFromClickHouse(connect, diskQuery)
if err != nil {
return fmt.Errorf("error when getting diskInfo from clickhouse: %v", err)
}
TableOutput(data)
names = append(names, "diskInfo")
}
if options.tableInfo {
data, err := getDataFromClickHouse(connect, tableInfoQuery)
if err != nil {
return fmt.Errorf("error when getting tableInfo from clickhouse: %v", err)
}
TableOutput(data)
names = append(names, "tableInfo")
}
if options.insertRate {
data, err := getDataFromClickHouse(connect, insertRateQuery)
if err != nil {
return fmt.Errorf("error when getting insertRate from clickhouse: %v", err)
}
TableOutput(data)
names = append(names, "insertRate")
}
if options.stackTraces {
data, err := getDataFromClickHouse(connect, stackTracesQuery)
if err != nil {
return fmt.Errorf("error when getting stackTraces from clickhouse: %v", err)
}
TableOutputVertical(data)
names = append(names, "stackTraces")
}
return nil
}

func getDataFromClickHouse(connect *sql.DB, query int) ([][]string, error) {
result, err := connect.Query(queryMap[query])
if err != nil {
return nil, fmt.Errorf("failed to get data from clickhouse: %v", err)
}
defer result.Close()
columnName, err := result.Columns()
if err != nil {
return nil, fmt.Errorf("failed to get the name of columns: %v", err)
}
var data [][]string
data = append(data, columnName)
for result.Next() {
var err error
switch query {
case diskQuery:
var res diskInfo
err = result.Scan(&res.shard, &res.name, &res.path, &res.freeSpace, &res.totalSpace, &res.usedPercentage)
data = append(data, []string{res.shard, res.name, res.path, res.freeSpace, res.totalSpace, res.usedPercentage + " %"})
case tableInfoQuery:
res := tableInfo{}
err = result.Scan(&res.shard, &res.database, &res.tableName, &res.totalRows, &res.totalBytes, &res.totalCols)
if !res.totalRows.Valid || !res.totalBytes.Valid {
continue
}
data = append(data, []string{res.shard, res.database, res.tableName, res.totalRows.String, res.totalBytes.String, res.totalCols})
case insertRateQuery:
res := insertRate{}
err = result.Scan(&res.shard, &res.rowsPerSec, &res.bytesPerSec)
data = append(data, []string{res.shard, res.rowsPerSec, res.bytesPerSec})
case stackTracesQuery:
res := stackTraces{}
err = result.Scan(&res.shard, &res.traceFunctions, &res.count)
data = append(data, []string{res.shard, res.traceFunctions, res.count})
}
for _, name := range names {
data, err := getClickHouseStatusByCategory(theiaClient, name)
if err != nil {
return nil, fmt.Errorf("failed to parse the data returned by database: %v", err)
return fmt.Errorf("error when getting clickhouse %v status: %s", name, err)
}
if name == "stackTraces" {
TableOutputVertical(data.Result)
} else {
TableOutput(data.Result)
}
}
if len(data) <= 1 {
return nil, fmt.Errorf("no data is returned by database")
}
return data, nil
return nil
}
4 changes: 2 additions & 2 deletions pkg/theia/commands/policy_recommendation_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/spf13/cobra"

"antrea.io/theia/pkg/util/policyrecommendation"
"antrea.io/theia/pkg/util"
)

// policyRecommendationDeleteCmd represents the policy-recommendation delete command
Expand All @@ -45,7 +45,7 @@ func policyRecommendationDelete(cmd *cobra.Command, args []string) error {
if prName == "" && len(args) == 1 {
prName = args[0]
}
err = policyrecommendation.ParseRecommendationName(prName)
err = util.ParseRecommendationName(prName)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/theia/commands/policy_recommendation_retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/spf13/cobra"

"antrea.io/theia/pkg/util/policyrecommendation"
"antrea.io/theia/pkg/util"
)

// policyRecommendationRetrieveCmd represents the policy-recommendation retrieve command
Expand Down Expand Up @@ -67,7 +67,7 @@ func policyRecommendationRetrieve(cmd *cobra.Command, args []string) error {
if prName == "" && len(args) == 1 {
prName = args[0]
}
err = policyrecommendation.ParseRecommendationName(prName)
err = util.ParseRecommendationName(prName)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/theia/commands/policy_recommendation_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"github.com/spf13/cobra"

"antrea.io/theia/pkg/util/policyrecommendation"
"antrea.io/theia/pkg/util"
)

// policyRecommendationStatusCmd represents the policy-recommendation status command
Expand Down Expand Up @@ -58,7 +58,7 @@ func policyRecommendationStatus(cmd *cobra.Command, args []string) error {
if prName == "" && len(args) == 1 {
prName = args[0]
}
err = policyrecommendation.ParseRecommendationName(prName)
err = util.ParseRecommendationName(prName)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 61ed95e

Please sign in to comment.