From a0111516a5d04b1621969f9f5bfe853c88d179bd Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Tue, 21 Jun 2022 19:14:54 -0700 Subject: [PATCH] Add e2e test for theia CLI Signed-off-by: Yun-Tang Hsu --- go.mod | 1 - go.sum | 3 - pkg/theia/commands/get/clickhouse/command.go | 50 +-- .../policy_recommendation_retrieve.go | 2 +- pkg/theia/util/utils.go | 30 +- test/e2e/fixture.go | 10 +- test/e2e/flowaggregator_test.go | 3 +- test/e2e/framework.go | 14 +- test/e2e/policyrecommendation_test.go | 2 +- test/e2e/theia_get_test.go | 355 ++++++++++++++++++ 10 files changed, 419 insertions(+), 51 deletions(-) create mode 100644 test/e2e/theia_get_test.go diff --git a/go.mod b/go.mod index 2179abe1..7084d6e1 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/containernetworking/plugins v0.8.7 github.com/google/uuid v1.1.2 - github.com/jedib0t/go-pretty/v6 v6.3.2 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd github.com/olekukonko/tablewriter v0.0.4 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index 23dde72b..b05f223b 100644 --- a/go.sum +++ b/go.sum @@ -421,8 +421,6 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/jedib0t/go-pretty/v6 v6.3.2 h1:+46BKrPFAyhAn3MTT3vzvZc+qvWAX23yviAlBG9zAxA= -github.com/jedib0t/go-pretty/v6 v6.3.2/go.mod h1:B1WBBWnJhW9jnk7GHxY+p9NlmNwf/KUb4hKsRk6BdBQ= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= @@ -568,7 +566,6 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/pkg/theia/commands/get/clickhouse/command.go b/pkg/theia/commands/get/clickhouse/command.go index dc560732..2352ec5c 100644 --- a/pkg/theia/commands/get/clickhouse/command.go +++ b/pkg/theia/commands/get/clickhouse/command.go @@ -28,8 +28,8 @@ import ( ) type chOptions struct { - diskUsage bool - numRecords bool + diskInfo bool + tableInfo bool insertRate bool formatTable bool } @@ -91,14 +91,14 @@ theia get clickhouse --storage --record-number --insertion-rate --print-table func init() { Command = &cobra.Command{ Use: "clickhouse", - Short: "check clickhouse status", + Short: "Acquire clickhouse metrics", Example: example, Args: cobra.NoArgs, RunE: getClickHouseStatus, } options = &chOptions{} - Command.Flags().BoolVar(&options.diskUsage, "storage", false, "check storage") - Command.Flags().BoolVar(&options.numRecords, 
"record-number", false, "check number of records") + Command.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check storage") + Command.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check number of records") Command.Flags().BoolVar(&options.insertRate, "insertion-rate", false, "check insertion-rate") Command.Flags().BoolVar(&options.formatTable, "print-table", false, "output data in table format") Command.Flags().String("clickhouse-endpoint", "", "The ClickHouse service endpoint.") @@ -111,8 +111,8 @@ It can only be used when running theia in cluster.`, } func getClickHouseStatus(cmd *cobra.Command, args []string) error { - if !options.diskUsage && !options.numRecords && !options.insertRate { - return fmt.Errorf("no flag is specified") + if !options.diskInfo && !options.tableInfo && !options.insertRate { + return fmt.Errorf("no metric related flag is specified") } kubeconfig, err := util.ResolveKubeConfig(cmd) if err != nil { @@ -137,30 +137,12 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if endpoint == "" { - service := "clickhouse-clickhouse" - if useClusterIP { - serviceIP, servicePort, err := util.GetServiceAddr(clientset, service) - if err != nil { - return fmt.Errorf("error when getting the ClickHouse Service address: %v", err) - } - endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) - } else { - listenAddress := "localhost" - listenPort := 9000 - _, servicePort, err := util.GetServiceAddr(clientset, service) - if err != nil { - return fmt.Errorf("error when getting the ClickHouse Service port: %v", err) - } - // Forward the ClickHouse service port - pf, err := util.StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) - if err != nil { - return fmt.Errorf("error when forwarding port: %v", err) - } - defer pf.Stop() - endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) - fmt.Println(endpoint) - } + endpoint, pf, err := util.GetEndpoint(endpoint, useClusterIP, kubeconfig, clientset) + if err != nil { + return err + } + if pf != nil { + defer pf.Stop() } if err := util.CheckClickHousePod(clientset); err != nil { @@ -172,11 +154,11 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { return err } url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) - connect, err := util.ConnectClickHouse(clientset, url) + connect, err := util.ConnectClickHouse(url) if err != nil { return fmt.Errorf("error when connecting to ClickHouse, %v", err) } - if options.diskUsage { + if options.diskInfo { data, err := getDiskInfoFromClickHouse(connect) if err != nil { return err @@ -189,7 +171,7 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { } } } - if options.numRecords { + if options.tableInfo { data, err := getTableInfoBasicFromClickHouse(connect) if err != nil { return err diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go index 5085caaf..08027587 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve.go +++ b/pkg/theia/commands/policy_recommendation_retrieve.go @@ -135,7 +135,7 @@ func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig st return "", err } url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) - connect, err := util.ConnectClickHouse(clientset, url) + connect, err := util.ConnectClickHouse(url) if err != nil { return "", fmt.Errorf("error when connecting to 
ClickHouse, %v", err) } diff --git a/pkg/theia/util/utils.go b/pkg/theia/util/utils.go index d4f1b6e6..dbbe5866 100644 --- a/pkg/theia/util/utils.go +++ b/pkg/theia/util/utils.go @@ -178,7 +178,7 @@ func GetClickHouseSecret(clientset kubernetes.Interface) (username []byte, passw return username, password, nil } -func ConnectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, error) { +func ConnectClickHouse(url string) (*sql.DB, error) { var connect *sql.DB var connErr error connRetryInterval := 1 * time.Second @@ -208,3 +208,31 @@ func ConnectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, err } return connect, nil } + +func GetEndpoint(endpoint string, useClusterIP bool, kubeconfig string, clientset kubernetes.Interface) (string, *portforwarder.PortForwarder, error) { + var pf *portforwarder.PortForwarder + if endpoint == "" { + service := "clickhouse-clickhouse" + if useClusterIP { + serviceIP, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + return "", nil, fmt.Errorf("error when getting the ClickHouse Service address: %v", err) + } + endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) + } else { + listenAddress := "localhost" + listenPort := 9000 + _, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + return "", nil, fmt.Errorf("error when getting the ClickHouse Service port: %v", err) + } + // Forward the ClickHouse service port + pf, err = StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) + if err != nil { + return "", nil, fmt.Errorf("error when forwarding port: %v", err) + } + endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) + } + } + return endpoint, pf, nil +} diff --git a/test/e2e/fixture.go b/test/e2e/fixture.go index cde22373..05a3d13a 100644 --- a/test/e2e/fixture.go +++ b/test/e2e/fixture.go @@ -281,7 +281,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestForFlowAggregator(tb testing.TB, withSparkOperator bool) (*TestData, bool, bool, error) { +func setupTestForTheia(tb testing.TB, withSparkOperator bool, flowAggregator bool) (*TestData, bool, bool, error) { v4Enabled := clusterInfo.podV4NetworkCIDR != "" v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) @@ -295,9 +295,11 @@ func setupTestForFlowAggregator(tb testing.TB, withSparkOperator bool) (*TestDat return testData, v4Enabled, v6Enabled, err } tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) - tb.Logf("Applying flow aggregator YAML") - if err := testData.deployFlowAggregator(); err != nil { - return testData, v4Enabled, v6Enabled, err + if flowAggregator { + tb.Logf("Applying flow aggregator YAML") + if err := testData.deployFlowAggregator(); err != nil { + return testData, v4Enabled, v6Enabled, err + } } return testData, v4Enabled, v6Enabled, nil } diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 5934fc0c..d30f7ed7 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -17,7 +17,6 @@ package e2e import ( "encoding/json" "fmt" - "net" "strconv" "strings" @@ -159,7 +158,7 @@ type testFlow struct { } func TestFlowAggregator(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, false) + data, v4Enabled, v6Enabled, err := setupTestForTheia(t, false, true) if err != nil { t.Fatalf("Error when setting up test: %v", err) } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index ea6ebd31..b857bf9d 100644 
--- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -79,7 +79,7 @@ const ( flowVisibilityYML string = "flow-visibility.yml" flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml" chOperatorYML string = "clickhouse-operator-install-bundle.yaml" - flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0" + flowVisibilityCHPodName string = "chi-clickhouse-clickhouse" policyOutputYML string = "output.yaml" agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.29" @@ -89,6 +89,10 @@ const ( exporterActiveFlowExportTimeout = 2 * time.Second aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond aggregatorClickHouseCommitInterval = 1 * time.Second + + shardNum = 1 + // Storage default is 8 GiB per clickhouse pod + chStorageSize = 8.0 ) type ClusterNode struct { @@ -1116,10 +1120,12 @@ func (data *TestData) deployFlowVisibility(withSparkOperator bool) (string, erro } // check for ClickHouse Pod ready. Wait for 2x timeout as ch operator needs to be running first to handle chi - if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil { - return "", err + for i := 0; i < shardNum; i++ { + chPodName := fmt.Sprintf("%s-%v-0-0", flowVisibilityCHPodName, i) + if err = data.podWaitForReady(2*defaultTimeout, chPodName, flowVisibilityNamespace); err != nil { + return "", err + } } - // check ClickHouse Service http port for Service connectivity chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse") if err != nil { diff --git a/test/e2e/policyrecommendation_test.go b/test/e2e/policyrecommendation_test.go index bdcbd7c8..a483b9e5 100644 --- a/test/e2e/policyrecommendation_test.go +++ b/test/e2e/policyrecommendation_test.go @@ -44,7 +44,7 @@ const ( ) func TestPolicyRecommendation(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, true) + data, v4Enabled, v6Enabled, err := setupTestForTheia(t, true, true) if err != nil { t.Fatalf("Error when setting up test: %v", err) } diff --git a/test/e2e/theia_get_test.go b/test/e2e/theia_get_test.go new file mode 100644 index 00000000..22ca3018 --- /dev/null +++ b/test/e2e/theia_get_test.go @@ -0,0 +1,355 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package e2e + +import ( + "database/sql" + "fmt" + "math/rand" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "antrea.io/theia/pkg/theia/util" +) + +const ( + getDiskInfoCmd = "./theia get clickhouse --diskInfo" + getTableInfoCmd = "./theia get clickhouse --tableInfo" + getInsertRateCmd = "./theia get clickhouse --insertion-rate" + insertQuery = `INSERT INTO flows ( + flowStartSeconds, + flowEndSeconds, + flowEndSecondsFromSourceNode, + flowEndSecondsFromDestinationNode, + flowEndReason, + sourceIP, + destinationIP, + sourceTransportPort, + destinationTransportPort, + protocolIdentifier, + packetTotalCount, + octetTotalCount, + packetDeltaCount, + octetDeltaCount, + reversePacketTotalCount, + reverseOctetTotalCount, + reversePacketDeltaCount, + reverseOctetDeltaCount, + sourcePodName, + sourcePodNamespace, + sourceNodeName, + destinationPodName, + destinationPodNamespace, + destinationNodeName, + destinationClusterIP, + destinationServicePort, + destinationServicePortName, + ingressNetworkPolicyName, + ingressNetworkPolicyNamespace, + ingressNetworkPolicyRuleName, + ingressNetworkPolicyRuleAction, + ingressNetworkPolicyType, + egressNetworkPolicyName, + egressNetworkPolicyNamespace, + egressNetworkPolicyRuleName, + egressNetworkPolicyRuleAction, + egressNetworkPolicyType, + tcpState, + flowType, + sourcePodLabels, + destinationPodLabels, + throughput, + reverseThroughput, + throughputFromSourceNode, + throughputFromDestinationNode, + reverseThroughputFromSourceNode, + reverseThroughputFromDestinationNode) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?)` + recordPerCommit = 1000 + insertInterval = 1 + threshold = 5 + // insertRate needs a bigger threshold + insertRateThreshold = 25 +) + +var targetTable = map[string]string{ + ".inner.flows_node_view": "16", + ".inner.flows_pod_view": "17", + ".inner.flows_policy_view": "27", + "flows": "49", +} + +var wg sync.WaitGroup + +func TestTheiaGetCommand(t *testing.T) { + data, _, _, err := setupTestForTheia(t, false, false) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer func() { + teardownTest(t, data) + teardownFlowAggregator(t, data, false) + }() + + clientset := data.clientset + service := "clickhouse-clickhouse" + listenAddress := "localhost" + listenPort := 9000 + _, servicePort, err := util.GetServiceAddr(clientset, service) + require.NoError(t, err) + // Forward the ClickHouse service port + kubeconfig, err := data.provider.GetKubeconfigPath() + require.NoError(t, err) + pf, err := util.StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) + require.NoError(t, err) + defer pf.Stop() + endpoint := fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) + username, password, err := util.GetClickHouseSecret(clientset) + require.NoError(t, err) + url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) + // Check connection + connect, err := util.ConnectClickHouse(url) + require.NoError(t, err) + + t.Run("testTheiaGetClickHouseDiskInfo", func(t *testing.T) { + testTheiaGetClickHouseDiskInfo(t, data) + }) + t.Run("testTheiaGetClickHouseTableInfo", func(t *testing.T) { + testTheiaGetClickHouseTableInfo(t, data, connect) + }) + t.Run("testTheiaGetClickHouseInsertRate", func(t *testing.T) { + testTheiaGetClickHouseInsertRate(t, data, connect) + }) + +} + 
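+// testTheiaGetClickHouseDiskInfo runs "theia get clickhouse --diskInfo" on the
+// control-plane Node and verifies that the output contains the expected header
+// fields and that the reported storage usage stays within the expected bounds.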
+func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) {
+	// retrieve metrics
+	stdout, err := getClickHouseDBInfo(t, data, getDiskInfoCmd)
+	require.NoError(t, err)
+	resultArray := strings.Split(stdout, "\n")
+	assert := assert.New(t)
+	length := len(resultArray)
+	assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout)
+	// check header components
+	assert.Containsf(stdout, "shard", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Name", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Path", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Free", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Total", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Used_Percentage", "stdout: %s", stdout)
+	for i := 1; i < length; i++ {
+		// check metrics' value
+		diskInfoArray := strings.Split(resultArray[i], " ")
+		assert.Equal(8, len(diskInfoArray), "number of columns is not correct")
+		assert.Equalf("default", diskInfoArray[1], "diskInfoArray: %s", diskInfoArray)
+		assert.Equalf("/var/lib/clickhouse/", diskInfoArray[2], "diskInfoArray: %s", diskInfoArray)
+		usedStorage, err := strconv.ParseFloat(strings.TrimSuffix(diskInfoArray[7], "]"), 64)
+		assert.NoError(err)
+		assert.LessOrEqualf(int(usedStorage), threshold, "diskInfoArray: %s", diskInfoArray)
+		size, err := strconv.ParseFloat(diskInfoArray[5], 64)
+		assert.NoError(err)
+		assert.LessOrEqualf(int((chStorageSize-size)*100/chStorageSize), threshold, "diskInfoArray: %s", diskInfoArray)
+	}
+}
+
+// testTheiaGetClickHouseTableInfo sends a fixed number of records to ClickHouse,
+// runs "theia get clickhouse --tableInfo" and verifies the reported table
+// metadata, including the total number of rows in the flows table.
+func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.DB) {
+	// send 10000 records to clickhouse
+	commitNum := 10
+	wg.Add(1)
+	sendTraffic(t, commitNum, connect)
+	wg.Wait()
+	// retrieve metrics
+	stdout, err := getClickHouseDBInfo(t, data, getTableInfoCmd)
+	require.NoError(t, err)
+	resultArray := strings.Split(stdout, "\n")
+	assert := assert.New(t)
+	length := len(resultArray)
+	assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout)
+	// check header components
+	assert.Containsf(stdout, "shard", "stdout: %s", stdout)
+	assert.Containsf(stdout, "DatabaseName", "stdout: %s", stdout)
+	assert.Containsf(stdout, "TableName", "stdout: %s", stdout)
+	assert.Containsf(stdout, "TotalRows", "stdout: %s", stdout)
+	assert.Containsf(stdout, "TotalBytes", "stdout: %s", stdout)
+	assert.Containsf(stdout, "TotalCols", "stdout: %s", stdout)
+	// check that the expected tables are present in the db
+	assert.Containsf(stdout, ".inner.flows_node_view", "stdout: %s", stdout)
+	assert.Containsf(stdout, ".inner.flows_pod_view", "stdout: %s", stdout)
+	assert.Containsf(stdout, ".inner.flows_policy_view", "stdout: %s", stdout)
+	assert.Containsf(stdout, "flows", "stdout: %s", stdout)
+	assert.Containsf(stdout, "recommendations", "stdout: %s", stdout)
+
+	flowNum := 0
+	for i := 1; i < length; i++ {
+		// check metrics' value
+		tableInfoArray := strings.Split(resultArray[i], " ")
+		tableName := tableInfoArray[2]
+		expectedColNum, ok := targetTable[tableName]
+		if !ok {
+			continue
+		}
+		assert.Equal(7, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray)
+		assert.Equalf("default", tableInfoArray[1], "tableInfoArray: %s", tableInfoArray)
+		assert.Equal(expectedColNum, strings.TrimSuffix(tableInfoArray[6], "]"), "tableInfoArray: %s", tableInfoArray)
+		if tableName == "flows" {
+			num, err := strconv.Atoi(tableInfoArray[3])
+			assert.NoError(err)
+			flowNum += num
+		}
+	}
+	// sum of records in table flows in each shard should be the total number of records sent to db
+	assert.Equal(commitNum*recordPerCommit, flowNum)
+}
+
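+// testTheiaGetClickHouseInsertRate keeps committing records to ClickHouse in
+// the background, waits for the insertion rate to be computed, then runs
+// "theia get clickhouse --insertion-rate" and checks that the reported
+// per-table rate does not exceed the expected rate by more than
+// insertRateThreshold percent.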
+func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql.DB) {
+	commitNum := 70
+	wg.Add(1)
+	go sendTraffic(t, commitNum, connect)
+	// need to wait at least 1 min to get the insertion rate.
+	// insertion rate is the average ProfileEvent_InsertedRows in system.metric_log in the current minute
+	time.Sleep(1 * time.Minute)
+	// retrieve metrics
+	stdout, err := getClickHouseDBInfo(t, data, getInsertRateCmd)
+	require.NoError(t, err)
+	resultArray := strings.Split(stdout, "\n")
+	assert := assert.New(t)
+	length := len(resultArray)
+	assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout)
+	// check header components
+	assert.Containsf(stdout, "shard", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Rows_per_second", "stdout: %s", stdout)
+	assert.Containsf(stdout, "Bytes_per_second", "stdout: %s", stdout)
+
+	for i := 1; i < length; i++ {
+		// check metrics' value
+		insertRateArray := strings.Split(resultArray[i], " ")
+		assert.Equal(4, len(insertRateArray), "insertRateArray: %s", insertRateArray)
+		actualInsertRate, err := strconv.Atoi(insertRateArray[1])
+		assert.NoError(err)
+		tableNum := len(targetTable)
+		percent := (actualInsertRate/tableNum - recordPerCommit/insertInterval) * 100 / (recordPerCommit / insertInterval)
+		assert.LessOrEqualf(percent, insertRateThreshold, "stdout: %s, expectedInsertRate: %d", stdout, recordPerCommit/insertInterval)
+	}
+	wg.Wait()
+}
+
+// getClickHouseDBInfo makes the theia binary on the control-plane Node
+// executable, runs the given theia query there and returns its trimmed stdout.
+func getClickHouseDBInfo(t *testing.T, data *TestData, query string) (stdout string, err error) {
+	cmd := "chmod +x ./theia"
+	rc, stdout, stderr, err := data.RunCommandOnNode(controlPlaneNodeName(), cmd)
+	if err != nil || rc != 0 {
+		return "", fmt.Errorf("error when running %s from %s: %v\nstdout:%s\nstderr:%s", cmd, controlPlaneNodeName(), err, stdout, stderr)
+	}
+	rc, stdout, stderr, err = data.RunCommandOnNode(controlPlaneNodeName(), query)
+	if err != nil || rc != 0 {
+		return "", fmt.Errorf("error when running %s from %s: %v\nstdout:%s\nstderr:%s", query, controlPlaneNodeName(), err, stdout, stderr)
+	}
+	return strings.TrimSuffix(stdout, "\n"), nil
+}
+
+func getRandIP() string {
+	return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256))
+}
+
+// addFakeRecord inserts one randomly generated flow record through the prepared statement.
+func addFakeRecord(t *testing.T, stmt *sql.Stmt) {
+	_, err := stmt.Exec(
+		time.Now(),
+		time.Now(),
+		time.Now(),
+		time.Now(),
+		0,
+		getRandIP(),
+		getRandIP(),
+		uint16(rand.Intn(65535)),
+		uint16(rand.Intn(65535)),
+		6,
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		fmt.Sprintf("PodName-%d", rand.Int()),
+		fmt.Sprintf("PodNameSpace-%d", rand.Int()),
+		fmt.Sprintf("NodeName-%d", rand.Int()),
+		fmt.Sprintf("PodName-%d", rand.Int()),
+		fmt.Sprintf("PodNameSpace-%d", rand.Int()),
+		fmt.Sprintf("NodeName-%d", rand.Int()),
+		getRandIP(),
+		uint16(rand.Intn(65535)),
+		fmt.Sprintf("ServicePortName-%d", rand.Int()),
+		fmt.Sprintf("PolicyName-%d", rand.Int()),
+		fmt.Sprintf("PolicyNameSpace-%d", rand.Int()),
+		fmt.Sprintf("PolicyRuleName-%d", rand.Int()),
+		1,
+		1,
+		fmt.Sprintf("PolicyName-%d", rand.Int()),
+		fmt.Sprintf("PolicyNameSpace-%d", rand.Int()),
+		fmt.Sprintf("PolicyRuleName-%d", rand.Int()),
+		1,
+		1,
+		"tcpState",
+		0,
+		fmt.Sprintf("PodLabels-%d", rand.Int()),
+		fmt.Sprintf("PodLabels-%d", rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+		uint64(rand.Int()),
+	)
+	require.NoError(t, err)
+}
+
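+// writeRecords commits one batch of recordPerCommit fake flow records to the
+// flows table in a single transaction.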
+func writeRecords(t *testing.T, connect *sql.DB) {
+	defer wg.Done()
+	// check that the DB connection is alive
+	err := connect.Ping()
+	require.NoError(t, err)
+	// open a transaction for the batch insert
+	tx, err := connect.Begin()
+	require.NoError(t, err)
+	stmt, err := tx.Prepare(insertQuery)
+	require.NoError(t, err)
+	defer stmt.Close()
+	for j := 0; j < recordPerCommit; j++ {
+		addFakeRecord(t, stmt)
+	}
+	err = tx.Commit()
+	assert.NoError(t, err)
+}
+
+// sendTraffic commits commitNum batches of recordPerCommit records, starting a
+// new batch every insertInterval seconds.
+func sendTraffic(t *testing.T, commitNum int, connect *sql.DB) {
+	defer wg.Done()
+	for i := 0; i < commitNum; i++ {
+		wg.Add(1)
+		go writeRecords(t, connect)
+		time.Sleep(time.Duration(insertInterval) * time.Second)
+	}
+}