From 8f9232eddb37760af3fda6ce12cc50230bb7d7ac Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Thu, 17 Nov 2022 14:03:22 -0800 Subject: [PATCH] Address comments Signed-off-by: Yun-Tang Hsu --- cmd/theia-manager/theia-manager.go | 8 ++++---- pkg/apis/stats/v1alpha1/types.go | 2 +- pkg/apiserver/apiserver.go | 10 +++++----- .../registry/stats/clickhouse/rest.go | 16 ++++++++-------- .../registry/stats/clickhouse/rest_test.go | 2 +- .../utils/stats/clickhouse_stats.go} | 19 ++++++++++--------- .../utils/stats/clickhouse_stats_test.go} | 2 +- .../controller_test.go | 10 +++++----- pkg/querier/querier.go | 2 +- pkg/theia/commands/clickhouse_status.go | 4 ++-- pkg/theia/commands/clickhouse_status_test.go | 8 ++++---- pkg/theia/commands/utils.go | 5 +++-- pkg/util/utils.go | 11 ++++++----- pkg/util/utils_test.go | 2 +- test/e2e/theia_clickhouse_test.go | 3 ++- 15 files changed, 54 insertions(+), 50 deletions(-) rename pkg/{controller/stats/controller.go => apiserver/utils/stats/clickhouse_stats.go} (90%) rename pkg/{controller/stats/controller_test.go => apiserver/utils/stats/clickhouse_stats_test.go} (97%) diff --git a/cmd/theia-manager/theia-manager.go b/cmd/theia-manager/theia-manager.go index 356f7a78..a991cffb 100644 --- a/cmd/theia-manager/theia-manager.go +++ b/cmd/theia-manager/theia-manager.go @@ -33,10 +33,10 @@ import ( "antrea.io/theia/pkg/apiserver" "antrea.io/theia/pkg/apiserver/certificate" + "antrea.io/theia/pkg/apiserver/utils/stats" crdclientset "antrea.io/theia/pkg/client/clientset/versioned" crdinformers "antrea.io/theia/pkg/client/informers/externalversions" "antrea.io/theia/pkg/controller/networkpolicyrecommendation" - "antrea.io/theia/pkg/controller/stats" "antrea.io/theia/pkg/querier" ) @@ -52,7 +52,7 @@ func createAPIServerConfig( cipherSuites []uint16, tlsMinVersion uint16, nprq querier.NPRecommendationQuerier, - chq querier.ClickHouseStatusQuerier, + chq querier.ClickHouseStatQuerier, ) (*apiserver.Config, error) { secureServing := genericoptions.NewSecureServingOptions().WithLoopback() authentication := genericoptions.NewDelegatingAuthenticationOptions() @@ -127,7 +127,7 @@ func run(o *Options) error { npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies() npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer) - clickHouseController := stats.NewStatsController(kubeClient) + clickHouseStatQuerierImpl := stats.NewClickHouseStatQuerierImpl(kubeClient) cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites) if err != nil { @@ -141,7 +141,7 @@ func run(o *Options) error { cipherSuites, cipher.TLSVersionMap[o.config.APIServer.TLSMinVersion], npRecoController, - clickHouseController) + clickHouseStatQuerierImpl) if err != nil { return fmt.Errorf("error creating API server config: %v", err) } diff --git a/pkg/apis/stats/v1alpha1/types.go b/pkg/apis/stats/v1alpha1/types.go index 41317653..718cb90b 100644 --- a/pkg/apis/stats/v1alpha1/types.go +++ b/pkg/apis/stats/v1alpha1/types.go @@ -24,5 +24,5 @@ type ClickHouseStats struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Result [][]string `json:"jobType,omitempty"` + Stat [][]string `json:"stat,omitempty"` } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c188165d..a574a222 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -72,7 +72,7 @@ type ExtraConfig struct { k8sClient kubernetes.Interface caCertController *certificate.CACertController npRecommendationQuerier querier.NPRecommendationQuerier - clickHouseStatusQuerier querier.ClickHouseStatusQuerier + clickHouseStatQuerier querier.ClickHouseStatQuerier } // Config defines the config for Theia manager apiserver. @@ -85,7 +85,7 @@ type TheiaManagerAPIServer struct { GenericAPIServer *genericapiserver.GenericAPIServer caCertController *certificate.CACertController NPRecommendationQuerier querier.NPRecommendationQuerier - ClickHouseStatusQuerier querier.ClickHouseStatusQuerier + ClickHouseStatusQuerier querier.ClickHouseStatQuerier } func (s *TheiaManagerAPIServer) Run(ctx context.Context) error { @@ -102,7 +102,7 @@ func NewConfig( k8sClient kubernetes.Interface, caCertController *certificate.CACertController, npRecommendationQuerier querier.NPRecommendationQuerier, - clickHouseStatusQuerier querier.ClickHouseStatusQuerier, + clickHouseStatQuerier querier.ClickHouseStatQuerier, ) *Config { return &Config{ genericConfig: genericConfig, @@ -110,7 +110,7 @@ func NewConfig( k8sClient: k8sClient, caCertController: caCertController, npRecommendationQuerier: npRecommendationQuerier, - clickHouseStatusQuerier: clickHouseStatusQuerier, + clickHouseStatQuerier: clickHouseStatQuerier, }, } } @@ -150,7 +150,7 @@ func (c Config) New() (*TheiaManagerAPIServer, error) { GenericAPIServer: s, caCertController: c.extraConfig.caCertController, NPRecommendationQuerier: c.extraConfig.npRecommendationQuerier, - ClickHouseStatusQuerier: c.extraConfig.clickHouseStatusQuerier, + ClickHouseStatusQuerier: c.extraConfig.clickHouseStatQuerier, } if err := installAPIGroup(apiServer); err != nil { return nil, err diff --git a/pkg/apiserver/registry/stats/clickhouse/rest.go b/pkg/apiserver/registry/stats/clickhouse/rest.go index 05109d07..d82fad74 100644 --- a/pkg/apiserver/registry/stats/clickhouse/rest.go +++ b/pkg/apiserver/registry/stats/clickhouse/rest.go @@ -31,7 +31,7 @@ const defaultNameSpace = "flow-visibility" // REST implements rest.Storage for clickhouse. type REST struct { - clickHouseStatusQuerier querier.ClickHouseStatusQuerier + clickHouseStatusQuerier querier.ClickHouseStatQuerier } var ( @@ -39,7 +39,7 @@ var ( ) // NewREST returns a REST object that will work against API services. -func NewREST(chq querier.ClickHouseStatusQuerier) *REST { +func NewREST(chq querier.ClickHouseStatQuerier) *REST { return &REST{clickHouseStatusQuerier: chq} } @@ -48,24 +48,24 @@ func (r *REST) New() runtime.Object { } func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { - var result [][]string + var stats [][]string var err error switch name { case "diskInfo": - result, err = r.clickHouseStatusQuerier.GetDiskInfo(defaultNameSpace) + stats, err = r.clickHouseStatusQuerier.GetDiskInfo(defaultNameSpace) case "tableInfo": - result, err = r.clickHouseStatusQuerier.GetTableInfo(defaultNameSpace) + stats, err = r.clickHouseStatusQuerier.GetTableInfo(defaultNameSpace) case "insertRate": - result, err = r.clickHouseStatusQuerier.GetInsertRate(defaultNameSpace) + stats, err = r.clickHouseStatusQuerier.GetInsertRate(defaultNameSpace) case "stackTraces": - result, err = r.clickHouseStatusQuerier.GetStackTraces(defaultNameSpace) + stats, err = r.clickHouseStatusQuerier.GetStackTraces(defaultNameSpace) default: return nil, fmt.Errorf("cannot recognize the statua name: %s", name) } if err != nil { return nil, fmt.Errorf("error when sending query to ClickHouse: %s", err) } - return &v1alpha1.ClickHouseStats{Result: result}, nil + return &v1alpha1.ClickHouseStats{Stat: stats}, nil } func (r *REST) NamespaceScoped() bool { diff --git a/pkg/apiserver/registry/stats/clickhouse/rest_test.go b/pkg/apiserver/registry/stats/clickhouse/rest_test.go index d1e4ef76..e2a142a2 100644 --- a/pkg/apiserver/registry/stats/clickhouse/rest_test.go +++ b/pkg/apiserver/registry/stats/clickhouse/rest_test.go @@ -73,7 +73,7 @@ func TestREST_Get(t *testing.T) { assert.NoError(t, err) status, ok := result.(*stats.ClickHouseStats) assert.True(t, ok) - assert.ElementsMatch(t, tt.expectResult, status.Result) + assert.ElementsMatch(t, tt.expectResult, status.Stat) } else { assert.Error(t, err) diff --git a/pkg/controller/stats/controller.go b/pkg/apiserver/utils/stats/clickhouse_stats.go similarity index 90% rename from pkg/controller/stats/controller.go rename to pkg/apiserver/utils/stats/clickhouse_stats.go index ba9de4e9..43bc525f 100644 --- a/pkg/controller/stats/controller.go +++ b/pkg/apiserver/utils/stats/clickhouse_stats.go @@ -128,21 +128,21 @@ ORDER BY count() DESC SETTINGS allow_introspection_functions=1`, } -type StatsController struct { +type ClickHouseStatQuerierImpl struct { kubeClient kubernetes.Interface clickhouseConnect *sql.DB } -func NewStatsController( +func NewClickHouseStatQuerierImpl( kubeClient kubernetes.Interface, -) *StatsController { - c := &StatsController{ +) *ClickHouseStatQuerierImpl { + c := &ClickHouseStatQuerierImpl{ kubeClient: kubeClient, } return c } -func (c *StatsController) GetDiskInfo(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetDiskInfo(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(diskQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting diskInfo from clickhouse: %v", err) @@ -150,7 +150,7 @@ func (c *StatsController) GetDiskInfo(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetTableInfo(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetTableInfo(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(tableInfoQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting tableInfo from clickhouse: %v", err) @@ -158,7 +158,7 @@ func (c *StatsController) GetTableInfo(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetInsertRate(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetInsertRate(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(insertRateQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting insertRate from clickhouse: %v", err) @@ -166,7 +166,7 @@ func (c *StatsController) GetInsertRate(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetStackTraces(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetStackTraces(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(stackTracesQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting stackTraces from clickhouse: %v", err) @@ -174,7 +174,7 @@ func (c *StatsController) GetStackTraces(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) getDataFromClickHouse(query int, namespace string) ([][]string, error) { var err error if c.clickhouseConnect == nil { c.clickhouseConnect, err = util.SetupClickHouseConnection(c.kubeClient, namespace) @@ -184,6 +184,7 @@ func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([] } result, err := c.clickhouseConnect.Query(queryMap[query]) if err != nil { + c.clickhouseConnect = nil return nil, fmt.Errorf("failed to get data from clickhouse: %v", err) } defer result.Close() diff --git a/pkg/controller/stats/controller_test.go b/pkg/apiserver/utils/stats/clickhouse_stats_test.go similarity index 97% rename from pkg/controller/stats/controller_test.go rename to pkg/apiserver/utils/stats/clickhouse_stats_test.go index f83d3d97..4af146f1 100644 --- a/pkg/controller/stats/controller_test.go +++ b/pkg/apiserver/utils/stats/clickhouse_stats_test.go @@ -74,7 +74,7 @@ func TestGetDataFromClickHouse(t *testing.T) { db, mock, err := sqlmock.New() assert.NoError(t, err) mock.ExpectQuery(regexp.QuoteMeta(queryMap[tc.query])).WillReturnRows(tc.returnedRow) - controller := StatsController{clickhouseConnect: db} + controller := ClickHouseStatQuerierImpl{clickhouseConnect: db} result, err := controller.getDataFromClickHouse(tc.query, config.FlowVisibilityNS) if tc.expectedError == nil { diff --git a/pkg/controller/networkpolicyrecommendation/controller_test.go b/pkg/controller/networkpolicyrecommendation/controller_test.go index ab5cf052..7307c56c 100644 --- a/pkg/controller/networkpolicyrecommendation/controller_test.go +++ b/pkg/controller/networkpolicyrecommendation/controller_test.go @@ -308,7 +308,7 @@ func TestNPRecommendation(t *testing.T) { // the SparkApplication id. if !serviceCreated { // Mock ClickHouse database - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) if err != nil { return db, err @@ -345,7 +345,7 @@ func TestNPRecommendation(t *testing.T) { assert.Equal(t, 1, len(nprList), "Expected exactly one NetworkPolicyRecommendation, got %d", len(nprList)) assert.Equal(t, npr, nprList[0]) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) if err != nil { return db, err @@ -608,7 +608,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { return nil, fmt.Errorf("connection error") } }, @@ -619,7 +619,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { var err error db, _, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) return db, err @@ -632,7 +632,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { var err error var mock sqlmock.Sqlmock db, mock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 768d5af9..319c577d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -25,7 +25,7 @@ type NPRecommendationQuerier interface { CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) } -type ClickHouseStatusQuerier interface { +type ClickHouseStatQuerier interface { GetDiskInfo(namespace string) ([][]string, error) GetTableInfo(namespace string) ([][]string, error) GetInsertRate(namespace string) ([][]string, error) diff --git a/pkg/theia/commands/clickhouse_status.go b/pkg/theia/commands/clickhouse_status.go index 9657e680..fd7a5b37 100644 --- a/pkg/theia/commands/clickhouse_status.go +++ b/pkg/theia/commands/clickhouse_status.go @@ -87,9 +87,9 @@ func getStatus(cmd *cobra.Command, args []string) error { return fmt.Errorf("error when getting clickhouse %v status: %s", name, err) } if name == "stackTraces" { - TableOutputVertical(data.Result) + TableOutputVertical(data.Stat) } else { - TableOutput(data.Result) + TableOutput(data.Stat) } } return nil diff --git a/pkg/theia/commands/clickhouse_status_test.go b/pkg/theia/commands/clickhouse_status_test.go index b54af8ab..3d52c2e2 100644 --- a/pkg/theia/commands/clickhouse_status_test.go +++ b/pkg/theia/commands/clickhouse_status_test.go @@ -46,7 +46,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/diskInfo"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_diskInfo"}}, + Stat: [][]string{{"test_diskInfo"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -63,7 +63,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/tableInfo"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_tableInfo"}}, + Stat: [][]string{{"test_tableInfo"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -80,7 +80,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/insertRate"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_insertRate"}}, + Stat: [][]string{{"test_insertRate"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -97,7 +97,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/stackTraces"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_stackTraces"}, {"fakeData"}}, + Stat: [][]string{{"test_stackTraces"}, {"fakeData"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/pkg/theia/commands/utils.go b/pkg/theia/commands/utils.go index fb0da0e6..24f7ccfa 100644 --- a/pkg/theia/commands/utils.go +++ b/pkg/theia/commands/utils.go @@ -24,6 +24,7 @@ import ( "time" "github.com/spf13/cobra" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -85,7 +86,7 @@ func CreateTheiaManagerClient(k8sClient kubernetes.Interface, kubeconfig string, var host string var portForward *portforwarder.PortForwarder if useClusterIP { - serviceIP, servicePort, err := util.GetServiceAddr(k8sClient, config.TheiaManagerServiceName, config.FlowVisibilityNS, "tcp") + serviceIP, servicePort, err := util.GetServiceAddr(k8sClient, config.TheiaManagerServiceName, config.FlowVisibilityNS, v1.ProtocolTCP) if err != nil { return nil, nil, fmt.Errorf("error when getting the Theia Manager Service address: %v", err) } @@ -93,7 +94,7 @@ func CreateTheiaManagerClient(k8sClient kubernetes.Interface, kubeconfig string, } else { listenAddress := "localhost" listenPort := apis.TheiaManagerAPIPort - _, servicePort, err := util.GetServiceAddr(k8sClient, config.TheiaManagerServiceName, config.FlowVisibilityNS, "tcp") + _, servicePort, err := util.GetServiceAddr(k8sClient, config.TheiaManagerServiceName, config.FlowVisibilityNS, v1.ProtocolTCP) if err != nil { return nil, nil, fmt.Errorf("error when getting the Theia Manager Service port: %v", err) } diff --git a/pkg/util/utils.go b/pkg/util/utils.go index 30e7d085..6a81b6b7 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -22,11 +22,12 @@ import ( "github.com/ClickHouse/clickhouse-go" "github.com/google/uuid" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) -var OpenSql = sql.Open +var SqlOpenFunc = sql.Open func ParseRecommendationName(npName string) error { if !strings.HasPrefix(npName, "pr-") { @@ -41,7 +42,7 @@ func ParseRecommendationName(npName string) error { return nil } -func GetServiceAddr(client kubernetes.Interface, serviceName, serviceNamespace, servicePortName string) (string, int, error) { +func GetServiceAddr(client kubernetes.Interface, serviceName, serviceNamespace string, protocol v1.Protocol) (string, int, error) { var serviceIP string var servicePort int service, err := client.CoreV1().Services(serviceNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) @@ -50,12 +51,12 @@ func GetServiceAddr(client kubernetes.Interface, serviceName, serviceNamespace, } serviceIP = service.Spec.ClusterIP for _, port := range service.Spec.Ports { - if port.Name == servicePortName { + if port.Protocol == protocol { servicePort = int(port.Port) } } if servicePort == 0 { - return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: no %s service port", serviceName, servicePortName) + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: no %s service port", serviceName, protocol) } return serviceIP, servicePort, nil } @@ -80,7 +81,7 @@ func ConnectClickHouse(url string) (*sql.DB, error) { var connect *sql.DB // Open the database and ping it var err error - connect, err = OpenSql("clickhouse", url) + connect, err = SqlOpenFunc("clickhouse", url) if err != nil { return connect, fmt.Errorf("failed to open ClickHouse: %v", err) } diff --git a/pkg/util/utils_test.go b/pkg/util/utils_test.go index 951895da..33d77af3 100644 --- a/pkg/util/utils_test.go +++ b/pkg/util/utils_test.go @@ -65,7 +65,7 @@ func TestGetServiceAddr(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - ip, port, err := GetServiceAddr(tt.fakeClientset, tt.serviceName, config.FlowVisibilityNS, "tcp") + ip, port, err := GetServiceAddr(tt.fakeClientset, tt.serviceName, config.FlowVisibilityNS, v1.ProtocolTCP) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } diff --git a/test/e2e/theia_clickhouse_test.go b/test/e2e/theia_clickhouse_test.go index 6a2d42ed..f1553586 100644 --- a/test/e2e/theia_clickhouse_test.go +++ b/test/e2e/theia_clickhouse_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -401,7 +402,7 @@ func SetupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string service := "clickhouse-clickhouse" listenAddress := "localhost" listenPort := 9000 - _, servicePort, err := util.GetServiceAddr(clientset, service, config.FlowVisibilityNS, "tcp") + _, servicePort, err := util.GetServiceAddr(clientset, service, config.FlowVisibilityNS, v1.ProtocolTCP) if err != nil { return nil, nil, fmt.Errorf("error when getting the ClickHouse Service port: %v", err) }