Skip to content

Commit

Permalink
Add e2e test for theia CLI
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Jun 22, 2022
1 parent 59cd15f commit a011151
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 51 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/containernetworking/plugins v0.8.7
github.com/google/uuid v1.1.2
github.com/jedib0t/go-pretty/v6 v6.3.2
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/olekukonko/tablewriter v0.0.4
github.com/sirupsen/logrus v1.8.1
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,6 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jedib0t/go-pretty/v6 v6.3.2 h1:+46BKrPFAyhAn3MTT3vzvZc+qvWAX23yviAlBG9zAxA=
github.com/jedib0t/go-pretty/v6 v6.3.2/go.mod h1:B1WBBWnJhW9jnk7GHxY+p9NlmNwf/KUb4hKsRk6BdBQ=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
Expand Down Expand Up @@ -568,7 +566,6 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
50 changes: 16 additions & 34 deletions pkg/theia/commands/get/clickhouse/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

type chOptions struct {
diskUsage bool
numRecords bool
diskInfo bool
tableInfo bool
insertRate bool
formatTable bool
}
Expand Down Expand Up @@ -91,14 +91,14 @@ theia get clickhouse --storage --record-number --insertion-rate --print-table
func init() {
Command = &cobra.Command{
Use: "clickhouse",
Short: "check clickhouse status",
Short: "Acquire clickhouse metrics",
Example: example,
Args: cobra.NoArgs,
RunE: getClickHouseStatus,
}
options = &chOptions{}
Command.Flags().BoolVar(&options.diskUsage, "storage", false, "check storage")
Command.Flags().BoolVar(&options.numRecords, "record-number", false, "check number of records")
Command.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check storage")
Command.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check number of records")
Command.Flags().BoolVar(&options.insertRate, "insertion-rate", false, "check insertion-rate")
Command.Flags().BoolVar(&options.formatTable, "print-table", false, "output data in table format")
Command.Flags().String("clickhouse-endpoint", "", "The ClickHouse service endpoint.")
Expand All @@ -111,8 +111,8 @@ It can only be used when running theia in cluster.`,
}

func getClickHouseStatus(cmd *cobra.Command, args []string) error {
if !options.diskUsage && !options.numRecords && !options.insertRate {
return fmt.Errorf("no flag is specified")
if !options.diskInfo && !options.tableInfo && !options.insertRate {
return fmt.Errorf("no metric related flag is specified")
}
kubeconfig, err := util.ResolveKubeConfig(cmd)
if err != nil {
Expand All @@ -137,30 +137,12 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
if endpoint == "" {
service := "clickhouse-clickhouse"
if useClusterIP {
serviceIP, servicePort, err := util.GetServiceAddr(clientset, service)
if err != nil {
return fmt.Errorf("error when getting the ClickHouse Service address: %v", err)
}
endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort)
} else {
listenAddress := "localhost"
listenPort := 9000
_, servicePort, err := util.GetServiceAddr(clientset, service)
if err != nil {
return fmt.Errorf("error when getting the ClickHouse Service port: %v", err)
}
// Forward the ClickHouse service port
pf, err := util.StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort)
if err != nil {
return fmt.Errorf("error when forwarding port: %v", err)
}
defer pf.Stop()
endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort)
fmt.Println(endpoint)
}
endpoint, pf, err := util.GetEndpoint(endpoint, useClusterIP, kubeconfig, clientset)
if err != nil {
return err
}
if pf != nil {
defer pf.Stop()
}

if err := util.CheckClickHousePod(clientset); err != nil {
Expand All @@ -172,11 +154,11 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error {
return err
}
url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password)
connect, err := util.ConnectClickHouse(clientset, url)
connect, err := util.ConnectClickHouse(url)
if err != nil {
return fmt.Errorf("error when connecting to ClickHouse, %v", err)
}
if options.diskUsage {
if options.diskInfo {
data, err := getDiskInfoFromClickHouse(connect)
if err != nil {
return err
Expand All @@ -189,7 +171,7 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error {
}
}
}
if options.numRecords {
if options.tableInfo {
data, err := getTableInfoBasicFromClickHouse(connect)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/theia/commands/policy_recommendation_retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig st
return "", err
}
url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password)
connect, err := util.ConnectClickHouse(clientset, url)
connect, err := util.ConnectClickHouse(url)
if err != nil {
return "", fmt.Errorf("error when connecting to ClickHouse, %v", err)
}
Expand Down
30 changes: 29 additions & 1 deletion pkg/theia/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func GetClickHouseSecret(clientset kubernetes.Interface) (username []byte, passw
return username, password, nil
}

func ConnectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, error) {
func ConnectClickHouse(url string) (*sql.DB, error) {
var connect *sql.DB
var connErr error
connRetryInterval := 1 * time.Second
Expand Down Expand Up @@ -208,3 +208,31 @@ func ConnectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, err
}
return connect, nil
}

func GetEndpoint(endpoint string, useClusterIP bool, kubeconfig string, clientset kubernetes.Interface) (string, *portforwarder.PortForwarder, error) {
var pf *portforwarder.PortForwarder
if endpoint == "" {
service := "clickhouse-clickhouse"
if useClusterIP {
serviceIP, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return "", nil, fmt.Errorf("error when getting the ClickHouse Service address: %v", err)
}
endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort)
} else {
listenAddress := "localhost"
listenPort := 9000
_, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return "", nil, fmt.Errorf("error when getting the ClickHouse Service port: %v", err)
}
// Forward the ClickHouse service port
pf, err = StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort)
if err != nil {
return "", nil, fmt.Errorf("error when forwarding port: %v", err)
}
endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort)
}
}
return endpoint, pf, nil
}
10 changes: 6 additions & 4 deletions test/e2e/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestForFlowAggregator(tb testing.TB, withSparkOperator bool) (*TestData, bool, bool, error) {
func setupTestForTheia(tb testing.TB, withSparkOperator bool, flowAggregator bool) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
Expand All @@ -295,9 +295,11 @@ func setupTestForFlowAggregator(tb testing.TB, withSparkOperator bool) (*TestDat
return testData, v4Enabled, v6Enabled, err
}
tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP)
tb.Logf("Applying flow aggregator YAML")
if err := testData.deployFlowAggregator(); err != nil {
return testData, v4Enabled, v6Enabled, err
if flowAggregator {
tb.Logf("Applying flow aggregator YAML")
if err := testData.deployFlowAggregator(); err != nil {
return testData, v4Enabled, v6Enabled, err
}
}
return testData, v4Enabled, v6Enabled, nil
}
3 changes: 1 addition & 2 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package e2e
import (
"encoding/json"
"fmt"

"net"
"strconv"
"strings"
Expand Down Expand Up @@ -159,7 +158,7 @@ type testFlow struct {
}

func TestFlowAggregator(t *testing.T) {
data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, false)
data, v4Enabled, v6Enabled, err := setupTestForTheia(t, false, true)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
Expand Down
14 changes: 10 additions & 4 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const (
flowVisibilityYML string = "flow-visibility.yml"
flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml"
chOperatorYML string = "clickhouse-operator-install-bundle.yaml"
flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0"
flowVisibilityCHPodName string = "chi-clickhouse-clickhouse"
policyOutputYML string = "output.yaml"

agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.29"
Expand All @@ -89,6 +89,10 @@ const (
exporterActiveFlowExportTimeout = 2 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
aggregatorClickHouseCommitInterval = 1 * time.Second

shardNum = 1
// Storage default is 8 GiB per clickhouse pod
chStorageSize = 8.0
)

type ClusterNode struct {
Expand Down Expand Up @@ -1116,10 +1120,12 @@ func (data *TestData) deployFlowVisibility(withSparkOperator bool) (string, erro
}

// check for ClickHouse Pod ready. Wait for 2x timeout as ch operator needs to be running first to handle chi
if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil {
return "", err
for i := 0; i < shardNum; i++ {
chPodName := fmt.Sprintf("%s-%v-0-0", flowVisibilityCHPodName, i)
if err = data.podWaitForReady(2*defaultTimeout, chPodName, flowVisibilityNamespace); err != nil {
return "", err
}
}

// check ClickHouse Service http port for Service connectivity
chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/policyrecommendation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
)

func TestPolicyRecommendation(t *testing.T) {
data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, true)
data, v4Enabled, v6Enabled, err := setupTestForTheia(t, true, true)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
Expand Down
Loading

0 comments on commit a011151

Please sign in to comment.