From ffa2b41698f9223c52e0af14ba12e77c8986ca19 Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Fri, 24 Jun 2022 12:12:03 -0700 Subject: [PATCH] add stacktrace flag Signed-off-by: Yun-Tang Hsu --- ci/kind/test-e2e-kind.sh | 2 + docs/theia-cli.md | 91 +++++++- go.mod | 6 - go.sum | 9 +- .../{clickhouse/command.go => clickhouse.go} | 20 +- .../command.go => clickhouse_status.go} | 206 +++++++++++------- .../commands/policy_recommendation_delete.go | 7 +- .../commands/policy_recommendation_list.go | 5 +- .../policy_recommendation_retrieve.go | 10 +- .../policy_recommendation_retrieve_test.go | 3 +- .../commands/policy_recommendation_run.go | 17 +- .../commands/policy_recommendation_status.go | 3 +- pkg/theia/commands/root.go | 3 - pkg/theia/{util => commands}/utils.go | 20 +- pkg/theia/commands/utils_test.go | 5 +- test/e2e/fixture.go | 4 +- test/e2e/flowvisibility_test.go | 11 +- test/e2e/framework.go | 75 ++++--- test/e2e/policyrecommendation_test.go | 4 +- ...a_get_test.go => theia_clickhouse_test.go} | 126 ++++++----- 20 files changed, 396 insertions(+), 231 deletions(-) rename pkg/theia/commands/{clickhouse/command.go => clickhouse.go} (61%) rename pkg/theia/commands/{clickhouse/status/command.go => clickhouse_status.go} (50%) rename pkg/theia/{util => commands}/utils.go (93%) rename test/e2e/{theia_get_test.go => theia_clickhouse_test.go} (74%) diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 85346760..e45acb95 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -40,6 +40,7 @@ TESTBED_CMD=$(dirname $0)"/kind-setup.sh" YML_DIR=$(dirname $0)"/../../build/yamls" FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1" FLOW_VISIBILITY_WITH_SPARK_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --spark-operator" +FLOW_VISIBILITY_CH_ONLY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana" CH_OPERATOR_YML=$(dirname $0)"/../../build/charts/theia/crds/clickhouse-operator-install-bundle.yaml" make theia-linux @@ -161,6 +162,7 @@ function run_test { docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yaml < $CH_OPERATOR_YML $FLOW_VISIBILITY_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml $FLOW_VISIBILITY_WITH_SPARK_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility-with-spark.yml + $FLOW_VISIBILITY_CH_ONLY_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility-ch-only.yml docker exec -i kind-control-plane dd of=/root/theia < $THEIACTL_BIN diff --git a/docs/theia-cli.md b/docs/theia-cli.md index 9b8c4a76..f37f31ce 100644 --- a/docs/theia-cli.md +++ b/docs/theia-cli.md @@ -8,6 +8,12 @@ visibility capabilities. - [Installation](#installation) - [Usage](#usage) + - [NetworkPolicy Recommendation feature](#networkpolicy-recommendation-feature) + - [ClickHouse](#clickhouse) + - [Disk usage information](#disk-usage-information) + - [Table Information](#table-information) + - [Insertion rate](#insertion-rate) + - [Stack trace](#stack-trace) ## Installation @@ -36,8 +42,11 @@ theia help ## Usage -To see the list of available commands and options, run `theia help`. Currently, -we have 5 commands for the NetworkPolicy Recommendation feature: +To see the list of available commands and options, run `theia help`. + +### NetworkPolicy Recommendation feature + +We currently have 5 commands for NetworkPolicy Recommendation: - `theia policy-recommendation run` - `theia policy-recommendation status` @@ -47,3 +56,81 @@ we have 5 commands for the NetworkPolicy Recommendation feature: For details, please refer to [NetworkPolicy recommendation doc]( networkpolicy-recommendation.md) + +### ClickHouse + +From Theia v0.2, we introduce one command for ClickHouse: + +- `theia clickhouse status [flags]` + +#### Disk usage information + +The `--diskInfo` flag will list disk usage information of each clickhouse shard. `Shard`, `DatabaseName`, `Path`, `Free` +, `Total` and `Used_Percentage`of each clickhouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --diskInfo +Shard DatabaseName Path Free Total Used_Percentage +1 default /var/lib/clickhouse/ 1.84 GiB 1.84 GiB 0.04 % +``` + +#### Table Information + +The `--tableInfo` flag will list basic table information of each clickhouse shard. `Shard`, `DatabaseName`, `TableName`, +`TotalRows`, `TotalBytes` and `TotalCol`of tables in each clickhouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --tableInfo +Shard DatabaseName TableName TotalRows TotalBytes TotalCols +1 default .inner.flows_node_view 7 2.84 KiB 16 +1 default .inner.flows_pod_view 131 5.00 KiB 20 +1 default .inner.flows_policy_view 131 6.28 KiB 27 +1 default flows 267 18.36 KiB 49 +1 default flows_node_view +1 default flows_pod_view +1 default flows_policy_view +1 default recommendations 0 0.00 B 4 +``` + +#### Insertion rate + +The `--insertRate` flag will list the insertion rate of each clickhouse shard. `Shard`, `RowsPerSecond`, and +`BytesPerSecond` of each clickhouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --insertRate +Shard RowsPerSecond BytesPerSecond +1 230 6.31 KiB +``` + +#### Stack trace + +If ClickHouse is busy with something, and you don’t know what’s happening, you can check the stacktraces of all +the thread which are working. + +The `--stackTraces` flag will list the stacktraces of each clickhouse shard. `Shard`, `trace_function`, and +`count()` of each clickhouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --stackTraces +Row 1: +------- +Shard: 1 +trace_functions: pthread_cond_timedwait@@GLIBC_2.3.2\nPoco::EventImpl::waitImpl(long)\nPoco::NotificationQueue:: +waitDequeueNotification(long)\nDB::BackgroundSchedulePool::threadFunction()\n\nThreadPoolImpl:: +worker(std::__1::__list_iterator)\n\nstart_thread\n__clone +count(): 128 + +Row 2: +------- +Shard: 1 +trace_functions: __poll\nPoco::Net::SocketImpl::pollImpl(Poco::Timespan&, int)\nPoco::Net::SocketImpl::poll(Poco:: +Timespan const&, int)\nPoco::Net::TCPServer::run()\nPoco::ThreadImpl::runnableEntry(void*)\nstart_thread\n__clone +count(): 5 +``` + + + + + + diff --git a/go.mod b/go.mod index fad08ef8..dad6b614 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/containernetworking/plugins v0.8.7 github.com/google/uuid v1.1.2 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd - github.com/olekukonko/tablewriter v0.0.4 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.7.0 @@ -54,11 +53,6 @@ require ( github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/json-iterator/go v1.1.11 // indirect - github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0 // indirect - github.com/mailru/easyjson v0.7.0 // indirect - github.com/mattn/go-colorable v0.1.8 // indirect - github.com/mattn/go-isatty v0.0.12 // indirect - github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index 6e498d33..c0d8cea7 100644 --- a/go.sum +++ b/go.sum @@ -330,7 +330,6 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -477,8 +476,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= -github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= -github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -538,7 +535,6 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -643,7 +639,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= -github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -677,6 +672,10 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/ti-mo/conntrack v0.4.0/go.mod h1:L0vkIzG/TECsuVYMMlID9QWmZQLjyP9gDq8XKTlbg4Q= github.com/ti-mo/netfilter v0.3.1/go.mod h1:t/5HvCCHA1LAYj/AZF2fWcJ23BQTA7lzTPCuwwi7xQY= +github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= +github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= diff --git a/pkg/theia/commands/clickhouse/command.go b/pkg/theia/commands/clickhouse.go similarity index 61% rename from pkg/theia/commands/clickhouse/command.go rename to pkg/theia/commands/clickhouse.go index fd96c087..3dc8673e 100644 --- a/pkg/theia/commands/clickhouse/command.go +++ b/pkg/theia/commands/clickhouse.go @@ -12,25 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -package clickhouse +package commands import ( "fmt" "github.com/spf13/cobra" - - "antrea.io/theia/pkg/theia/commands/clickhouse/status" ) -var ClickHouseCmd = &cobra.Command{ +var clickHouseCmd = &cobra.Command{ Use: "clickhouse", Aliases: []string{"ch"}, Short: "Commands of Theia ClickHouse feature", Run: func(cmd *cobra.Command, args []string) { - fmt.Println("Error: must also specify a subcommand to run") + fmt.Println("Error: must also specify a subcommand to run like status") }, } func init() { - ClickHouseCmd.AddCommand(status.Command) + rootCmd.AddCommand(clickHouseCmd) + clickHouseCmd.PersistentFlags().String( + "clickhouse-endpoint", + "", + "The ClickHouse service endpoint.") + clickHouseCmd.PersistentFlags().Bool( + "use-cluster-ip", + false, + `Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service. +It can only be used when running in cluster.`, + ) } diff --git a/pkg/theia/commands/clickhouse/status/command.go b/pkg/theia/commands/clickhouse_status.go similarity index 50% rename from pkg/theia/commands/clickhouse/status/command.go rename to pkg/theia/commands/clickhouse_status.go index 7d7b83d0..ecb987cd 100644 --- a/pkg/theia/commands/clickhouse/status/command.go +++ b/pkg/theia/commands/clickhouse_status.go @@ -12,26 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package status +package commands import ( "database/sql" "fmt" "net/url" - "os" "strings" - "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" - - "antrea.io/theia/pkg/theia/util" ) type chOptions struct { diskInfo bool tableInfo bool insertRate bool - formatTable bool + stackTraces bool } type diskInfo struct { @@ -43,7 +39,7 @@ type diskInfo struct { usedPercentage string } -type tableInfoBasic struct { +type tableInfo struct { shard string database string tableName string @@ -52,73 +48,114 @@ type tableInfoBasic struct { totalCols string } -type writeRowsPerSec struct { +type insertRate struct { shard string rowsPerSec string bytesPerSec string } +type stackTraces struct { + shard string + traceFunctions string + count string +} + const ( - diskQuery = "SELECT shardNum() as shard, name as Name, path as Path, formatReadableSize(free_space) as Free," + - " formatReadableSize(total_space) as Total, TRUNCATE((1 - free_space/total_space) * 100, 2) as Used_Percentage FROM " + - "cluster('{cluster}', system.disks) ;" - tableInfoBasicQuery = "SELECT shard, DatabaseName, TableName, TotalRows, TotalBytes, TotalCols FROM (SELECT " + - "shardNum() as shard, database AS DatabaseName, name AS TableName, total_rows AS TotalRows, " + - "formatReadableSize(total_bytes) AS TotalBytes FROM cluster('{cluster}', system.tables) WHERE database = " + - "'default') as t1 INNER JOIN(SELECT shardNum() as shard, table_catalog as DatabaseName, table_name as " + - "TableName, COUNT(*) as TotalCols FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS) WHERE table_catalog == " + - "'default' GROUP BY table_name, table_catalog, shard) as t2 ON t1.DatabaseName = t2.DatabaseName and " + - "t1.TableName = t2.TableName and t1.shard = t2.shard" + diskQuery = ` +SELECT shardNum() as Shard, + name as DatabaseName, + path as Path, + formatReadableSize(free_space) as Free, + formatReadableSize(total_space) as Total, + TRUNCATE((1 - free_space/total_space) * 100, 2) as Used_Percentage +FROM cluster('{cluster}', system.disks);` + + tableInfoQuery = ` +SELECT Shard, + DatabaseName, + TableName, + TotalRows, + TotalBytes, + TotalCols +FROM ( + SELECT shardNum() as Shard, + database AS DatabaseName, + name AS TableName, + total_rows AS TotalRows, + formatReadableSize(total_bytes) AS TotalBytes + FROM cluster('{cluster}', system.tables) WHERE database = 'default' + ) as t1 + INNER JOIN ( + SELECT shardNum() as Shard, + table_catalog as DatabaseName, + table_name as TableName, + COUNT(*) as TotalCols + FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS) + WHERE table_catalog == 'default' + GROUP BY table_name, table_catalog, Shard + ) as t2 + ON t1.DatabaseName = t2.DatabaseName and t1.TableName = t2.TableName and t1.Shard = t2.Shard` // average writing rate for all tables per second - writePerSecQuery = "SELECT sd.shard, sd.Rows_per_second, sd.Bytes_per_second FROM (SELECT shardNum() as " + - "shard, (intDiv(toUInt32(date_trunc('minute', toDateTime(event_time))), 2) * 2) * 1000 as t, " + - "TRUNCATE(avg(ProfileEvent_InsertedRows),0) as Rows_per_second, formatReadableSize(avg(ProfileEvent_InsertedBytes)) as Bytes_per_second, " + - "ROW_NUMBER() OVER(PARTITION BY shardNum() ORDER BY t DESC) rowNumber FROM cluster('{cluster}', " + - "system.metric_log) GROUP BY t, shardNum() ORDER BY t DESC, shardNum()) sd WHERE sd.rowNumber=1" + insertRateQuery = ` +SELECT sd.Shard, + sd.RowsPerSecond, + sd.BytesPerSecond +FROM ( + SELECT shardNum() as Shard, + (intDiv(toUInt32(date_trunc('minute', toDateTime(event_time))), 2) * 2) * 1000 as t, + TRUNCATE(avg(ProfileEvent_InsertedRows),0) as RowsPerSecond, + formatReadableSize(avg(ProfileEvent_InsertedBytes)) as BytesPerSecond, + ROW_NUMBER() OVER(PARTITION BY shardNum() ORDER BY t DESC) rowNumber + FROM cluster('{cluster}', system.metric_log) + GROUP BY t, shardNum() + ORDER BY t DESC, shardNum() + ) sd +WHERE sd.rowNumber=1` + + stackTracesQuery = ` +SELECT shardNum() as Shard, + arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\\n') AS trace_function, + count() +FROM cluster('{cluster}', system.stack_trace) +GROUP BY trace_function, Shard +ORDER BY count() +DESC SETTINGS allow_introspection_functions=1` ) var options *chOptions -// Command is the support bundle command implementation. -var Command *cobra.Command +var clickHouseStatusCmd = &cobra.Command{ + Use: "status", + Short: "Get diagnostic infos of ClickHouse database", + Example: example, + Args: cobra.NoArgs, + RunE: getClickHouseStatus, +} var example = strings.Trim(` -theia clickhouse status --storage -theia clickhouse status --storage --print-table -theia clickhouse status --storage --record-number --insertion-rate --print-table +theia clickhouse status --diskInfo +theia clickhouse status --diskInfo --tableInfo +theia clickhouse status --diskInfo --tableInfo --insertRate `, "\n") func init() { - Command = &cobra.Command{ - Use: "status", - Short: "Get diagnostic infos of ClickHouse database", - Example: example, - Args: cobra.NoArgs, - RunE: getClickHouseStatus, - } + clickHouseCmd.AddCommand(clickHouseStatusCmd) options = &chOptions{} - Command.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check storage") - Command.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check number of records") - Command.Flags().BoolVar(&options.insertRate, "insertion-rate", false, "check insertion-rate") - Command.Flags().BoolVar(&options.formatTable, "print-table", false, "output data in table format") - Command.Flags().String("clickhouse-endpoint", "", "The ClickHouse service endpoint.") - Command.Flags().Bool( - "use-cluster-ip", - false, - `Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service. -It can only be used when running theia in cluster.`, - ) + clickHouseStatusCmd.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check disk usage information") + clickHouseStatusCmd.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check basic table information") + clickHouseStatusCmd.Flags().BoolVar(&options.insertRate, "insertRate", false, "check the insertion-rate of clickhouse") + clickHouseStatusCmd.Flags().BoolVar(&options.stackTraces, "stackTraces", false, "check stacktrace of clickhouse") } func getClickHouseStatus(cmd *cobra.Command, args []string) error { - if !options.diskInfo && !options.tableInfo && !options.insertRate { + if !options.diskInfo && !options.tableInfo && !options.insertRate && !options.stackTraces { return fmt.Errorf("no metric related flag is specified") } - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) } @@ -137,11 +174,11 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } // Connect to ClickHouse and get the result - connect, pf, err := util.SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, pf, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if err != nil { return err } @@ -153,39 +190,28 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if options.formatTable { - printTable(data) - } else { - for _, arr := range data { - fmt.Println(arr) - } - } + TableOutput(data) } if options.tableInfo { data, err := getTableInfoBasicFromClickHouse(connect) if err != nil { return err } - if options.formatTable { - printTable(data) - } else { - for _, arr := range data { - fmt.Println(arr) - } - } + TableOutput(data) } if options.insertRate { data, err := getWritingRateFromClickHouse(connect) if err != nil { return err } - if options.formatTable { - printTable(data) - } else { - for _, arr := range data { - fmt.Println(arr) - } + TableOutput(data) + } + if options.stackTraces { + data, err := getStackTracesFromClickHouse(connect) + if err != nil { + return err } + TableOutputVertical(data) } return nil } @@ -205,7 +231,7 @@ func getDiskInfoFromClickHouse(connect *sql.DB) ([][]string, error) { for result.Next() { var res diskInfo result.Scan(&res.shard, &res.name, &res.path, &res.freeSpace, &res.totalSpace, &res.usedPercentage) - data = append(data, []string{res.shard, res.name, res.path, res.freeSpace, res.totalSpace, res.usedPercentage}) + data = append(data, []string{res.shard, res.name, res.path, res.freeSpace, res.totalSpace, res.usedPercentage + " %"}) } if len(data) <= 1 { return nil, fmt.Errorf("no data is returned by database") @@ -214,7 +240,7 @@ func getDiskInfoFromClickHouse(connect *sql.DB) ([][]string, error) { } func getTableInfoBasicFromClickHouse(connect *sql.DB) ([][]string, error) { - result, err := connect.Query(tableInfoBasicQuery) + result, err := connect.Query(tableInfoQuery) if err != nil { return nil, fmt.Errorf("failed to get table information: %v", err) } @@ -226,7 +252,7 @@ func getTableInfoBasicFromClickHouse(connect *sql.DB) ([][]string, error) { var data [][]string data = append(data, columnName) for result.Next() { - res := tableInfoBasic{} + res := tableInfo{} result.Scan(&res.shard, &res.database, &res.tableName, &res.totalRows, &res.totalBytes, &res.totalCols) data = append(data, []string{res.shard, res.database, res.tableName, res.totalRows, res.totalBytes, res.totalCols}) } @@ -237,7 +263,7 @@ func getTableInfoBasicFromClickHouse(connect *sql.DB) ([][]string, error) { } func getWritingRateFromClickHouse(connect *sql.DB) ([][]string, error) { - result, err := connect.Query(writePerSecQuery) + result, err := connect.Query(insertRateQuery) if err != nil { return nil, fmt.Errorf("failed to get insertion rate: %v", err) } @@ -249,7 +275,7 @@ func getWritingRateFromClickHouse(connect *sql.DB) ([][]string, error) { var data [][]string data = append(data, columnName) for result.Next() { - res := writeRowsPerSec{} + res := insertRate{} result.Scan(&res.shard, &res.rowsPerSec, &res.bytesPerSec) data = append(data, []string{res.shard, res.rowsPerSec, res.bytesPerSec}) } @@ -259,13 +285,25 @@ func getWritingRateFromClickHouse(connect *sql.DB) ([][]string, error) { return data, nil } -func printTable(data [][]string) { - table := tablewriter.NewWriter(os.Stdout) - //table, _ := tablewriter.NewCSV(os.Stdout, "./test_info.csv", true) - //table.SetAlignment(tablewriter.ALIGN_LEFT) - table.SetHeader(data[0]) - for i := 1; i < len(data); i++ { - table.Append(data[i]) +func getStackTracesFromClickHouse(connect *sql.DB) ([][]string, error) { + result, err := connect.Query(stackTracesQuery) + if err != nil { + return nil, fmt.Errorf("failed to get clickhouse stack trace: %v", err) } - table.Render() + defer result.Close() + columnName, err := result.Columns() + if err != nil { + return nil, fmt.Errorf("failed to get the head of stack trace: %v", err) + } + var data [][]string + data = append(data, columnName) + for result.Next() { + res := stackTraces{} + result.Scan(&res.shard, &res.traceFunctions, &res.count) + data = append(data, []string{res.shard, res.traceFunctions, res.count}) + } + if len(data) <= 1 { + return nil, fmt.Errorf("no data is returned by database") + } + return data, nil } diff --git a/pkg/theia/commands/policy_recommendation_delete.go b/pkg/theia/commands/policy_recommendation_delete.go index 24cabc16..a7cb8ec9 100644 --- a/pkg/theia/commands/policy_recommendation_delete.go +++ b/pkg/theia/commands/policy_recommendation_delete.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -82,7 +83,7 @@ $ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86 clientset.CoreV1().RESTClient().Delete(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Name("pr-" + recoID). Do(context.TODO()) @@ -102,7 +103,7 @@ func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig str sparkApplicationList := &sparkv1.SparkApplicationList{} err = clientset.CoreV1().RESTClient().Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Do(context.TODO()).Into(sparkApplicationList) if err != nil { @@ -123,7 +124,7 @@ func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig str } func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_list.go b/pkg/theia/commands/policy_recommendation_list.go index b28b1cc5..6a8d3d95 100644 --- a/pkg/theia/commands/policy_recommendation_list.go +++ b/pkg/theia/commands/policy_recommendation_list.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -73,7 +74,7 @@ $ theia policy-recommendation list sparkApplicationList := &sparkv1.SparkApplicationList{} err = clientset.CoreV1().RESTClient().Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Do(context.TODO()).Into(sparkApplicationList) if err != nil { @@ -121,7 +122,7 @@ $ theia policy-recommendation list } func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (completedPolicyRecommendationList []policyRecommendationRow, err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go index 81d702d7..712f8b75 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve.go +++ b/pkg/theia/commands/policy_recommendation_retrieve.go @@ -21,8 +21,6 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" - - "antrea.io/theia/pkg/theia/util" ) // policyRecommendationRetrieveCmd represents the policy-recommendation retrieve command @@ -57,7 +55,7 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us if err != nil { return err } - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } @@ -81,11 +79,11 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us } // Verify Clickhouse is running - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig: %v", err) } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } @@ -102,7 +100,7 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us } func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, filePath string, recoID string) (recoResult string, err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_retrieve_test.go b/pkg/theia/commands/policy_recommendation_retrieve_test.go index ee9c4026..d1e1c3be 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve_test.go +++ b/pkg/theia/commands/policy_recommendation_retrieve_test.go @@ -24,7 +24,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "antrea.io/theia/pkg/theia/commands/config" - "antrea.io/theia/pkg/theia/util" ) func TestGetClickHouseSecret(t *testing.T) { @@ -97,7 +96,7 @@ func TestGetClickHouseSecret(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - username, password, err := util.GetClickHouseSecret(tt.fakeClientset) + username, password, err := getClickHouseSecret(tt.fakeClientset) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } diff --git a/pkg/theia/commands/policy_recommendation_run.go b/pkg/theia/commands/policy_recommendation_run.go index 810051db..d5a8d587 100644 --- a/pkg/theia/commands/policy_recommendation_run.go +++ b/pkg/theia/commands/policy_recommendation_run.go @@ -28,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "antrea.io/theia/pkg/theia/util" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" "antrea.io/theia/pkg/theia/commands/config" @@ -202,11 +201,11 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f } sparkResourceArgs.executorMemory = executorMemory - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) } @@ -216,7 +215,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f return err } - err = util.PolicyRecoPreCheck(clientset) + err = PolicyRecoPreCheck(clientset) if err != nil { return err } @@ -236,9 +235,9 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f Type: "Python", SparkVersion: config.SparkVersion, Mode: "cluster", - Image: util.ConstStrToPointer(config.SparkImage), - ImagePullPolicy: util.ConstStrToPointer(config.SparkImagePullPolicy), - MainApplicationFile: util.ConstStrToPointer(config.SparkAppFile), + Image: ConstStrToPointer(config.SparkImage), + ImagePullPolicy: ConstStrToPointer(config.SparkImagePullPolicy), + MainApplicationFile: ConstStrToPointer(config.SparkAppFile), Arguments: recoJobArgs, Driver: sparkv1.DriverSpec{ CoreRequest: &driverCoreRequest, @@ -257,7 +256,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f Key: "password", }, }, - ServiceAccount: util.ConstStrToPointer(config.SparkServiceAccount), + ServiceAccount: ConstStrToPointer(config.SparkServiceAccount), }, }, Executor: sparkv1.ExecutorSpec{ @@ -335,7 +334,7 @@ Job is still running. Please check completion status for job via CLI later.`, re if err != nil { return err } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } recoResult, err := getPolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, filePath, recommendationID) diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go index d73b32d3..e640d63b 100644 --- a/pkg/theia/commands/policy_recommendation_status.go +++ b/pkg/theia/commands/policy_recommendation_status.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -146,7 +147,7 @@ func getSparkAppByRecommendationID(clientset kubernetes.Interface, id string) (s err = clientset.CoreV1().RESTClient(). Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Name("pr-" + id). Do(context.TODO()). diff --git a/pkg/theia/commands/root.go b/pkg/theia/commands/root.go index 41d5ec31..0a89d9b4 100644 --- a/pkg/theia/commands/root.go +++ b/pkg/theia/commands/root.go @@ -20,8 +20,6 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" - - "antrea.io/theia/pkg/theia/commands/clickhouse" ) // rootCmd represents the base command when called without any subcommands @@ -61,5 +59,4 @@ func init() { "", "absolute path to the k8s config file, will use $KUBECONFIG if not specified", ) - rootCmd.AddCommand(clickhouse.ClickHouseCmd) } diff --git a/pkg/theia/util/utils.go b/pkg/theia/commands/utils.go similarity index 93% rename from pkg/theia/util/utils.go rename to pkg/theia/commands/utils.go index 02bedc6b..af88d53e 100644 --- a/pkg/theia/util/utils.go +++ b/pkg/theia/commands/utils.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package util +package commands import ( "context" @@ -166,7 +166,7 @@ func ResolveKubeConfig(cmd *cobra.Command) (string, error) { } func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) { - secret, err := clientset.CoreV1().Secrets(flowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) + secret, err := clientset.CoreV1().Secrets(config.FlowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) if err != nil { return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err) } @@ -212,7 +212,7 @@ func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, err return connect, nil } -func setupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward *portforwarder.PortForwarder, err error) { +func SetupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward *portforwarder.PortForwarder, err error) { if endpoint == "" { service := "clickhouse-clickhouse" if useClusterIP { @@ -258,6 +258,20 @@ func TableOutput(table [][]string) { writer.Flush() } +func TableOutputVertical(table [][]string) { + header := table[0] + writer := tabwriter.NewWriter(os.Stdout, 15, 0, 1, ' ', 0) + for i := 1; i < len(table); i++ { + fmt.Fprintln(writer, fmt.Sprintf("Row %d:\t", i)) + fmt.Fprintln(writer, fmt.Sprint("-------")) + for j, val := range table[i] { + fmt.Fprintln(writer, fmt.Sprintf("%s:\t%s", header[j], val)) + } + fmt.Fprintln(writer) + writer.Flush() + } +} + func FormatTimestamp(timestamp time.Time) string { if timestamp.IsZero() { return "N/A" diff --git a/pkg/theia/commands/utils_test.go b/pkg/theia/commands/utils_test.go index fca94460..00a43649 100644 --- a/pkg/theia/commands/utils_test.go +++ b/pkg/theia/commands/utils_test.go @@ -23,7 +23,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "antrea.io/theia/pkg/theia/commands/config" - "antrea.io/theia/pkg/theia/util" ) func TestGetServiceAddr(t *testing.T) { @@ -65,7 +64,7 @@ func TestGetServiceAddr(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - ip, port, err := util.GetServiceAddr(tt.fakeClientset, tt.serviceName) + ip, port, err := GetServiceAddr(tt.fakeClientset, tt.serviceName) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } @@ -135,7 +134,7 @@ func TestPolicyRecoPreCheck(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - err := util.PolicyRecoPreCheck(tt.fakeClientset) + err := PolicyRecoPreCheck(tt.fakeClientset) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } else { diff --git a/test/e2e/fixture.go b/test/e2e/fixture.go index eb786b6d..392b8cc7 100644 --- a/test/e2e/fixture.go +++ b/test/e2e/fixture.go @@ -281,7 +281,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestForFlowVisibility(tb testing.TB, withSparkOperator bool, flowAggregator bool) (*TestData, bool, bool, error) { +func setupTestForFlowVisibility(tb testing.TB, withSparkOperator bool, withGrafana bool, withFlowAggregator bool) (*TestData, bool, bool, error) { v4Enabled := clusterInfo.podV4NetworkCIDR != "" v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) @@ -295,7 +295,7 @@ func setupTestForFlowVisibility(tb testing.TB, withSparkOperator bool, flowAggre return testData, v4Enabled, v6Enabled, err } tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) - if flowAggregator { + if withFlowAggregator { tb.Logf("Applying flow aggregator YAML") if err := testData.deployFlowAggregator(); err != nil { return testData, v4Enabled, v6Enabled, err diff --git a/test/e2e/flowvisibility_test.go b/test/e2e/flowvisibility_test.go index ed6e96d8..1664d529 100644 --- a/test/e2e/flowvisibility_test.go +++ b/test/e2e/flowvisibility_test.go @@ -22,13 +22,12 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "os/exec" - "syscall" - - "net" "strconv" "strings" + "syscall" "testing" "time" @@ -139,7 +138,7 @@ type testFlow struct { } func TestFlowVisibility(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false, true) + data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false, true, true) if err != nil { t.Errorf("Error when setting up test: %v", err) failOnError(err, t, data) @@ -150,7 +149,7 @@ func TestFlowVisibility(t *testing.T) { failOnError(err, t, data) } defer portForwardCmd.Process.Kill() - defer flowVisibilityCleanup(t, data, false) + defer flowVisibilityCleanup(t, data, false, true) podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, err := createPerftestPods(data) if err != nil { @@ -1266,6 +1265,6 @@ func failOnError(err error, t *testing.T, data *TestData) { if portForwardCmd.Process != nil { portForwardCmd.Process.Kill() } - flowVisibilityCleanup(t, data, false) + flowVisibilityCleanup(t, data, false, true) t.Fatalf("test failed: %v", err) } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index caeb1335..2430b769 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -62,27 +62,28 @@ const ( defaultInterval = 1 * time.Second realizeTimeout = 5 * time.Minute - antreaNamespace string = "kube-system" - kubeNamespace string = "kube-system" - flowAggregatorNamespace string = "flow-aggregator" - flowVisibilityNamespace string = "flow-visibility" - testNamespace string = "antrea-test" - iperfPort int32 = 5201 - clickHouseHTTPPort string = "8123" - busyboxContainerName string = "busybox" - defaultBridgeName string = "br-int" - antreaYML string = "antrea.yml" - antreaDaemonSet string = "antrea-agent" - antreaDeployment string = "antrea-controller" - flowAggregatorDeployment string = "flow-aggregator" - flowAggregatorYML string = "flow-aggregator.yml" - flowVisibilityYML string = "flow-visibility.yml" - flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml" - chOperatorYML string = "clickhouse-operator-install-bundle.yaml" - flowVisibilityCHPodName string = "chi-clickhouse-clickhouse" - policyOutputYML string = "output.yaml" - sparkOperatorPodLabel string = "app.kubernetes.io/name=spark-operator" - grafanaPodLabel string = "app=grafana" + antreaNamespace string = "kube-system" + kubeNamespace string = "kube-system" + flowAggregatorNamespace string = "flow-aggregator" + flowVisibilityNamespace string = "flow-visibility" + testNamespace string = "antrea-test" + iperfPort int32 = 5201 + clickHouseHTTPPort string = "8123" + busyboxContainerName string = "busybox" + defaultBridgeName string = "br-int" + antreaYML string = "antrea.yml" + antreaDaemonSet string = "antrea-agent" + antreaDeployment string = "antrea-controller" + flowAggregatorDeployment string = "flow-aggregator" + flowAggregatorYML string = "flow-aggregator.yml" + flowVisibilityYML string = "flow-visibility.yml" + flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml" + flowVisibilityChOnlyYML string = "flow-visibility-ch-only.yml" + chOperatorYML string = "clickhouse-operator-install-bundle.yaml" + flowVisibilityCHPodNamePrefix string = "chi-clickhouse-clickhouse" + policyOutputYML string = "output.yaml" + sparkOperatorPodLabel string = "app.kubernetes.io/name=spark-operator" + grafanaPodLabel string = "app=grafana" agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.29" busyboxImage = "projects.registry.vmware.com/antrea/busybox" @@ -93,8 +94,6 @@ const ( aggregatorClickHouseCommitInterval = 1 * time.Second shardNum = 1 - // Storage default is 8 GiB per clickhouse pod - chStorageSize = 8.0 ) type ClusterNode struct { @@ -1093,10 +1092,16 @@ func (data *TestData) createTestNamespace() error { // deployFlowVisibility deploys ClickHouse Operator and DB. If withSparkOperator/ // withGrafana is set to true, it also deploys Spark Operator/Grafana. func (data *TestData) deployFlowVisibility(withSparkOperator, withGrafana bool) (string, error) { - flowVisibilityManifest := flowVisibilityYML - if withSparkOperator { + + var flowVisibilityManifest string + if !withGrafana && !withSparkOperator { + flowVisibilityManifest = flowVisibilityChOnlyYML + } else if withSparkOperator { flowVisibilityManifest = flowVisibilityWithSparkYML + } else { + flowVisibilityManifest = flowVisibilityYML } + rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", chOperatorYML)) if err != nil || rc != 0 { return "", fmt.Errorf("error when deploying the ClickHouse Operator YML; %s not available on the control-plane Node", chOperatorYML) @@ -1131,7 +1136,7 @@ func (data *TestData) deployFlowVisibility(withSparkOperator, withGrafana bool) // check for ClickHouse Pod ready. Wait for 2x timeout as ch operator needs to be running first to handle chi for i := 0; i < shardNum; i++ { - chPodName := fmt.Sprintf("%s-%v-0-0", flowVisibilityCHPodName, i) + chPodName := fmt.Sprintf("%s-%v-0-0", flowVisibilityCHPodNamePrefix, i) if err = data.podWaitForReady(2*defaultTimeout, chPodName, flowVisibilityNamespace); err != nil { return "", err } @@ -1232,11 +1237,11 @@ func (data *TestData) deleteClickHouseOperator() error { return nil } -func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator bool) { +func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator bool, withGrafana bool) { if err := data.DeleteNamespace(flowAggregatorNamespace, defaultTimeout); err != nil { tb.Logf("Error when tearing down flow aggregator: %v", err) } - if err := data.deleteFlowVisibility(withSparkOperator); err != nil { + if err := data.deleteFlowVisibility(withSparkOperator, withGrafana); err != nil { tb.Logf("Error when deleting K8s resources created by flow visibility: %v", err) } if err := data.deleteClickHouseOperator(); err != nil { @@ -1244,10 +1249,14 @@ func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator boo } } -func (data *TestData) deleteFlowVisibility(withSparkOperator bool) error { - flowVisibilityManifest := flowVisibilityYML - if withSparkOperator { +func (data *TestData) deleteFlowVisibility(withSparkOperator bool, withGrafana bool) error { + var flowVisibilityManifest string + if !withGrafana && !withSparkOperator { + flowVisibilityManifest = flowVisibilityChOnlyYML + } else if withSparkOperator { flowVisibilityManifest = flowVisibilityWithSparkYML + } else { + flowVisibilityManifest = flowVisibilityYML } startTime := time.Now() defer func() { @@ -1358,7 +1367,7 @@ func (data *TestData) Cleanup(namespaces []string) { } } -func flowVisibilityCleanup(tb testing.TB, data *TestData, withSparkOperator bool) { +func flowVisibilityCleanup(tb testing.TB, data *TestData, withSparkOperator bool, withGrafana bool) { teardownTest(tb, data) - teardownFlowVisibility(tb, data, withSparkOperator) + teardownFlowVisibility(tb, data, withSparkOperator, withGrafana) } diff --git a/test/e2e/policyrecommendation_test.go b/test/e2e/policyrecommendation_test.go index b89b6a30..8467f6af 100644 --- a/test/e2e/policyrecommendation_test.go +++ b/test/e2e/policyrecommendation_test.go @@ -46,14 +46,14 @@ const ( ) func TestPolicyRecommendation(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, true, false) + data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, true, false, true) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer func() { teardownTest(t, data) deleteRecommendedPolicies(t, data) - teardownFlowVisibility(t, data, true) + teardownFlowVisibility(t, data, true, false) }() t.Run("testPolicyRecommendationRun", func(t *testing.T) { diff --git a/test/e2e/theia_get_test.go b/test/e2e/theia_clickhouse_test.go similarity index 74% rename from test/e2e/theia_get_test.go rename to test/e2e/theia_clickhouse_test.go index be187f64..fd47ceff 100644 --- a/test/e2e/theia_get_test.go +++ b/test/e2e/theia_clickhouse_test.go @@ -28,13 +28,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "antrea.io/theia/pkg/theia/util" + "antrea.io/theia/pkg/theia/commands" ) const ( getDiskInfoCmd = "./theia clickhouse status --diskInfo" getTableInfoCmd = "./theia clickhouse status --tableInfo" - getInsertRateCmd = "./theia clickhouse status --insertion-rate" + getInsertRateCmd = "./theia clickhouse status --insertRate" insertQuery = `INSERT INTO flows ( flowStartSeconds, flowEndSeconds, @@ -88,50 +88,44 @@ const ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - recordPerCommit = 1000 - insertInterval = 1 - threshold = 25 - MaxInt32 = 1<<31 - 1 + recordPerCommit = 1000 + insertInterval = 1 + threshold = 25 + MaxInt32 = 1<<31 - 1 + numFieldsInDiskInfo = 9 + numFieldsInTableInfo = 7 + dateBaseName = "default" + defaultPath = "/var/lib/clickhouse/" + // 8 GiB + chStorageSize = 8.0 + sizeUnit = "GiB" ) -var targetTable = map[string]string{ +var tableColumnNumberMap = map[string]string{ ".inner.flows_node_view": "16", - ".inner.flows_pod_view": "17", + ".inner.flows_pod_view": "20", ".inner.flows_policy_view": "27", "flows": "49", } -var wg sync.WaitGroup - -func TestTheiaGetCommand(t *testing.T) { - data, _, _, err := setupTestForTheia(t, false, false) +func TestTheiaClickHouseStatusCommand(t *testing.T) { + data, _, _, err := setupTestForFlowVisibility(t, false, false, false) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer func() { teardownTest(t, data) - teardownFlowAggregator(t, data, false) + teardownFlowVisibility(t, data, false, false) }() clientset := data.clientset - service := "clickhouse-clickhouse" - listenAddress := "localhost" - listenPort := 9000 - _, servicePort, err := util.GetServiceAddr(clientset, service) - require.NoError(t, err) - // Forward the ClickHouse service port kubeconfig, err := data.provider.GetKubeconfigPath() require.NoError(t, err) - pf, err := util.StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) - require.NoError(t, err) - defer pf.Stop() - endpoint := fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) - username, password, err := util.GetClickHouseSecret(clientset) - require.NoError(t, err) - url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) - // Check connection - connect, err := util.ConnectClickHouse(url) + connect, pf, err := commands.SetupClickHouseConnection(clientset, kubeconfig, "", false) require.NoError(t, err) + if pf != nil { + defer pf.Stop() + } t.Run("testTheiaGetClickHouseDiskInfo", func(t *testing.T) { testTheiaGetClickHouseDiskInfo(t, data) @@ -145,6 +139,9 @@ func TestTheiaGetCommand(t *testing.T) { } +// Example output +// Shard DatabaseName Path Free Total Used_Percentage +// 1 default /var/lib/clickhouse/ 888.00 KiB 100.00 MiB 99.13 % func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) { // retrieve metrics stdout, err := getClickHouseDBInfo(t, data, getDiskInfoCmd) @@ -154,32 +151,47 @@ func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) { length := len(resultArray) assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout) // Check header component - assert.Containsf(stdout, "shard", "stdout: %s", stdout) - assert.Containsf(stdout, "Name", "stdout: %s", stdout) + assert.Containsf(stdout, "Shard", "stdout: %s", stdout) + assert.Containsf(stdout, "DatabaseName", "stdout: %s", stdout) assert.Containsf(stdout, "Path", "stdout: %s", stdout) assert.Containsf(stdout, "Free", "stdout: %s", stdout) assert.Containsf(stdout, "Total", "stdout: %s", stdout) assert.Containsf(stdout, "Used_Percentage", "stdout: %s", stdout) for i := 1; i < length; i++ { // check metrics' value - diskInfoArray := strings.Split(resultArray[i], " ") - assert.Equal(8, len(diskInfoArray), "number of columns is not correct") - assert.Equalf("default", diskInfoArray[1], "diskInfoArray: %s", diskInfoArray) - assert.Equalf("/var/lib/clickhouse/", diskInfoArray[2], "diskInfoArray: %s", diskInfoArray) - usedStorage, err := strconv.ParseFloat(strings.TrimSuffix(diskInfoArray[7], "]"), 64) + diskInfoArray := strings.Fields(resultArray[i]) + assert.Equal(numFieldsInDiskInfo, len(diskInfoArray), "number of columns is not correct") + assert.Equalf(dateBaseName, diskInfoArray[1], "diskInfoArray: %s", diskInfoArray) + assert.Equalf(defaultPath, diskInfoArray[2], "diskInfoArray: %s", diskInfoArray) + assert.Equalf(sizeUnit, diskInfoArray[6], "diskInfoArray: %s", diskInfoArray) + usedStorage, err := strconv.ParseFloat(diskInfoArray[7], 64) assert.NoError(err) - assert.LessOrEqualf(int(usedStorage), threshold, "diskInfoArray: %s", diskInfoArray) + assert.GreaterOrEqual(threshold, int(usedStorage), "diskInfoArray: %s", diskInfoArray) size, err := strconv.ParseFloat(diskInfoArray[5], 64) assert.NoError(err) - assert.LessOrEqualf(int((chStorageSize-size)*100/chStorageSize), threshold, "diskInfoArray: %s", diskInfoArray) + assert.GreaterOrEqual(threshold, int((chStorageSize-size)*100/chStorageSize), "diskInfoArray: %s", diskInfoArray) } } +// Example output +// Shard DatabaseName TableName TotalRows TotalBytes TotalCols +// 1 default .inner.flows_node_view 50000 4.19 MiB 16 +// 1 default .inner.flows_pod_view 48000 4.72 MiB 20 +// 1 default .inner.flows_policy_view 48000 7.16 MiB 27 +// 1 default flows 50000 13.09 MiB 49 +// 1 default flows_node_view +// 1 default flows_pod_view +// 1 default flows_policy_view +// 1 default recommendations 0 0.00 B 4 func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.DB) { // send 10000 records to clickhouse commitNum := 10 + var wg sync.WaitGroup wg.Add(1) - sendTraffic(t, commitNum, connect) + go func() { + defer wg.Done() + sendTraffic(t, commitNum, connect) + }() wg.Wait() // retrieve metrics stdout, err := getClickHouseDBInfo(t, data, getTableInfoCmd) @@ -189,7 +201,7 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql. length := len(resultArray) assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout) // check header component - assert.Containsf(stdout, "shard", "stdout: %s", stdout) + assert.Containsf(stdout, "Shard", "stdout: %s", stdout) assert.Containsf(stdout, "DatabaseName", "stdout: %s", stdout) assert.Containsf(stdout, "TableName", "stdout: %s", stdout) assert.Containsf(stdout, "TotalRows", "stdout: %s", stdout) @@ -205,15 +217,15 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql. flowNum := 0 for i := 1; i < length; i++ { // check metrics' value - tableInfoArray := strings.Split(resultArray[i], " ") + tableInfoArray := strings.Fields(resultArray[i]) tableName := tableInfoArray[2] - expectedColNum, ok := targetTable[tableName] + expectedColNum, ok := tableColumnNumberMap[tableName] if !ok { continue } - assert.Equal(7, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray) - assert.Equalf("default", tableInfoArray[1], "tableInfoArray: %s", tableInfoArray) - assert.Equal(expectedColNum, strings.TrimSuffix(tableInfoArray[6], "]"), "tableInfoArray: %s", tableInfoArray) + assert.Equal(numFieldsInTableInfo, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray) + assert.Equalf(dateBaseName, tableInfoArray[1], "tableInfoArray: %s", tableInfoArray) + assert.Equal(expectedColNum, tableInfoArray[6], "tableInfoArray: %s", tableInfoArray) if tableName == "flows" { num, error := strconv.Atoi(tableInfoArray[3]) assert.NoError(error) @@ -224,10 +236,17 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql. assert.Equal(commitNum*recordPerCommit, flowNum) } +// Example output +// Shard RowsPerSecond BytesPerSecond +// 1 4763 1.48 MiB func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql.DB) { commitNum := 70 + var wg sync.WaitGroup wg.Add(1) - go sendTraffic(t, commitNum, connect) + go func() { + defer wg.Done() + sendTraffic(t, commitNum, connect) + }() // need to wait at least 1 min to get the insertion rate. // insertion rate is the average ProfileEvent_InsertedRows in system.metric_log in current minute time.Sleep(1 * time.Minute) @@ -239,17 +258,17 @@ func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql length := len(resultArray) assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout) // check header component - assert.Containsf(stdout, "shard", "stdout: %s", stdout) - assert.Containsf(stdout, "Rows_per_second", "stdout: %s", stdout) - assert.Containsf(stdout, "Bytes_per_second", "stdout: %s", stdout) + assert.Containsf(stdout, "Shard", "stdout: %s", stdout) + assert.Containsf(stdout, "RowsPerSecond", "stdout: %s", stdout) + assert.Containsf(stdout, "BytesPerSecond", "stdout: %s", stdout) for i := 1; i < length; i++ { // check metrics' value - tableInfoArray := strings.Split(resultArray[i], " ") + tableInfoArray := strings.Fields(resultArray[i]) assert.Equal(4, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray) actualInsertRate, error := strconv.Atoi(tableInfoArray[1]) assert.NoError(error) - tableNum := len(targetTable) + tableNum := len(tableColumnNumberMap) percent := (actualInsertRate/tableNum - recordPerCommit/insertInterval) * 100 / (recordPerCommit / insertInterval) assert.LessOrEqualf(percent, threshold, "stdout: %s, expectedInsertRate: %s", stdout, recordPerCommit/insertInterval) } @@ -327,7 +346,7 @@ func addFakeRecord(t *testing.T, stmt *sql.Stmt) { require.NoError(t, err) } -func writeRecords(t *testing.T, connect *sql.DB) { +func writeRecords(t *testing.T, connect *sql.DB, wg *sync.WaitGroup) { defer wg.Done() // Test ping DB var err error @@ -346,12 +365,13 @@ func writeRecords(t *testing.T, connect *sql.DB) { } func sendTraffic(t *testing.T, commitNum int, connect *sql.DB) { - defer wg.Done() + var wg sync.WaitGroup for i := 0; i < commitNum; i++ { wg.Add(1) - go writeRecords(t, connect) + go writeRecords(t, connect, &wg) time.Sleep(time.Duration(insertInterval) * time.Second) } + wg.Wait() } func randInt(t *testing.T, limit int64) int64 {