From 5df99067e4fccf533ec32ace7fd7b4304c9aecdc Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Thu, 17 Nov 2022 14:03:22 -0800 Subject: [PATCH] Address comments Signed-off-by: Yun-Tang Hsu --- cmd/theia-manager/theia-manager.go | 6 +++--- pkg/apis/stats/v1alpha1/types.go | 2 +- pkg/apiserver/apiserver.go | 10 +++++----- .../registry/stats/clickhouse/rest.go | 4 ++-- .../controller_test.go | 10 +++++----- .../{controller.go => clickhouse_stats.go} | 19 ++++++++++--------- ...oller_test.go => clickhouse_stats_test.go} | 2 +- pkg/querier/querier.go | 2 +- pkg/theia/commands/clickhouse_status.go | 4 ++-- pkg/theia/commands/clickhouse_status_test.go | 8 ++++---- pkg/util/utils.go | 4 ++-- 11 files changed, 36 insertions(+), 35 deletions(-) rename pkg/controller/stats/{controller.go => clickhouse_stats.go} (90%) rename pkg/controller/stats/{controller_test.go => clickhouse_stats_test.go} (97%) diff --git a/cmd/theia-manager/theia-manager.go b/cmd/theia-manager/theia-manager.go index 356f7a78..addf5b81 100644 --- a/cmd/theia-manager/theia-manager.go +++ b/cmd/theia-manager/theia-manager.go @@ -52,7 +52,7 @@ func createAPIServerConfig( cipherSuites []uint16, tlsMinVersion uint16, nprq querier.NPRecommendationQuerier, - chq querier.ClickHouseStatusQuerier, + chq querier.ClickHouseStatQuerier, ) (*apiserver.Config, error) { secureServing := genericoptions.NewSecureServingOptions().WithLoopback() authentication := genericoptions.NewDelegatingAuthenticationOptions() @@ -127,7 +127,7 @@ func run(o *Options) error { npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies() npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer) - clickHouseController := stats.NewStatsController(kubeClient) + clickHouseStatQuerierImpl := stats.NewClickHouseStatQuerierImpl(kubeClient) cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites) if err != nil { @@ -141,7 +141,7 @@ func run(o *Options) error { cipherSuites, cipher.TLSVersionMap[o.config.APIServer.TLSMinVersion], npRecoController, - clickHouseController) + clickHouseStatQuerierImpl) if err != nil { return fmt.Errorf("error creating API server config: %v", err) } diff --git a/pkg/apis/stats/v1alpha1/types.go b/pkg/apis/stats/v1alpha1/types.go index 41317653..d21bbad7 100644 --- a/pkg/apis/stats/v1alpha1/types.go +++ b/pkg/apis/stats/v1alpha1/types.go @@ -24,5 +24,5 @@ type ClickHouseStats struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Result [][]string `json:"jobType,omitempty"` + Stat [][]string `json:"Stat,omitempty"` } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c188165d..a574a222 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -72,7 +72,7 @@ type ExtraConfig struct { k8sClient kubernetes.Interface caCertController *certificate.CACertController npRecommendationQuerier querier.NPRecommendationQuerier - clickHouseStatusQuerier querier.ClickHouseStatusQuerier + clickHouseStatQuerier querier.ClickHouseStatQuerier } // Config defines the config for Theia manager apiserver. @@ -85,7 +85,7 @@ type TheiaManagerAPIServer struct { GenericAPIServer *genericapiserver.GenericAPIServer caCertController *certificate.CACertController NPRecommendationQuerier querier.NPRecommendationQuerier - ClickHouseStatusQuerier querier.ClickHouseStatusQuerier + ClickHouseStatusQuerier querier.ClickHouseStatQuerier } func (s *TheiaManagerAPIServer) Run(ctx context.Context) error { @@ -102,7 +102,7 @@ func NewConfig( k8sClient kubernetes.Interface, caCertController *certificate.CACertController, npRecommendationQuerier querier.NPRecommendationQuerier, - clickHouseStatusQuerier querier.ClickHouseStatusQuerier, + clickHouseStatQuerier querier.ClickHouseStatQuerier, ) *Config { return &Config{ genericConfig: genericConfig, @@ -110,7 +110,7 @@ func NewConfig( k8sClient: k8sClient, caCertController: caCertController, npRecommendationQuerier: npRecommendationQuerier, - clickHouseStatusQuerier: clickHouseStatusQuerier, + clickHouseStatQuerier: clickHouseStatQuerier, }, } } @@ -150,7 +150,7 @@ func (c Config) New() (*TheiaManagerAPIServer, error) { GenericAPIServer: s, caCertController: c.extraConfig.caCertController, NPRecommendationQuerier: c.extraConfig.npRecommendationQuerier, - ClickHouseStatusQuerier: c.extraConfig.clickHouseStatusQuerier, + ClickHouseStatusQuerier: c.extraConfig.clickHouseStatQuerier, } if err := installAPIGroup(apiServer); err != nil { return nil, err diff --git a/pkg/apiserver/registry/stats/clickhouse/rest.go b/pkg/apiserver/registry/stats/clickhouse/rest.go index 05109d07..27736ed2 100644 --- a/pkg/apiserver/registry/stats/clickhouse/rest.go +++ b/pkg/apiserver/registry/stats/clickhouse/rest.go @@ -31,7 +31,7 @@ const defaultNameSpace = "flow-visibility" // REST implements rest.Storage for clickhouse. type REST struct { - clickHouseStatusQuerier querier.ClickHouseStatusQuerier + clickHouseStatusQuerier querier.ClickHouseStatQuerier } var ( @@ -39,7 +39,7 @@ var ( ) // NewREST returns a REST object that will work against API services. -func NewREST(chq querier.ClickHouseStatusQuerier) *REST { +func NewREST(chq querier.ClickHouseStatQuerier) *REST { return &REST{clickHouseStatusQuerier: chq} } diff --git a/pkg/controller/networkpolicyrecommendation/controller_test.go b/pkg/controller/networkpolicyrecommendation/controller_test.go index ab5cf052..7307c56c 100644 --- a/pkg/controller/networkpolicyrecommendation/controller_test.go +++ b/pkg/controller/networkpolicyrecommendation/controller_test.go @@ -308,7 +308,7 @@ func TestNPRecommendation(t *testing.T) { // the SparkApplication id. if !serviceCreated { // Mock ClickHouse database - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) if err != nil { return db, err @@ -345,7 +345,7 @@ func TestNPRecommendation(t *testing.T) { assert.Equal(t, 1, len(nprList), "Expected exactly one NetworkPolicyRecommendation, got %d", len(nprList)) assert.Equal(t, npr, nprList[0]) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) if err != nil { return db, err @@ -608,7 +608,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { return nil, fmt.Errorf("connection error") } }, @@ -619,7 +619,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { var err error db, _, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) return db, err @@ -632,7 +632,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) { setup: func(client kubernetes.Interface) { createClickHouseService(client) createClickHouseSecret(client) - util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) { + util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) { var err error var mock sqlmock.Sqlmock db, mock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) diff --git a/pkg/controller/stats/controller.go b/pkg/controller/stats/clickhouse_stats.go similarity index 90% rename from pkg/controller/stats/controller.go rename to pkg/controller/stats/clickhouse_stats.go index ba9de4e9..43bc525f 100644 --- a/pkg/controller/stats/controller.go +++ b/pkg/controller/stats/clickhouse_stats.go @@ -128,21 +128,21 @@ ORDER BY count() DESC SETTINGS allow_introspection_functions=1`, } -type StatsController struct { +type ClickHouseStatQuerierImpl struct { kubeClient kubernetes.Interface clickhouseConnect *sql.DB } -func NewStatsController( +func NewClickHouseStatQuerierImpl( kubeClient kubernetes.Interface, -) *StatsController { - c := &StatsController{ +) *ClickHouseStatQuerierImpl { + c := &ClickHouseStatQuerierImpl{ kubeClient: kubeClient, } return c } -func (c *StatsController) GetDiskInfo(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetDiskInfo(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(diskQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting diskInfo from clickhouse: %v", err) @@ -150,7 +150,7 @@ func (c *StatsController) GetDiskInfo(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetTableInfo(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetTableInfo(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(tableInfoQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting tableInfo from clickhouse: %v", err) @@ -158,7 +158,7 @@ func (c *StatsController) GetTableInfo(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetInsertRate(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetInsertRate(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(insertRateQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting insertRate from clickhouse: %v", err) @@ -166,7 +166,7 @@ func (c *StatsController) GetInsertRate(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) GetStackTraces(namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) GetStackTraces(namespace string) ([][]string, error) { data, err := c.getDataFromClickHouse(stackTracesQuery, namespace) if err != nil { return nil, fmt.Errorf("error when getting stackTraces from clickhouse: %v", err) @@ -174,7 +174,7 @@ func (c *StatsController) GetStackTraces(namespace string) ([][]string, error) { return data, nil } -func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([][]string, error) { +func (c *ClickHouseStatQuerierImpl) getDataFromClickHouse(query int, namespace string) ([][]string, error) { var err error if c.clickhouseConnect == nil { c.clickhouseConnect, err = util.SetupClickHouseConnection(c.kubeClient, namespace) @@ -184,6 +184,7 @@ func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([] } result, err := c.clickhouseConnect.Query(queryMap[query]) if err != nil { + c.clickhouseConnect = nil return nil, fmt.Errorf("failed to get data from clickhouse: %v", err) } defer result.Close() diff --git a/pkg/controller/stats/controller_test.go b/pkg/controller/stats/clickhouse_stats_test.go similarity index 97% rename from pkg/controller/stats/controller_test.go rename to pkg/controller/stats/clickhouse_stats_test.go index f83d3d97..4af146f1 100644 --- a/pkg/controller/stats/controller_test.go +++ b/pkg/controller/stats/clickhouse_stats_test.go @@ -74,7 +74,7 @@ func TestGetDataFromClickHouse(t *testing.T) { db, mock, err := sqlmock.New() assert.NoError(t, err) mock.ExpectQuery(regexp.QuoteMeta(queryMap[tc.query])).WillReturnRows(tc.returnedRow) - controller := StatsController{clickhouseConnect: db} + controller := ClickHouseStatQuerierImpl{clickhouseConnect: db} result, err := controller.getDataFromClickHouse(tc.query, config.FlowVisibilityNS) if tc.expectedError == nil { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 768d5af9..319c577d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -25,7 +25,7 @@ type NPRecommendationQuerier interface { CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) } -type ClickHouseStatusQuerier interface { +type ClickHouseStatQuerier interface { GetDiskInfo(namespace string) ([][]string, error) GetTableInfo(namespace string) ([][]string, error) GetInsertRate(namespace string) ([][]string, error) diff --git a/pkg/theia/commands/clickhouse_status.go b/pkg/theia/commands/clickhouse_status.go index 9657e680..fd7a5b37 100644 --- a/pkg/theia/commands/clickhouse_status.go +++ b/pkg/theia/commands/clickhouse_status.go @@ -87,9 +87,9 @@ func getStatus(cmd *cobra.Command, args []string) error { return fmt.Errorf("error when getting clickhouse %v status: %s", name, err) } if name == "stackTraces" { - TableOutputVertical(data.Result) + TableOutputVertical(data.Stat) } else { - TableOutput(data.Result) + TableOutput(data.Stat) } } return nil diff --git a/pkg/theia/commands/clickhouse_status_test.go b/pkg/theia/commands/clickhouse_status_test.go index b54af8ab..3d52c2e2 100644 --- a/pkg/theia/commands/clickhouse_status_test.go +++ b/pkg/theia/commands/clickhouse_status_test.go @@ -46,7 +46,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/diskInfo"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_diskInfo"}}, + Stat: [][]string{{"test_diskInfo"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -63,7 +63,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/tableInfo"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_tableInfo"}}, + Stat: [][]string{{"test_tableInfo"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -80,7 +80,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/insertRate"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_insertRate"}}, + Stat: [][]string{{"test_insertRate"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -97,7 +97,7 @@ func TestGetStatus(t *testing.T) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/stackTraces"): status := &stats.ClickHouseStats{ - Result: [][]string{{"test_stackTraces"}, {"fakeData"}}, + Stat: [][]string{{"test_stackTraces"}, {"fakeData"}}, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/pkg/util/utils.go b/pkg/util/utils.go index 30e7d085..a3ba04f6 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -26,7 +26,7 @@ import ( "k8s.io/client-go/kubernetes" ) -var OpenSql = sql.Open +var SqlOpenFunc = sql.Open func ParseRecommendationName(npName string) error { if !strings.HasPrefix(npName, "pr-") { @@ -80,7 +80,7 @@ func ConnectClickHouse(url string) (*sql.DB, error) { var connect *sql.DB // Open the database and ping it var err error - connect, err = OpenSql("clickhouse", url) + connect, err = SqlOpenFunc("clickhouse", url) if err != nil { return connect, fmt.Errorf("failed to open ClickHouse: %v", err) }